You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:45 UTC

[21/67] [partial] incubator-beam git commit: Directory reorganization

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/MapElements.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/MapElements.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/MapElements.java
deleted file mode 100644
index 8997050..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/MapElements.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-
-/**
- * {@code PTransform}s for mapping a simple function over the elements of a {@link PCollection}.
- */
-public class MapElements<InputT, OutputT>
-extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
-
-  /**
-   * For a {@code SerializableFunction<InputT, OutputT>} {@code fn} and output type descriptor,
-   * returns a {@code PTransform} that takes an input {@code PCollection<InputT>} and returns
-   * a {@code PCollection<OutputT>} containing {@code fn.apply(v)} for every element {@code v} in
-   * the input.
-   *
-   * <p>Example of use in Java 8:
-   * <pre>{@code
-   * PCollection<Integer> wordLengths = words.apply(
-   *     MapElements.via((String word) -> word.length())
-   *         .withOutputType(new TypeDescriptor<Integer>() {}));
-   * }</pre>
-   *
-   * <p>In Java 7, the overload {@link #via(SimpleFunction)} is more concise as the output type
-   * descriptor need not be provided.
-   */
-  public static <InputT, OutputT> MissingOutputTypeDescriptor<InputT, OutputT>
-  via(SerializableFunction<InputT, OutputT> fn) {
-    return new MissingOutputTypeDescriptor<>(fn);
-  }
-
-  /**
-   * For a {@code SimpleFunction<InputT, OutputT>} {@code fn}, returns a {@code PTransform} that
-   * takes an input {@code PCollection<InputT>} and returns a {@code PCollection<OutputT>}
-   * containing {@code fn.apply(v)} for every element {@code v} in the input.
-   *
-   * <p>This overload is intended primarily for use in Java 7. In Java 8, the overload
-   * {@link #via(SerializableFunction)} supports use of lambda for greater concision.
-   *
-   * <p>Example of use in Java 7:
-   * <pre>{@code
-   * PCollection<String> words = ...;
-   * PCollection<Integer> wordLengths = words.apply(MapElements.via(
-   *     new SimpleFunction<String, Integer>() {
-   *       public Integer apply(String word) {
-   *         return word.length();
-   *       }
-   *     }));
-   * }</pre>
-   */
-  public static <InputT, OutputT> MapElements<InputT, OutputT>
-  via(final SimpleFunction<InputT, OutputT> fn) {
-    return new MapElements<>(fn, fn.getOutputTypeDescriptor());
-  }
-
-  /**
-   * An intermediate builder for a {@link MapElements} transform. To complete the transform, provide
-   * an output type descriptor to {@link MissingOutputTypeDescriptor#withOutputType}. See
-   * {@link #via(SerializableFunction)} for a full example of use.
-   */
-  public static final class MissingOutputTypeDescriptor<InputT, OutputT> {
-
-    private final SerializableFunction<InputT, OutputT> fn;
-
-    private MissingOutputTypeDescriptor(SerializableFunction<InputT, OutputT> fn) {
-      this.fn = fn;
-    }
-
-    public MapElements<InputT, OutputT> withOutputType(TypeDescriptor<OutputT> outputType) {
-      return new MapElements<>(fn, outputType);
-    }
-  }
-
-  ///////////////////////////////////////////////////////////////////
-
-  private final SerializableFunction<InputT, OutputT> fn;
-  private final transient TypeDescriptor<OutputT> outputType;
-
-  private MapElements(
-      SerializableFunction<InputT, OutputT> fn,
-      TypeDescriptor<OutputT> outputType) {
-    this.fn = fn;
-    this.outputType = outputType;
-  }
-
-  @Override
-  public PCollection<OutputT> apply(PCollection<InputT> input) {
-    return input.apply(ParDo.named("Map").of(new DoFn<InputT, OutputT>() {
-      @Override
-      public void processElement(ProcessContext c) {
-        c.output(fn.apply(c.element()));
-      }
-    })).setTypeDescriptorInternal(outputType);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Max.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Max.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Max.java
deleted file mode 100644
index 8678e4f..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Max.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.transforms.Combine.BinaryCombineFn;
-import com.google.cloud.dataflow.sdk.util.common.Counter;
-import com.google.cloud.dataflow.sdk.util.common.Counter.AggregationKind;
-import com.google.cloud.dataflow.sdk.util.common.CounterProvider;
-
-import java.io.Serializable;
-import java.util.Comparator;
-
-/**
- * {@code PTransform}s for computing the maximum of the elements in a {@code PCollection}, or the
- * maximum of the values associated with each key in a {@code PCollection} of {@code KV}s.
- *
- * <p>Example 1: get the maximum of a {@code PCollection} of {@code Double}s.
- * <pre> {@code
- * PCollection<Double> input = ...;
- * PCollection<Double> max = input.apply(Max.doublesGlobally());
- * } </pre>
- *
- * <p>Example 2: calculate the maximum of the {@code Integer}s
- * associated with each unique key (which is of type {@code String}).
- * <pre> {@code
- * PCollection<KV<String, Integer>> input = ...;
- * PCollection<KV<String, Integer>> maxPerKey = input
- *     .apply(Max.<String>integersPerKey());
- * } </pre>
- */
-public class Max {
-
-  private Max() {
-    // do not instantiate
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes an input {@code PCollection<Integer>} and returns a
-   * {@code PCollection<Integer>} whose contents is the maximum of the input {@code PCollection}'s
-   * elements, or {@code Integer.MIN_VALUE} if there are no elements.
-   */
-  public static Combine.Globally<Integer, Integer> integersGlobally() {
-    return Combine.globally(new MaxIntegerFn()).named("Max.Globally");
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes an input {@code PCollection<KV<K, Integer>>} and
-   * returns a {@code PCollection<KV<K, Integer>>} that contains an output element mapping each
-   * distinct key in the input {@code PCollection} to the maximum of the values associated with that
-   * key in the input {@code PCollection}.
-   *
-   * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
-   */
-  public static <K> Combine.PerKey<K, Integer, Integer> integersPerKey() {
-    return Combine.<K, Integer, Integer>perKey(new MaxIntegerFn()).named("Max.PerKey");
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes an input {@code PCollection<Long>} and returns a {@code
-   * PCollection<Long>} whose contents is the maximum of the input {@code PCollection}'s elements,
-   * or {@code Long.MIN_VALUE} if there are no elements.
-   */
-  public static Combine.Globally<Long, Long> longsGlobally() {
-    return Combine.globally(new MaxLongFn()).named("Max.Globally");
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes an input {@code PCollection<KV<K, Long>>} and returns a
-   * {@code PCollection<KV<K, Long>>} that contains an output element mapping each distinct key in
-   * the input {@code PCollection} to the maximum of the values associated with that key in the
-   * input {@code PCollection}.
-   *
-   * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
-   */
-  public static <K> Combine.PerKey<K, Long, Long> longsPerKey() {
-    return Combine.<K, Long, Long>perKey(new MaxLongFn()).named("Max.PerKey");
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes an input {@code PCollection<Double>} and returns a
-   * {@code PCollection<Double>} whose contents is the maximum of the input {@code PCollection}'s
-   * elements, or {@code Double.NEGATIVE_INFINITY} if there are no elements.
-   */
-  public static Combine.Globally<Double, Double> doublesGlobally() {
-    return Combine.globally(new MaxDoubleFn()).named("Max.Globally");
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes an input {@code PCollection<KV<K, Double>>} and returns
-   * a {@code PCollection<KV<K, Double>>} that contains an output element mapping each distinct key
-   * in the input {@code PCollection} to the maximum of the values associated with that key in the
-   * input {@code PCollection}.
-   *
-   * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
-   */
-  public static <K> Combine.PerKey<K, Double, Double> doublesPerKey() {
-    return Combine.<K, Double, Double>perKey(new MaxDoubleFn()).named("Max.PerKey");
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code
-   * PCollection<T>} whose contents is the maximum according to the natural ordering of {@code T}
-   * of the input {@code PCollection}'s elements, or {@code null} if there are no elements.
-   */
-  public static <T extends Comparable<? super T>>
-  Combine.Globally<T, T> globally() {
-    return Combine.<T, T>globally(MaxFn.<T>naturalOrder()).named("Max.Globally");
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes an input {@code PCollection<KV<K, T>>} and returns a
-   * {@code PCollection<KV<K, T>>} that contains an output element mapping each distinct key in the
-   * input {@code PCollection} to the maximum according to the natural ordering of {@code T} of the
-   * values associated with that key in the input {@code PCollection}.
-   *
-   * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
-   */
-  public static <K, T extends Comparable<? super T>>
-  Combine.PerKey<K, T, T> perKey() {
-    return Combine.<K, T, T>perKey(MaxFn.<T>naturalOrder()).named("Max.PerKey");
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code
-   * PCollection<T>} whose contents is the maximum, according to the given {@code comparator}, of
-   * the input {@code PCollection}'s elements, or {@code null} if there are no elements.
-   */
-  public static <T, ComparatorT extends Comparator<? super T> & Serializable>
-  Combine.Globally<T, T> globally(ComparatorT comparator) {
-    return Combine.<T, T>globally(MaxFn.of(comparator)).named("Max.Globally");
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes an input {@code PCollection<KV<K, T>>} and returns a
-   * {@code PCollection<KV<K, T>>} that contains one output element per key, mapping each key to
-   * the maximum (per {@code comparator}) of that key's values in the input {@code PCollection}.
-   *
-   * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
-   */
-  public static <K, T, ComparatorT extends Comparator<? super T> & Serializable>
-  Combine.PerKey<K, T, T> perKey(ComparatorT comparator) {
-    return Combine.<K, T, T>perKey(MaxFn.of(comparator)).named("Max.PerKey");
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * A {@code CombineFn} that computes the maximum of a collection of elements of type {@code T}
-   * using an arbitrary {@link Comparator}, useful as an argument to {@link Combine#globally} or
-   * {@link Combine#perKey}.
-   *
-   * @param <T> the type of the values being compared
-   */
-  public static class MaxFn<T> extends BinaryCombineFn<T> {
-
-    private final T identity;
-    private final Comparator<? super T> comparator;
-
-    private <ComparatorT extends Comparator<? super T> & Serializable> MaxFn(
-        T identity, ComparatorT comparator) {
-      this.identity = identity;
-      this.comparator = comparator;
-    }
-
-    public static <T, ComparatorT extends Comparator<? super T> & Serializable>
-    MaxFn<T> of(T identity, ComparatorT comparator) {
-      return new MaxFn<T>(identity, comparator);
-    }
-
-    public static <T, ComparatorT extends Comparator<? super T> & Serializable>
-    MaxFn<T> of(ComparatorT comparator) {
-      return new MaxFn<T>(null, comparator);
-    }
-
-    public static <T extends Comparable<? super T>> MaxFn<T> naturalOrder(T identity) {
-      return new MaxFn<T>(identity, new Top.Largest<T>());
-    }
-
-    public static <T extends Comparable<? super T>> MaxFn<T> naturalOrder() {
-      return new MaxFn<T>(null, new Top.Largest<T>());
-    }
-
-    @Override
-    public T identity() {
-      return identity;
-    }
-
-    @Override
-    public T apply(T left, T right) {
-      return comparator.compare(left, right) >= 0 ? left : right;
-    }
-  }
-
-  /**
-   * A {@code CombineFn} that computes the maximum of a collection of {@code Integer}s, useful as an
-   * argument to {@link Combine#globally} or {@link Combine#perKey}.
-   */
-  public static class MaxIntegerFn extends MaxFn<Integer> implements
-      CounterProvider<Integer> {
-    public MaxIntegerFn() {
-      super(Integer.MIN_VALUE, new Top.Largest<Integer>());
-    }
-
-    @Override
-    public Counter<Integer> getCounter(String name) {
-      return Counter.ints(name, AggregationKind.MAX);
-    }
-  }
-
-  /**
-   * A {@code CombineFn} that computes the maximum of a collection of {@code Long}s, useful as an
-   * argument to {@link Combine#globally} or {@link Combine#perKey}.
-   */
-  public static class MaxLongFn extends MaxFn<Long> implements
-      CounterProvider<Long> {
-    public MaxLongFn() {
-      super(Long.MIN_VALUE, new Top.Largest<Long>());
-    }
-
-    @Override
-    public Counter<Long> getCounter(String name) {
-      return Counter.longs(name, AggregationKind.MAX);
-    }
-  }
-
-  /**
-   * A {@code CombineFn} that computes the maximum of a collection of {@code Double}s, useful as an
-   * argument to {@link Combine#globally} or {@link Combine#perKey}.
-   */
-  public static class MaxDoubleFn extends MaxFn<Double> implements
-      CounterProvider<Double> {
-    public MaxDoubleFn() {
-      super(Double.NEGATIVE_INFINITY, new Top.Largest<Double>());
-    }
-
-    @Override
-    public Counter<Double> getCounter(String name) {
-      return Counter.doubles(name, AggregationKind.MAX);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Mean.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Mean.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Mean.java
deleted file mode 100644
index 7dccfb6..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Mean.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
-import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
-import com.google.cloud.dataflow.sdk.coders.DoubleCoder;
-import com.google.cloud.dataflow.sdk.transforms.Combine.AccumulatingCombineFn.Accumulator;
-import com.google.common.base.MoreObjects;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Objects;
-
-/**
- * {@code PTransform}s for computing the arithmetic mean
- * (a.k.a. average) of the elements in a {@code PCollection}, or the
- * mean of the values associated with each key in a
- * {@code PCollection} of {@code KV}s.
- *
- * <p>Example 1: get the mean of a {@code PCollection} of {@code Long}s.
- * <pre> {@code
- * PCollection<Long> input = ...;
- * PCollection<Double> mean = input.apply(Mean.<Long>globally());
- * } </pre>
- *
- * <p>Example 2: calculate the mean of the {@code Integer}s
- * associated with each unique key (which is of type {@code String}).
- * <pre> {@code
- * PCollection<KV<String, Integer>> input = ...;
- * PCollection<KV<String, Double>> meanPerKey =
- *     input.apply(Mean.<String, Integer>perKey());
- * } </pre>
- */
-public class Mean {
-
-  private Mean() { } // Namespace only
-
-  /**
-   * Returns a {@code PTransform} that takes an input
-   * {@code PCollection<NumT>} and returns a
-   * {@code PCollection<Double>} whose contents is the mean of the
-   * input {@code PCollection}'s elements, or
-   * {@code Double.NaN} if there are no elements.
-   *
-   * @param <NumT> the type of the {@code Number}s being combined
-   */
-  public static <NumT extends Number> Combine.Globally<NumT, Double> globally() {
-    return Combine.<NumT, Double>globally(new MeanFn<>()).named("Mean.Globally");
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes an input
-   * {@code PCollection<KV<K, NumT>>} and returns a
-   * {@code PCollection<KV<K, Double>>} that contains an output
-   * element mapping each distinct key in the input
-   * {@code PCollection} to the mean of the values associated with
-   * that key in the input {@code PCollection}.
-   *
-   * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
-   *
-   * @param <K> the type of the keys
-   * @param <NumT> the type of the {@code Number}s being combined
-   */
-  public static <K, NumT extends Number> Combine.PerKey<K, NumT, Double> perKey() {
-    return Combine.<K, NumT, Double>perKey(new MeanFn<>()).named("Mean.PerKey");
-  }
-
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * A {@code Combine.CombineFn} that computes the arithmetic mean
-   * (a.k.a. average) of an {@code Iterable} of numbers of type
-   * {@code NumT}, useful as an argument to {@link Combine#globally} or
-   * {@link Combine#perKey}.
-   *
-   * <p>Returns {@code Double.NaN} if combining zero elements.
-   *
-   * @param <NumT> the type of the {@code Number}s being combined
-   */
-  static class MeanFn<NumT extends Number>
-  extends Combine.AccumulatingCombineFn<NumT, CountSum<NumT>, Double> {
-    /**
-     * Constructs a combining function that computes the mean over
-     * a collection of values of type {@code NumT}.
-     */
-    public MeanFn() {}
-
-    @Override
-    public CountSum<NumT> createAccumulator() {
-      return new CountSum<>();
-    }
-
-    @Override
-    public Coder<CountSum<NumT>> getAccumulatorCoder(
-        CoderRegistry registry, Coder<NumT> inputCoder) {
-      return new CountSumCoder<>();
-    }
-  }
-
-  /**
-   * Accumulator class for {@link MeanFn}.
-   */
-  static class CountSum<NumT extends Number>
-  implements Accumulator<NumT, CountSum<NumT>, Double> {
-
-    long count = 0;
-    double sum = 0.0;
-
-    public CountSum() {
-      this(0, 0);
-    }
-
-    public CountSum(long count, double sum) {
-      this.count = count;
-      this.sum = sum;
-    }
-
-    @Override
-    public void addInput(NumT element) {
-      count++;
-      sum += element.doubleValue();
-    }
-
-    @Override
-    public void mergeAccumulator(CountSum<NumT> accumulator) {
-      count += accumulator.count;
-      sum += accumulator.sum;
-    }
-
-    @Override
-    public Double extractOutput() {
-      return count == 0 ? Double.NaN : sum / count;
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if (!(other instanceof CountSum)) {
-        return false;
-      }
-      @SuppressWarnings("unchecked")
-      CountSum<?> otherCountSum = (CountSum<?>) other;
-      return (count == otherCountSum.count)
-          && (sum == otherCountSum.sum);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(count, sum);
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this)
-          .add("count", count)
-          .add("sum", sum)
-          .toString();
-    }
-  }
-
-  static class CountSumCoder<NumT extends Number>
-  extends AtomicCoder<CountSum<NumT>> {
-     private static final Coder<Long> LONG_CODER = BigEndianLongCoder.of();
-     private static final Coder<Double> DOUBLE_CODER = DoubleCoder.of();
-
-     @Override
-     public void encode(CountSum<NumT> value, OutputStream outStream, Coder.Context context)
-         throws CoderException, IOException {
-       Coder.Context nestedContext = context.nested();
-       LONG_CODER.encode(value.count, outStream, nestedContext);
-       DOUBLE_CODER.encode(value.sum, outStream, nestedContext);
-     }
-
-     @Override
-     public CountSum<NumT> decode(InputStream inStream, Coder.Context context)
-         throws CoderException, IOException {
-       Coder.Context nestedContext = context.nested();
-       return new CountSum<>(
-           LONG_CODER.decode(inStream, nestedContext),
-           DOUBLE_CODER.decode(inStream, nestedContext));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Min.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Min.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Min.java
deleted file mode 100644
index 47ab3a0..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Min.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.transforms.Combine.BinaryCombineFn;
-import com.google.cloud.dataflow.sdk.util.common.Counter;
-import com.google.cloud.dataflow.sdk.util.common.Counter.AggregationKind;
-import com.google.cloud.dataflow.sdk.util.common.CounterProvider;
-
-import java.io.Serializable;
-import java.util.Comparator;
-
-/**
- * {@code PTransform}s for computing the minimum of the elements in a {@code PCollection}, or the
- * minimum of the values associated with each key in a {@code PCollection} of {@code KV}s.
- *
- * <p>Example 1: get the minimum of a {@code PCollection} of {@code Double}s.
- * <pre> {@code
- * PCollection<Double> input = ...;
- * PCollection<Double> min = input.apply(Min.doublesGlobally());
- * } </pre>
- *
- * <p>Example 2: calculate the minimum of the {@code Integer}s
- * associated with each unique key (which is of type {@code String}).
- * <pre> {@code
- * PCollection<KV<String, Integer>> input = ...;
- * PCollection<KV<String, Integer>> minPerKey = input
- *     .apply(Min.<String>integersPerKey());
- * } </pre>
- */
-public class Min {
-
-  private Min() {
-    // do not instantiate
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes an input {@code PCollection<Integer>} and returns a
-   * {@code PCollection<Integer>} whose contents is a single value that is the minimum of the input
-   * {@code PCollection}'s elements, or {@code Integer.MAX_VALUE} if there are no elements.
-   */
-  public static Combine.Globally<Integer, Integer> integersGlobally() {
-    return Combine.globally(new MinIntegerFn()).named("Min.Globally");
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes an input {@code PCollection<KV<K, Integer>>} and
-   * returns a {@code PCollection<KV<K, Integer>>} that contains an output element mapping each
-   * distinct key in the input {@code PCollection} to the minimum of the values associated with that
-   * key in the input {@code PCollection}.
-   *
-   * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
-   */
-  public static <K> Combine.PerKey<K, Integer, Integer> integersPerKey() {
-    return Combine.<K, Integer, Integer>perKey(new MinIntegerFn()).named("Min.PerKey");
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes an input {@code PCollection<Long>} and returns a {@code
-   * PCollection<Long>} whose contents is the minimum of the input {@code PCollection}'s elements,
-   * or {@code Long.MAX_VALUE} if there are no elements.
-   */
-  public static Combine.Globally<Long, Long> longsGlobally() {
-    return Combine.globally(new MinLongFn()).named("Min.Globally");
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes an input {@code PCollection<KV<K, Long>>} and returns a
-   * {@code PCollection<KV<K, Long>>} that contains an output element mapping each distinct key in
-   * the input {@code PCollection} to the minimum of the values associated with that key in the
-   * input {@code PCollection}.
-   *
-   * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
-   */
-  public static <K> Combine.PerKey<K, Long, Long> longsPerKey() {
-   return Combine.<K, Long, Long>perKey(new MinLongFn()).named("Min.PerKey");
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes an input {@code PCollection<Double>} and returns a
-   * {@code PCollection<Double>} whose contents is the minimum of the input {@code PCollection}'s
-   * elements, or {@code Double.POSITIVE_INFINITY} if there are no elements.
-   */
-  public static Combine.Globally<Double, Double> doublesGlobally() {
-    return Combine.globally(new MinDoubleFn()).named("Min.Globally");
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes an input {@code PCollection<KV<K, Double>>} and returns
-   * a {@code PCollection<KV<K, Double>>} that contains an output element mapping each distinct key
-   * in the input {@code PCollection} to the minimum of the values associated with that key in the
-   * input {@code PCollection}.
-   *
-   * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
-   */
-  public static <K> Combine.PerKey<K, Double, Double> doublesPerKey() {
-    return Combine.<K, Double, Double>perKey(new MinDoubleFn()).named("Min.PerKey");
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code
-   * PCollection<T>} whose contents is the minimum according to the natural ordering of {@code T}
-   * of the input {@code PCollection}'s elements, or {@code null} if there are no elements.
-   */
-  public static <T extends Comparable<? super T>>
-  Combine.Globally<T, T> globally() {
-    return Combine.<T, T>globally(MinFn.<T>naturalOrder()).named("Min.Globally");
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes an input {@code PCollection<KV<K, T>>} and returns a
-   * {@code PCollection<KV<K, T>>} that contains an output element mapping each distinct key in the
-   * input {@code PCollection} to the minimum according to the natural ordering of {@code T} of the
-   * values associated with that key in the input {@code PCollection}.
-   *
-   * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
-   */
-  public static <K, T extends Comparable<? super T>>
-  Combine.PerKey<K, T, T> perKey() {
-    return Combine.<K, T, T>perKey(MinFn.<T>naturalOrder()).named("Min.PerKey");
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code
-   * PCollection<T>} whose contents is the minimum, according to the given {@code comparator}, of
-   * the input {@code PCollection}'s elements, or {@code null} if there are no elements.
-   */
-  public static <T, ComparatorT extends Comparator<? super T> & Serializable>
-  Combine.Globally<T, T> globally(ComparatorT comparator) {
-    return Combine.<T, T>globally(MinFn.of(comparator)).named("Min.Globally");
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes an input {@code PCollection<KV<K, T>>} and returns a
-   * {@code PCollection<KV<K, T>>} that contains one output element per key, mapping each key to
-   * the minimum (per {@code comparator}) of that key's values in the input {@code PCollection}.
-   *
-   * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
-   */
-  public static <K, T, ComparatorT extends Comparator<? super T> & Serializable>
-  Combine.PerKey<K, T, T> perKey(ComparatorT comparator) {
-    return Combine.<K, T, T>perKey(MinFn.of(comparator)).named("Min.PerKey");
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * A {@code CombineFn} that computes the minimum of a collection of elements of type {@code T}
-   * using an arbitrary {@link Comparator}, useful as an argument to {@link Combine#globally} or
-   * {@link Combine#perKey}.
-   *
-   * @param <T> the type of the values being compared
-   */
-  public static class MinFn<T> extends BinaryCombineFn<T> {
-
-    private final T identity;
-    private final Comparator<? super T> comparator;
-
-    private <ComparatorT extends Comparator<? super T> & Serializable> MinFn(
-        T identity, ComparatorT comparator) {
-      this.identity = identity;
-      this.comparator = comparator;
-    }
-
-    public static <T, ComparatorT extends Comparator<? super T> & Serializable>
-    MinFn<T> of(T identity, ComparatorT comparator) {
-      return new MinFn<T>(identity, comparator);
-    }
-
-    public static <T, ComparatorT extends Comparator<? super T> & Serializable>
-    MinFn<T> of(ComparatorT comparator) {
-      return new MinFn<T>(null, comparator);
-    }
-
-    public static <T extends Comparable<? super T>> MinFn<T> naturalOrder(T identity) {
-      return new MinFn<T>(identity, new Top.Largest<T>());
-    }
-
-    public static <T extends Comparable<? super T>> MinFn<T> naturalOrder() {
-      return new MinFn<T>(null, new Top.Largest<T>());
-    }
-
-    @Override
-    public T identity() {
-      return identity;
-    }
-
-    @Override
-    public T apply(T left, T right) {
-      return comparator.compare(left, right) <= 0 ? left : right;
-    }
-  }
-
-  /**
-   * A {@code CombineFn} that computes the minimum of a collection of {@code Integer}s, useful as an
-   * argument to {@link Combine#globally} or {@link Combine#perKey}.
-   */
-  public static class MinIntegerFn extends MinFn<Integer> implements
-      CounterProvider<Integer> {
-    public MinIntegerFn() {
-      super(Integer.MAX_VALUE, new Top.Largest<Integer>());
-    }
-
-    @Override
-    public Counter<Integer> getCounter(String name) {
-      return Counter.ints(name, AggregationKind.MIN);
-    }
-  }
-
-  /**
-   * A {@code CombineFn} that computes the minimum of a collection of {@code Long}s, useful as an
-   * argument to {@link Combine#globally} or {@link Combine#perKey}.
-   */
-  public static class MinLongFn extends MinFn<Long> implements
-      CounterProvider<Long> {
-    public MinLongFn() {
-      super(Long.MAX_VALUE, new Top.Largest<Long>());
-    }
-
-    @Override
-    public Counter<Long> getCounter(String name) {
-      return Counter.longs(name, AggregationKind.MIN);
-    }
-  }
-
-  /**
-   * A {@code CombineFn} that computes the minimum of a collection of {@code Double}s, useful as an
-   * argument to {@link Combine#globally} or {@link Combine#perKey}.
-   */
-  public static class MinDoubleFn extends MinFn<Double> implements
-      CounterProvider<Double> {
-    public MinDoubleFn() {
-      super(Double.POSITIVE_INFINITY, new Top.Largest<Double>());
-    }
-
-    @Override
-    public Counter<Double> getCounter(String name) {
-      return Counter.doubles(name, AggregationKind.MIN);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/PTransform.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/PTransform.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/PTransform.java
deleted file mode 100644
index d4496b8..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/PTransform.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.display.DisplayData.Builder;
-import com.google.cloud.dataflow.sdk.transforms.display.HasDisplayData;
-import com.google.cloud.dataflow.sdk.util.StringUtils;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.cloud.dataflow.sdk.values.TypedPValue;
-
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
-/**
- * A {@code PTransform<InputT, OutputT>} is an operation that takes an
- * {@code InputT} (some subtype of {@link PInput}) and produces an
- * {@code OutputT} (some subtype of {@link POutput}).
- *
- * <p>Common PTransforms include root PTransforms like
- * {@link com.google.cloud.dataflow.sdk.io.TextIO.Read},
- * {@link Create}, processing and
- * conversion operations like {@link ParDo},
- * {@link GroupByKey},
- * {@link com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey},
- * {@link Combine}, and {@link Count}, and outputting
- * PTransforms like
- * {@link com.google.cloud.dataflow.sdk.io.TextIO.Write}.  Users also
- * define their own application-specific composite PTransforms.
- *
- * <p>Each {@code PTransform<InputT, OutputT>} has a single
- * {@code InputT} type and a single {@code OutputT} type.  Many
- * PTransforms conceptually transform one input value to one output
- * value, and in this case {@code InputT} and {@code OutputT} are
- * typically instances of
- * {@link com.google.cloud.dataflow.sdk.values.PCollection}.
- * A root
- * PTransform conceptually has no input; in this case, conventionally
- * a {@link com.google.cloud.dataflow.sdk.values.PBegin} object
- * produced by calling {@link Pipeline#begin} is used as the input.
- * An outputting PTransform conceptually has no output; in this case,
- * conventionally {@link com.google.cloud.dataflow.sdk.values.PDone}
- * is used as its output type.  Some PTransforms conceptually have
- * multiple inputs and/or outputs; in these cases special "bundling"
- * classes like
- * {@link com.google.cloud.dataflow.sdk.values.PCollectionList},
- * {@link com.google.cloud.dataflow.sdk.values.PCollectionTuple}
- * are used
- * to combine multiple values into a single bundle for passing into or
- * returning from the PTransform.
- *
- * <p>A {@code PTransform<InputT, OutputT>} is invoked by calling
- * {@code apply()} on its {@code InputT}, returning its {@code OutputT}.
- * Calls can be chained to concisely create linear pipeline segments.
- * For example:
- *
- * <pre> {@code
- * PCollection<T1> pc1 = ...;
- * PCollection<T2> pc2 =
- *     pc1.apply(ParDo.of(new MyDoFn<T1,KV<K,V>>()))
- *        .apply(GroupByKey.<K, V>create())
- *        .apply(Combine.perKey(new MyKeyedCombineFn<K,V>()))
- *        .apply(ParDo.of(new MyDoFn2<KV<K,V>,T2>()));
- * } </pre>
- *
- * <p>PTransform operations have unique names, which are used by the
- * system when explaining what's going on during optimization and
- * execution.  Each PTransform gets a system-provided default name,
- * but it's a good practice to specify an explicit name, where
- * possible, using the {@code named()} method offered by some
- * PTransforms such as {@link ParDo}.  For example:
- *
- * <pre> {@code
- * ...
- * .apply(ParDo.named("Step1").of(new MyDoFn3()))
- * ...
- * } </pre>
- *
- * <p>Each PCollection output produced by a PTransform,
- * either directly or within a "bundling" class, automatically gets
- * its own name derived from the name of its producing PTransform.
- *
- * <p>Each PCollection output produced by a PTransform
- * also records a {@link com.google.cloud.dataflow.sdk.coders.Coder}
- * that specifies how the elements of that PCollection
- * are to be encoded as a byte string, if necessary.  The
- * PTransform may provide a default Coder for any of its outputs, for
- * instance by deriving it from the PTransform input's Coder.  If the
- * PTransform does not specify the Coder for an output PCollection,
- * the system will attempt to infer a Coder for it, based on
- * what's known at run-time about the Java type of the output's
- * elements.  The enclosing {@link Pipeline}'s
- * {@link com.google.cloud.dataflow.sdk.coders.CoderRegistry}
- * (accessible via {@link Pipeline#getCoderRegistry}) defines the
- * mapping from Java types to the default Coder to use, for a standard
- * set of Java types; users can extend this mapping for additional
- * types, via
- * {@link com.google.cloud.dataflow.sdk.coders.CoderRegistry#registerCoder}.
- * If this inference process fails, either because the Java type was
- * not known at run-time (e.g., due to Java's "erasure" of generic
- * types) or there was no default Coder registered, then the Coder
- * should be specified manually by calling
- * {@link com.google.cloud.dataflow.sdk.values.TypedPValue#setCoder}
- * on the output PCollection.  The Coder of every output
- * PCollection must be determined one way or another
- * before that output is used as an input to another PTransform, or
- * before the enclosing Pipeline is run.
- *
- * <p>A small number of PTransforms are implemented natively by the
- * Google Cloud Dataflow SDK; such PTransforms simply return an
- * output value as their apply implementation.
- * The majority of PTransforms are
- * implemented as composites of other PTransforms.  Such a PTransform
- * subclass typically just implements {@link #apply}, computing its
- * {@code OutputT} value from its {@code InputT} value.  User programs are encouraged to
- * use this mechanism to modularize their own code.  Such composite
- * abstractions get their own name, and navigating through the
- * composition hierarchy of PTransforms is supported by the monitoring
- * interface.  Examples of composite PTransforms can be found in this
- * directory and in examples.  From the caller's point of view, there
- * is no distinction between a PTransform implemented natively and one
- * implemented in terms of other PTransforms; both kinds of PTransform
- * are invoked in the same way, using {@code apply()}.
- *
- * <h3>Note on Serialization</h3>
- *
- * <p>{@code PTransform} doesn't actually support serialization, despite
- * implementing {@code Serializable}.
- *
- * <p>{@code PTransform} is marked {@code Serializable} solely
- * because it is common for an anonymous {@code DoFn}
- * instance to be created within an
- * {@code apply()} method of a composite {@code PTransform}.
- *
- * <p>Each of those {@code *Fn}s is {@code Serializable}, but
- * unfortunately its instance state will contain a reference to the
- * enclosing {@code PTransform} instance, and so attempt to serialize
- * the {@code PTransform} instance, even though the {@code *Fn}
- * instance never references anything about the enclosing
- * {@code PTransform}.
- *
- * <p>To allow such anonymous {@code *Fn}s to be written
- * conveniently, {@code PTransform} is marked as {@code Serializable},
- * and includes dummy {@code writeObject()} and {@code readObject()}
- * operations that do not save or restore any state.
- *
- * @see <a href=
- * "https://cloud.google.com/dataflow/java-sdk/applying-transforms"
- * >Applying Transformations</a>
- *
- * @param <InputT> the type of the input to this PTransform
- * @param <OutputT> the type of the output of this PTransform
- */
-public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
-    implements Serializable /* See the note above */, HasDisplayData {
-  /**
-   * Applies this {@code PTransform} on the given {@code InputT}, and returns its
-   * {@code OutputT}.
-   *
-   * <p>Composite transforms, which are defined in terms of other transforms,
-   * should return the output of one of the composed transforms.  Non-composite
-   * transforms, which do not apply any transforms internally, should return
-   * a new unbound output and register evaluators (via backend-specific
-   * registration methods).
-   *
-   * <p>The default implementation throws an exception.  A derived class must
-   * either implement apply, or else each runner must supply a custom
-   * implementation via
-   * {@link com.google.cloud.dataflow.sdk.runners.PipelineRunner#apply}.
-   */
-  public OutputT apply(InputT input) {
-    throw new IllegalArgumentException(
-        "Runner " + input.getPipeline().getRunner()
-            + " has not registered an implementation for the required primitive operation "
-            + this);
-  }
-
-  /**
-   * Called before invoking apply (which may be intercepted by the runner) to
-   * verify this transform is fully specified and applicable to the specified
-   * input.
-   *
-   * <p>By default, does nothing.
-   */
-  public void validate(InputT input) { }
-
-  /**
-   * Returns the transform name.
-   *
-   * <p>This name is provided by the transform creator and is not required to be unique.
-   */
-  public String getName() {
-    return name != null ? name : getKindString();
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  // See the note above about PTransform's fake Serializability, to
-  // understand why all of its instance state is transient.
-
-  /**
-   * The base name of this {@code PTransform}, e.g., from
-   * {@link ParDo#named(String)}, or from defaults, or {@code null} if not
-   * yet assigned.
-   */
-  protected final transient String name;
-
-  protected PTransform() {
-    this.name = null;
-  }
-
-  protected PTransform(String name) {
-    this.name = name;
-  }
-
-  @Override
-  public String toString() {
-    if (name == null) {
-      return getKindString();
-    } else {
-      return getName() + " [" + getKindString() + "]";
-    }
-  }
-
-  /**
-   * Returns the name to use by default for this {@code PTransform}
-   * (not including the names of any enclosing {@code PTransform}s).
-   *
-   * <p>By default, returns the base name of this {@code PTransform}'s class.
-   *
-   * <p>The caller is responsible for ensuring that names of applied
-   * {@code PTransform}s are unique, e.g., by adding a uniquifying
-   * suffix when needed.
-   */
-  protected String getKindString() {
-    if (getClass().isAnonymousClass()) {
-      return "AnonymousTransform";
-    } else {
-      return StringUtils.approximatePTransformName(getClass());
-    }
-  }
-
-  private void writeObject(ObjectOutputStream oos) {
-    // We don't really want to be serializing this object, but we
-    // often have serializable anonymous DoFns nested within a
-    // PTransform.
-  }
-
-  private void readObject(ObjectInputStream oos) {
-    // We don't really want to be serializing this object, but we
-    // often have serializable anonymous DoFns nested within a
-    // PTransform.
-  }
-
-  /**
-   * Returns the default {@code Coder} to use for the output of this
-   * single-output {@code PTransform}.
-   *
-   * <p>By default, always throws.
-   *
-   * @throws CannotProvideCoderException if no coder can be inferred
-   */
-  protected Coder<?> getDefaultOutputCoder() throws CannotProvideCoderException {
-    throw new CannotProvideCoderException(
-      "PTransform.getDefaultOutputCoder called.");
-  }
-
-  /**
-   * Returns the default {@code Coder} to use for the output of this
-   * single-output {@code PTransform} when applied to the given input.
-   *
-   * @throws CannotProvideCoderException if none can be inferred.
-   *
-   * <p>By default, always throws.
-   */
-  protected Coder<?> getDefaultOutputCoder(@SuppressWarnings("unused") InputT input)
-      throws CannotProvideCoderException {
-    return getDefaultOutputCoder();
-  }
-
-  /**
-   * Returns the default {@code Coder} to use for the given output of
-   * this single-output {@code PTransform} when applied to the given input.
-   *
-   * @throws CannotProvideCoderException if none can be inferred.
-   *
-   * <p>By default, always throws.
-   */
-  public <T> Coder<T> getDefaultOutputCoder(
-      InputT input, @SuppressWarnings("unused") TypedPValue<T> output)
-      throws CannotProvideCoderException {
-    @SuppressWarnings("unchecked")
-    Coder<T> defaultOutputCoder = (Coder<T>) getDefaultOutputCoder(input);
-    return defaultOutputCoder;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * <p>By default, does not register any display data. Implementors may override this method
-   * to provide their own display metadata.
-   */
-  @Override
-  public void populateDisplayData(Builder builder) {
-  }
-}