You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:45 UTC
[21/67] [partial] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/MapElements.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/MapElements.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/MapElements.java
deleted file mode 100644
index 8997050..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/MapElements.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-
-/**
- * {@code PTransform}s for mapping a simple function over the elements of a {@link PCollection}.
- */
-public class MapElements<InputT, OutputT>
-extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
-
- /**
- * For a {@code SerializableFunction<InputT, OutputT>} {@code fn} and output type descriptor,
- * returns a {@code PTransform} that takes an input {@code PCollection<InputT>} and returns
- * a {@code PCollection<OutputT>} containing {@code fn.apply(v)} for every element {@code v} in
- * the input.
- *
- * <p>Example of use in Java 8:
- * <pre>{@code
- * PCollection<Integer> wordLengths = words.apply(
- * MapElements.via((String word) -> word.length())
- * .withOutputType(new TypeDescriptor<Integer>() {}));
- * }</pre>
- *
- * <p>In Java 7, the overload {@link #via(SimpleFunction)} is more concise as the output type
- * descriptor need not be provided.
- */
- public static <InputT, OutputT> MissingOutputTypeDescriptor<InputT, OutputT>
- via(SerializableFunction<InputT, OutputT> fn) {
- return new MissingOutputTypeDescriptor<>(fn);
- }
-
- /**
- * For a {@code SimpleFunction<InputT, OutputT>} {@code fn}, returns a {@code PTransform} that
- * takes an input {@code PCollection<InputT>} and returns a {@code PCollection<OutputT>}
- * containing {@code fn.apply(v)} for every element {@code v} in the input.
- *
- * <p>This overload is intended primarily for use in Java 7. In Java 8, the overload
- * {@link #via(SerializableFunction)} supports use of lambda for greater concision.
- *
- * <p>Example of use in Java 7:
- * <pre>{@code
- * PCollection<String> words = ...;
- * PCollection<Integer> wordsPerLine = words.apply(MapElements.via(
- * new SimpleFunction<String, Integer>() {
- * public Integer apply(String word) {
- * return word.length();
- * }
- * }));
- * }</pre>
- */
- public static <InputT, OutputT> MapElements<InputT, OutputT>
- via(final SimpleFunction<InputT, OutputT> fn) {
- return new MapElements<>(fn, fn.getOutputTypeDescriptor());
- }
-
- /**
- * An intermediate builder for a {@link MapElements} transform. To complete the transform, provide
- * an output type descriptor to {@link MissingOutputTypeDescriptor#withOutputType}. See
- * {@link #via(SerializableFunction)} for a full example of use.
- */
- public static final class MissingOutputTypeDescriptor<InputT, OutputT> {
-
- private final SerializableFunction<InputT, OutputT> fn;
-
- private MissingOutputTypeDescriptor(SerializableFunction<InputT, OutputT> fn) {
- this.fn = fn;
- }
-
- public MapElements<InputT, OutputT> withOutputType(TypeDescriptor<OutputT> outputType) {
- return new MapElements<>(fn, outputType);
- }
- }
-
- ///////////////////////////////////////////////////////////////////
-
- private final SerializableFunction<InputT, OutputT> fn;
- private final transient TypeDescriptor<OutputT> outputType;
-
- private MapElements(
- SerializableFunction<InputT, OutputT> fn,
- TypeDescriptor<OutputT> outputType) {
- this.fn = fn;
- this.outputType = outputType;
- }
-
- @Override
- public PCollection<OutputT> apply(PCollection<InputT> input) {
- return input.apply(ParDo.named("Map").of(new DoFn<InputT, OutputT>() {
- @Override
- public void processElement(ProcessContext c) {
- c.output(fn.apply(c.element()));
- }
- })).setTypeDescriptorInternal(outputType);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Max.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Max.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Max.java
deleted file mode 100644
index 8678e4f..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Max.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.transforms.Combine.BinaryCombineFn;
-import com.google.cloud.dataflow.sdk.util.common.Counter;
-import com.google.cloud.dataflow.sdk.util.common.Counter.AggregationKind;
-import com.google.cloud.dataflow.sdk.util.common.CounterProvider;
-
-import java.io.Serializable;
-import java.util.Comparator;
-
-/**
- * {@code PTransform}s for computing the maximum of the elements in a {@code PCollection}, or the
- * maximum of the values associated with each key in a {@code PCollection} of {@code KV}s.
- *
- * <p>Example 1: get the maximum of a {@code PCollection} of {@code Double}s.
- * <pre> {@code
- * PCollection<Double> input = ...;
- * PCollection<Double> max = input.apply(Max.doublesGlobally());
- * } </pre>
- *
- * <p>Example 2: calculate the maximum of the {@code Integer}s
- * associated with each unique key (which is of type {@code String}).
- * <pre> {@code
- * PCollection<KV<String, Integer>> input = ...;
- * PCollection<KV<String, Integer>> maxPerKey = input
- * .apply(Max.<String>integersPerKey());
- * } </pre>
- */
-public class Max {
-
- private Max() {
- // do not instantiate
- }
-
- /**
- * Returns a {@code PTransform} that takes an input {@code PCollection<Integer>} and returns a
- * {@code PCollection<Integer>} whose contents is the maximum of the input {@code PCollection}'s
- * elements, or {@code Integer.MIN_VALUE} if there are no elements.
- */
- public static Combine.Globally<Integer, Integer> integersGlobally() {
- return Combine.globally(new MaxIntegerFn()).named("Max.Globally");
- }
-
- /**
- * Returns a {@code PTransform} that takes an input {@code PCollection<KV<K, Integer>>} and
- * returns a {@code PCollection<KV<K, Integer>>} that contains an output element mapping each
- * distinct key in the input {@code PCollection} to the maximum of the values associated with that
- * key in the input {@code PCollection}.
- *
- * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
- */
- public static <K> Combine.PerKey<K, Integer, Integer> integersPerKey() {
- return Combine.<K, Integer, Integer>perKey(new MaxIntegerFn()).named("Max.PerKey");
- }
-
- /**
- * Returns a {@code PTransform} that takes an input {@code PCollection<Long>} and returns a {@code
- * PCollection<Long>} whose contents is the maximum of the input {@code PCollection}'s elements,
- * or {@code Long.MIN_VALUE} if there are no elements.
- */
- public static Combine.Globally<Long, Long> longsGlobally() {
- return Combine.globally(new MaxLongFn()).named("Max.Globally");
- }
-
- /**
- * Returns a {@code PTransform} that takes an input {@code PCollection<KV<K, Long>>} and returns a
- * {@code PCollection<KV<K, Long>>} that contains an output element mapping each distinct key in
- * the input {@code PCollection} to the maximum of the values associated with that key in the
- * input {@code PCollection}.
- *
- * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
- */
- public static <K> Combine.PerKey<K, Long, Long> longsPerKey() {
- return Combine.<K, Long, Long>perKey(new MaxLongFn()).named("Max.PerKey");
- }
-
- /**
- * Returns a {@code PTransform} that takes an input {@code PCollection<Double>} and returns a
- * {@code PCollection<Double>} whose contents is the maximum of the input {@code PCollection}'s
- * elements, or {@code Double.NEGATIVE_INFINITY} if there are no elements.
- */
- public static Combine.Globally<Double, Double> doublesGlobally() {
- return Combine.globally(new MaxDoubleFn()).named("Max.Globally");
- }
-
- /**
- * Returns a {@code PTransform} that takes an input {@code PCollection<KV<K, Double>>} and returns
- * a {@code PCollection<KV<K, Double>>} that contains an output element mapping each distinct key
- * in the input {@code PCollection} to the maximum of the values associated with that key in the
- * input {@code PCollection}.
- *
- * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
- */
- public static <K> Combine.PerKey<K, Double, Double> doublesPerKey() {
- return Combine.<K, Double, Double>perKey(new MaxDoubleFn()).named("Max.PerKey");
- }
-
- /**
- * Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code
- * PCollection<T>} whose contents is the maximum according to the natural ordering of {@code T}
- * of the input {@code PCollection}'s elements, or {@code null} if there are no elements.
- */
- public static <T extends Comparable<? super T>>
- Combine.Globally<T, T> globally() {
- return Combine.<T, T>globally(MaxFn.<T>naturalOrder()).named("Max.Globally");
- }
-
- /**
- * Returns a {@code PTransform} that takes an input {@code PCollection<KV<K, T>>} and returns a
- * {@code PCollection<KV<K, T>>} that contains an output element mapping each distinct key in the
- * input {@code PCollection} to the maximum according to the natural ordering of {@code T} of the
- * values associated with that key in the input {@code PCollection}.
- *
- * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
- */
- public static <K, T extends Comparable<? super T>>
- Combine.PerKey<K, T, T> perKey() {
- return Combine.<K, T, T>perKey(MaxFn.<T>naturalOrder()).named("Max.PerKey");
- }
-
- /**
- * Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code
- * PCollection<T>} whose contents is the maximum of the input {@code PCollection}'s elements, or
- * {@code null} if there are no elements.
- */
- public static <T, ComparatorT extends Comparator<? super T> & Serializable>
- Combine.Globally<T, T> globally(ComparatorT comparator) {
- return Combine.<T, T>globally(MaxFn.of(comparator)).named("Max.Globally");
- }
-
- /**
- * Returns a {@code PTransform} that takes an input {@code PCollection<KV<K, T>>} and returns a
- * {@code PCollection<KV<K, T>>} that contains one output element per key, mapping each key
- * to the maximum of the values associated with that key in the input {@code PCollection}.
- *
- * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
- */
- public static <K, T, ComparatorT extends Comparator<? super T> & Serializable>
- Combine.PerKey<K, T, T> perKey(ComparatorT comparator) {
- return Combine.<K, T, T>perKey(MaxFn.of(comparator)).named("Max.PerKey");
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * A {@code CombineFn} that computes the maximum of a collection of elements of type {@code T}
- * using an arbitrary {@link Comparator}, useful as an argument to {@link Combine#globally} or
- * {@link Combine#perKey}.
- *
- * @param <T> the type of the values being compared
- */
- public static class MaxFn<T> extends BinaryCombineFn<T> {
-
- private final T identity;
- private final Comparator<? super T> comparator;
-
- private <ComparatorT extends Comparator<? super T> & Serializable> MaxFn(
- T identity, ComparatorT comparator) {
- this.identity = identity;
- this.comparator = comparator;
- }
-
- public static <T, ComparatorT extends Comparator<? super T> & Serializable>
- MaxFn<T> of(T identity, ComparatorT comparator) {
- return new MaxFn<T>(identity, comparator);
- }
-
- public static <T, ComparatorT extends Comparator<? super T> & Serializable>
- MaxFn<T> of(ComparatorT comparator) {
- return new MaxFn<T>(null, comparator);
- }
-
- public static <T extends Comparable<? super T>> MaxFn<T> naturalOrder(T identity) {
- return new MaxFn<T>(identity, new Top.Largest<T>());
- }
-
- public static <T extends Comparable<? super T>> MaxFn<T> naturalOrder() {
- return new MaxFn<T>(null, new Top.Largest<T>());
- }
-
- @Override
- public T identity() {
- return identity;
- }
-
- @Override
- public T apply(T left, T right) {
- return comparator.compare(left, right) >= 0 ? left : right;
- }
- }
-
- /**
- * A {@code CombineFn} that computes the maximum of a collection of {@code Integer}s, useful as an
- * argument to {@link Combine#globally} or {@link Combine#perKey}.
- */
- public static class MaxIntegerFn extends MaxFn<Integer> implements
- CounterProvider<Integer> {
- public MaxIntegerFn() {
- super(Integer.MIN_VALUE, new Top.Largest<Integer>());
- }
-
- @Override
- public Counter<Integer> getCounter(String name) {
- return Counter.ints(name, AggregationKind.MAX);
- }
- }
-
- /**
- * A {@code CombineFn} that computes the maximum of a collection of {@code Long}s, useful as an
- * argument to {@link Combine#globally} or {@link Combine#perKey}.
- */
- public static class MaxLongFn extends MaxFn<Long> implements
- CounterProvider<Long> {
- public MaxLongFn() {
- super(Long.MIN_VALUE, new Top.Largest<Long>());
- }
-
- @Override
- public Counter<Long> getCounter(String name) {
- return Counter.longs(name, AggregationKind.MAX);
- }
- }
-
- /**
- * A {@code CombineFn} that computes the maximum of a collection of {@code Double}s, useful as an
- * argument to {@link Combine#globally} or {@link Combine#perKey}.
- */
- public static class MaxDoubleFn extends MaxFn<Double> implements
- CounterProvider<Double> {
- public MaxDoubleFn() {
- super(Double.NEGATIVE_INFINITY, new Top.Largest<Double>());
- }
-
- @Override
- public Counter<Double> getCounter(String name) {
- return Counter.doubles(name, AggregationKind.MAX);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Mean.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Mean.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Mean.java
deleted file mode 100644
index 7dccfb6..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Mean.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
-import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
-import com.google.cloud.dataflow.sdk.coders.DoubleCoder;
-import com.google.cloud.dataflow.sdk.transforms.Combine.AccumulatingCombineFn.Accumulator;
-import com.google.common.base.MoreObjects;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Objects;
-
-/**
- * {@code PTransform}s for computing the arithmetic mean
- * (a.k.a. average) of the elements in a {@code PCollection}, or the
- * mean of the values associated with each key in a
- * {@code PCollection} of {@code KV}s.
- *
- * <p>Example 1: get the mean of a {@code PCollection} of {@code Long}s.
- * <pre> {@code
- * PCollection<Long> input = ...;
- * PCollection<Double> mean = input.apply(Mean.<Long>globally());
- * } </pre>
- *
- * <p>Example 2: calculate the mean of the {@code Integer}s
- * associated with each unique key (which is of type {@code String}).
- * <pre> {@code
- * PCollection<KV<String, Integer>> input = ...;
- * PCollection<KV<String, Double>> meanPerKey =
- * input.apply(Mean.<String, Integer>perKey());
- * } </pre>
- */
-public class Mean {
-
- private Mean() { } // Namespace only
-
- /**
- * Returns a {@code PTransform} that takes an input
- * {@code PCollection<NumT>} and returns a
- * {@code PCollection<Double>} whose contents is the mean of the
- * input {@code PCollection}'s elements, or
- * {@code Double.NaN} if there are no elements.
- *
- * @param <NumT> the type of the {@code Number}s being combined
- */
- public static <NumT extends Number> Combine.Globally<NumT, Double> globally() {
- return Combine.<NumT, Double>globally(new MeanFn<>()).named("Mean.Globally");
- }
-
- /**
- * Returns a {@code PTransform} that takes an input
- * {@code PCollection<KV<K, NumT>>} and returns a
- * {@code PCollection<KV<K, Double>>} that contains an output
- * element mapping each distinct key in the input
- * {@code PCollection} to the mean of the values associated with
- * that key in the input {@code PCollection}.
- *
- * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
- *
- * @param <K> the type of the keys
- * @param <NumT> the type of the {@code Number}s being combined
- */
- public static <K, NumT extends Number> Combine.PerKey<K, NumT, Double> perKey() {
- return Combine.<K, NumT, Double>perKey(new MeanFn<>()).named("Mean.PerKey");
- }
-
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * A {@code Combine.CombineFn} that computes the arithmetic mean
- * (a.k.a. average) of an {@code Iterable} of numbers of type
- * {@code NumT}, useful as an argument to {@link Combine#globally} or
- * {@link Combine#perKey}.
- *
- * <p>Returns {@code Double.NaN} if combining zero elements.
- *
- * @param <NumT> the type of the {@code Number}s being combined
- */
- static class MeanFn<NumT extends Number>
- extends Combine.AccumulatingCombineFn<NumT, CountSum<NumT>, Double> {
- /**
- * Constructs a combining function that computes the mean over
- * a collection of values of type {@code NumT}.
- */
- public MeanFn() {}
-
- @Override
- public CountSum<NumT> createAccumulator() {
- return new CountSum<>();
- }
-
- @Override
- public Coder<CountSum<NumT>> getAccumulatorCoder(
- CoderRegistry registry, Coder<NumT> inputCoder) {
- return new CountSumCoder<>();
- }
- }
-
- /**
- * Accumulator class for {@link MeanFn}.
- */
- static class CountSum<NumT extends Number>
- implements Accumulator<NumT, CountSum<NumT>, Double> {
-
- long count = 0;
- double sum = 0.0;
-
- public CountSum() {
- this(0, 0);
- }
-
- public CountSum(long count, double sum) {
- this.count = count;
- this.sum = sum;
- }
-
- @Override
- public void addInput(NumT element) {
- count++;
- sum += element.doubleValue();
- }
-
- @Override
- public void mergeAccumulator(CountSum<NumT> accumulator) {
- count += accumulator.count;
- sum += accumulator.sum;
- }
-
- @Override
- public Double extractOutput() {
- return count == 0 ? Double.NaN : sum / count;
- }
-
- @Override
- public boolean equals(Object other) {
- if (!(other instanceof CountSum)) {
- return false;
- }
- @SuppressWarnings("unchecked")
- CountSum<?> otherCountSum = (CountSum<?>) other;
- return (count == otherCountSum.count)
- && (sum == otherCountSum.sum);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(count, sum);
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("count", count)
- .add("sum", sum)
- .toString();
- }
- }
-
- static class CountSumCoder<NumT extends Number>
- extends AtomicCoder<CountSum<NumT>> {
- private static final Coder<Long> LONG_CODER = BigEndianLongCoder.of();
- private static final Coder<Double> DOUBLE_CODER = DoubleCoder.of();
-
- @Override
- public void encode(CountSum<NumT> value, OutputStream outStream, Coder.Context context)
- throws CoderException, IOException {
- Coder.Context nestedContext = context.nested();
- LONG_CODER.encode(value.count, outStream, nestedContext);
- DOUBLE_CODER.encode(value.sum, outStream, nestedContext);
- }
-
- @Override
- public CountSum<NumT> decode(InputStream inStream, Coder.Context context)
- throws CoderException, IOException {
- Coder.Context nestedContext = context.nested();
- return new CountSum<>(
- LONG_CODER.decode(inStream, nestedContext),
- DOUBLE_CODER.decode(inStream, nestedContext));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Min.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Min.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Min.java
deleted file mode 100644
index 47ab3a0..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Min.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.transforms.Combine.BinaryCombineFn;
-import com.google.cloud.dataflow.sdk.util.common.Counter;
-import com.google.cloud.dataflow.sdk.util.common.Counter.AggregationKind;
-import com.google.cloud.dataflow.sdk.util.common.CounterProvider;
-
-import java.io.Serializable;
-import java.util.Comparator;
-
-/**
- * {@code PTransform}s for computing the minimum of the elements in a {@code PCollection}, or the
- * minimum of the values associated with each key in a {@code PCollection} of {@code KV}s.
- *
- * <p>Example 1: get the minimum of a {@code PCollection} of {@code Double}s.
- * <pre> {@code
- * PCollection<Double> input = ...;
- * PCollection<Double> min = input.apply(Min.doublesGlobally());
- * } </pre>
- *
- * <p>Example 2: calculate the minimum of the {@code Integer}s
- * associated with each unique key (which is of type {@code String}).
- * <pre> {@code
- * PCollection<KV<String, Integer>> input = ...;
- * PCollection<KV<String, Integer>> minPerKey = input
- * .apply(Min.<String>integersPerKey());
- * } </pre>
- */
-public class Min {
-
- private Min() {
- // do not instantiate
- }
-
- /**
- * Returns a {@code PTransform} that takes an input {@code PCollection<Integer>} and returns a
- * {@code PCollection<Integer>} whose contents is a single value that is the minimum of the input
- * {@code PCollection}'s elements, or {@code Integer.MAX_VALUE} if there are no elements.
- */
- public static Combine.Globally<Integer, Integer> integersGlobally() {
- return Combine.globally(new MinIntegerFn()).named("Min.Globally");
- }
-
- /**
- * Returns a {@code PTransform} that takes an input {@code PCollection<KV<K, Integer>>} and
- * returns a {@code PCollection<KV<K, Integer>>} that contains an output element mapping each
- * distinct key in the input {@code PCollection} to the minimum of the values associated with that
- * key in the input {@code PCollection}.
- *
- * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
- */
- public static <K> Combine.PerKey<K, Integer, Integer> integersPerKey() {
- return Combine.<K, Integer, Integer>perKey(new MinIntegerFn()).named("Min.PerKey");
- }
-
- /**
- * Returns a {@code PTransform} that takes an input {@code PCollection<Long>} and returns a {@code
- * PCollection<Long>} whose contents is the minimum of the input {@code PCollection}'s elements,
- * or {@code Long.MAX_VALUE} if there are no elements.
- */
- public static Combine.Globally<Long, Long> longsGlobally() {
- return Combine.globally(new MinLongFn()).named("Min.Globally");
- }
-
- /**
- * Returns a {@code PTransform} that takes an input {@code PCollection<KV<K, Long>>} and returns a
- * {@code PCollection<KV<K, Long>>} that contains an output element mapping each distinct key in
- * the input {@code PCollection} to the minimum of the values associated with that key in the
- * input {@code PCollection}.
- *
- * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
- */
- public static <K> Combine.PerKey<K, Long, Long> longsPerKey() {
- return Combine.<K, Long, Long>perKey(new MinLongFn()).named("Min.PerKey");
- }
-
- /**
- * Returns a {@code PTransform} that takes an input {@code PCollection<Double>} and returns a
- * {@code PCollection<Double>} whose contents is the minimum of the input {@code PCollection}'s
- * elements, or {@code Double.POSITIVE_INFINITY} if there are no elements.
- */
- public static Combine.Globally<Double, Double> doublesGlobally() {
- return Combine.globally(new MinDoubleFn()).named("Min.Globally");
- }
-
- /**
- * Returns a {@code PTransform} that takes an input {@code PCollection<KV<K, Double>>} and returns
- * a {@code PCollection<KV<K, Double>>} that contains an output element mapping each distinct key
- * in the input {@code PCollection} to the minimum of the values associated with that key in the
- * input {@code PCollection}.
- *
- * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
- */
- public static <K> Combine.PerKey<K, Double, Double> doublesPerKey() {
- return Combine.<K, Double, Double>perKey(new MinDoubleFn()).named("Min.PerKey");
- }
-
- /**
- * Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code
- * PCollection<T>} whose contents is the minimum according to the natural ordering of {@code T}
- * of the input {@code PCollection}'s elements, or {@code null} if there are no elements.
- */
- public static <T extends Comparable<? super T>>
- Combine.Globally<T, T> globally() {
- return Combine.<T, T>globally(MinFn.<T>naturalOrder()).named("Min.Globally");
- }
-
- /**
- * Returns a {@code PTransform} that takes an input {@code PCollection<KV<K, T>>} and returns a
- * {@code PCollection<KV<K, T>>} that contains an output element mapping each distinct key in the
- * input {@code PCollection} to the minimum according to the natural ordering of {@code T} of the
- * values associated with that key in the input {@code PCollection}.
- *
- * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
- */
- public static <K, T extends Comparable<? super T>>
- Combine.PerKey<K, T, T> perKey() {
- return Combine.<K, T, T>perKey(MinFn.<T>naturalOrder()).named("Min.PerKey");
- }
-
- /**
- * Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code
- * PCollection<T>} whose contents is the minimum of the input {@code PCollection}'s elements, or
- * {@code null} if there are no elements.
- */
- public static <T, ComparatorT extends Comparator<? super T> & Serializable>
- Combine.Globally<T, T> globally(ComparatorT comparator) {
- return Combine.<T, T>globally(MinFn.of(comparator)).named("Min.Globally");
- }
-
- /**
- * Returns a {@code PTransform} that takes an input {@code PCollection<KV<K, T>>} and returns a
- * {@code PCollection<KV<K, T>>} that contains one output element per key, mapping each key
- * to the minimum of the values associated with that key in the input {@code PCollection}.
- *
- * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
- */
- public static <K, T, ComparatorT extends Comparator<? super T> & Serializable>
- Combine.PerKey<K, T, T> perKey(ComparatorT comparator) {
- return Combine.<K, T, T>perKey(MinFn.of(comparator)).named("Min.PerKey");
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * A {@code CombineFn} that computes the minimum of a collection of elements of type {@code T}
- * using an arbitrary {@link Comparator}, useful as an argument to {@link Combine#globally} or
- * {@link Combine#perKey}.
- *
- * @param <T> the type of the values being compared
- */
- public static class MinFn<T> extends BinaryCombineFn<T> {
-
- private final T identity;
- private final Comparator<? super T> comparator;
-
- private <ComparatorT extends Comparator<? super T> & Serializable> MinFn(
- T identity, ComparatorT comparator) {
- this.identity = identity;
- this.comparator = comparator;
- }
-
- public static <T, ComparatorT extends Comparator<? super T> & Serializable>
- MinFn<T> of(T identity, ComparatorT comparator) {
- return new MinFn<T>(identity, comparator);
- }
-
- public static <T, ComparatorT extends Comparator<? super T> & Serializable>
- MinFn<T> of(ComparatorT comparator) {
- return new MinFn<T>(null, comparator);
- }
-
- public static <T extends Comparable<? super T>> MinFn<T> naturalOrder(T identity) {
- return new MinFn<T>(identity, new Top.Largest<T>());
- }
-
- public static <T extends Comparable<? super T>> MinFn<T> naturalOrder() {
- return new MinFn<T>(null, new Top.Largest<T>());
- }
-
- @Override
- public T identity() {
- return identity;
- }
-
- @Override
- public T apply(T left, T right) {
- return comparator.compare(left, right) <= 0 ? left : right;
- }
- }
-
- /**
- * A {@code CombineFn} that computes the minimum of a collection of {@code Integer}s, useful as an
- * argument to {@link Combine#globally} or {@link Combine#perKey}.
- */
- public static class MinIntegerFn extends MinFn<Integer> implements
- CounterProvider<Integer> {
- public MinIntegerFn() {
- super(Integer.MAX_VALUE, new Top.Largest<Integer>());
- }
-
- @Override
- public Counter<Integer> getCounter(String name) {
- return Counter.ints(name, AggregationKind.MIN);
- }
- }
-
- /**
- * A {@code CombineFn} that computes the minimum of a collection of {@code Long}s, useful as an
- * argument to {@link Combine#globally} or {@link Combine#perKey}.
- */
- public static class MinLongFn extends MinFn<Long> implements
- CounterProvider<Long> {
- public MinLongFn() {
- super(Long.MAX_VALUE, new Top.Largest<Long>());
- }
-
- @Override
- public Counter<Long> getCounter(String name) {
- return Counter.longs(name, AggregationKind.MIN);
- }
- }
-
- /**
- * A {@code CombineFn} that computes the minimum of a collection of {@code Double}s, useful as an
- * argument to {@link Combine#globally} or {@link Combine#perKey}.
- */
- public static class MinDoubleFn extends MinFn<Double> implements
- CounterProvider<Double> {
- public MinDoubleFn() {
- super(Double.POSITIVE_INFINITY, new Top.Largest<Double>());
- }
-
- @Override
- public Counter<Double> getCounter(String name) {
- return Counter.doubles(name, AggregationKind.MIN);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/PTransform.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/PTransform.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/PTransform.java
deleted file mode 100644
index d4496b8..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/PTransform.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.display.DisplayData.Builder;
-import com.google.cloud.dataflow.sdk.transforms.display.HasDisplayData;
-import com.google.cloud.dataflow.sdk.util.StringUtils;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.cloud.dataflow.sdk.values.TypedPValue;
-
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
-/**
- * A {@code PTransform<InputT, OutputT>} is an operation that takes an
- * {@code InputT} (some subtype of {@link PInput}) and produces an
- * {@code OutputT} (some subtype of {@link POutput}).
- *
- * <p>Common PTransforms include root PTransforms like
- * {@link com.google.cloud.dataflow.sdk.io.TextIO.Read},
- * {@link Create}, processing and
- * conversion operations like {@link ParDo},
- * {@link GroupByKey},
- * {@link com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey},
- * {@link Combine}, and {@link Count}, and outputting
- * PTransforms like
- * {@link com.google.cloud.dataflow.sdk.io.TextIO.Write}. Users also
- * define their own application-specific composite PTransforms.
- *
- * <p>Each {@code PTransform<InputT, OutputT>} has a single
- * {@code InputT} type and a single {@code OutputT} type. Many
- * PTransforms conceptually transform one input value to one output
- * value, and in this case {@code InputT} and {@code OutputT} are
- * typically instances of
- * {@link com.google.cloud.dataflow.sdk.values.PCollection}.
- * A root
- * PTransform conceptually has no input; in this case, conventionally
- * a {@link com.google.cloud.dataflow.sdk.values.PBegin} object
- * produced by calling {@link Pipeline#begin} is used as the input.
- * An outputting PTransform conceptually has no output; in this case,
- * conventionally {@link com.google.cloud.dataflow.sdk.values.PDone}
- * is used as its output type. Some PTransforms conceptually have
- * multiple inputs and/or outputs; in these cases special "bundling"
- * classes like
- * {@link com.google.cloud.dataflow.sdk.values.PCollectionList},
- * {@link com.google.cloud.dataflow.sdk.values.PCollectionTuple}
- * are used
- * to combine multiple values into a single bundle for passing into or
- * returning from the PTransform.
- *
- * <p>A {@code PTransform<InputT, OutputT>} is invoked by calling
- * {@code apply()} on its {@code InputT}, returning its {@code OutputT}.
- * Calls can be chained to concisely create linear pipeline segments.
- * For example:
- *
- * <pre> {@code
- * PCollection<T1> pc1 = ...;
- * PCollection<T2> pc2 =
- * pc1.apply(ParDo.of(new MyDoFn<T1,KV<K,V>>()))
- * .apply(GroupByKey.<K, V>create())
- * .apply(Combine.perKey(new MyKeyedCombineFn<K,V>()))
- * .apply(ParDo.of(new MyDoFn2<KV<K,V>,T2>()));
- * } </pre>
- *
- * <p>PTransform operations have unique names, which are used by the
- * system when explaining what's going on during optimization and
- * execution. Each PTransform gets a system-provided default name,
- * but it's a good practice to specify an explicit name, where
- * possible, using the {@code named()} method offered by some
- * PTransforms such as {@link ParDo}. For example:
- *
- * <pre> {@code
- * ...
- * .apply(ParDo.named("Step1").of(new MyDoFn3()))
- * ...
- * } </pre>
- *
- * <p>Each PCollection output produced by a PTransform,
- * either directly or within a "bundling" class, automatically gets
- * its own name derived from the name of its producing PTransform.
- *
- * <p>Each PCollection output produced by a PTransform
- * also records a {@link com.google.cloud.dataflow.sdk.coders.Coder}
- * that specifies how the elements of that PCollection
- * are to be encoded as a byte string, if necessary. The
- * PTransform may provide a default Coder for any of its outputs, for
- * instance by deriving it from the PTransform input's Coder. If the
- * PTransform does not specify the Coder for an output PCollection,
- * the system will attempt to infer a Coder for it, based on
- * what's known at run-time about the Java type of the output's
- * elements. The enclosing {@link Pipeline}'s
- * {@link com.google.cloud.dataflow.sdk.coders.CoderRegistry}
- * (accessible via {@link Pipeline#getCoderRegistry}) defines the
- * mapping from Java types to the default Coder to use, for a standard
- * set of Java types; users can extend this mapping for additional
- * types, via
- * {@link com.google.cloud.dataflow.sdk.coders.CoderRegistry#registerCoder}.
- * If this inference process fails, either because the Java type was
- * not known at run-time (e.g., due to Java's "erasure" of generic
- * types) or there was no default Coder registered, then the Coder
- * should be specified manually by calling
- * {@link com.google.cloud.dataflow.sdk.values.TypedPValue#setCoder}
- * on the output PCollection. The Coder of every output
- * PCollection must be determined one way or another
- * before that output is used as an input to another PTransform, or
- * before the enclosing Pipeline is run.
- *
- * <p>A small number of PTransforms are implemented natively by the
- * Google Cloud Dataflow SDK; such PTransforms simply return an
- * output value as their apply implementation.
- * The majority of PTransforms are
- * implemented as composites of other PTransforms. Such a PTransform
- * subclass typically just implements {@link #apply}, computing its
- * {@code OutputT} value from its {@code InputT} value. User programs are encouraged to
- * use this mechanism to modularize their own code. Such composite
- * abstractions get their own name, and navigating through the
- * composition hierarchy of PTransforms is supported by the monitoring
- * interface. Examples of composite PTransforms can be found in this
- * directory and in examples. From the caller's point of view, there
- * is no distinction between a PTransform implemented natively and one
- * implemented in terms of other PTransforms; both kinds of PTransform
- * are invoked in the same way, using {@code apply()}.
- *
- * <h3>Note on Serialization</h3>
- *
- * <p>{@code PTransform} doesn't actually support serialization, despite
- * implementing {@code Serializable}.
- *
- * <p>{@code PTransform} is marked {@code Serializable} solely
- * because it is common for an anonymous {@code DoFn}
- * instance to be created within an
- * {@code apply()} method of a composite {@code PTransform}.
- *
- * <p>Each of those {@code *Fn}s is {@code Serializable}, but
- * unfortunately its instance state will contain a reference to the
- * enclosing {@code PTransform} instance, and so attempt to serialize
- * the {@code PTransform} instance, even though the {@code *Fn}
- * instance never references anything about the enclosing
- * {@code PTransform}.
- *
- * <p>To allow such anonymous {@code *Fn}s to be written
- * conveniently, {@code PTransform} is marked as {@code Serializable},
- * and includes dummy {@code writeObject()} and {@code readObject()}
- * operations that do not save or restore any state.
- *
- * @see <a href=
- * "https://cloud.google.com/dataflow/java-sdk/applying-transforms"
- * >Applying Transformations</a>
- *
- * @param <InputT> the type of the input to this PTransform
- * @param <OutputT> the type of the output of this PTransform
- */
-public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
-    implements Serializable /* See the note above */, HasDisplayData {
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  // This class is Serializable in name only: writeObject() and readObject()
-  // below neither save nor restore anything, which is why all of its
-  // instance state is declared transient.
-
-  /**
-   * The base name of this {@code PTransform}, e.g., from
-   * {@link ParDo#named(String)}, or {@code null} when the transform was
-   * constructed without a name.
-   */
-  protected final transient String name;
-
-  /** Creates a transform with no explicit name, leaving {@link #name} {@code null}. */
-  protected PTransform() {
-    this(null);
-  }
-
-  /**
-   * Creates a transform with the given base name.
-   *
-   * @param name the base name to store in {@link #name}
-   */
-  protected PTransform(String name) {
-    this.name = name;
-  }
-
-  /**
-   * Applies this {@code PTransform} on the given {@code InputT}, and returns its
-   * {@code OutputT}.
-   *
-   * <p>Composite transforms, which are defined in terms of other transforms,
-   * should return the output of one of the composed transforms. Non-composite
-   * transforms, which do not apply any transforms internally, should return
-   * a new unbound output and register evaluators (via backend-specific
-   * registration methods).
-   *
-   * <p>The default implementation throws an exception. A derived class must
-   * either implement apply, or else each runner must supply a custom
-   * implementation via
-   * {@link com.google.cloud.dataflow.sdk.runners.PipelineRunner#apply}.
-   *
-   * @throws IllegalArgumentException always, in this default implementation; the
-   *     message names the runner of the input's pipeline and this transform
-   */
-  public OutputT apply(InputT input) {
-    String message =
-        "Runner " + input.getPipeline().getRunner()
-            + " has not registered an implementation for the required primitive operation "
-            + this;
-    throw new IllegalArgumentException(message);
-  }
-
-  /**
-   * Called before invoking apply (which may be intercepted by the runner) to
-   * verify this transform is fully specified and applicable to the specified
-   * input.
-   *
-   * <p>By default, does nothing.
-   */
-  public void validate(InputT input) {
-    // Intentionally empty: subclasses override to add their own checks.
-  }
-
-  /**
-   * Returns the transform name.
-   *
-   * <p>This is the explicit {@link #name} when one was supplied, and
-   * {@link #getKindString()} otherwise. The name is provided by the transform
-   * creator and is not required to be unique.
-   */
-  public String getName() {
-    if (name == null) {
-      return getKindString();
-    }
-    return name;
-  }
-
-  /**
-   * Renders this transform as its kind string alone when it has no explicit
-   * name, or as {@code "<name> [<kind>]"} when it has one.
-   */
-  @Override
-  public String toString() {
-    if (name != null) {
-      return getName() + " [" + getKindString() + "]";
-    }
-    return getKindString();
-  }
-
-  /**
-   * Returns the name to use by default for this {@code PTransform}
-   * (not including the names of any enclosing {@code PTransform}s).
-   *
-   * <p>By default, returns {@code "AnonymousTransform"} for an anonymous class,
-   * and otherwise the result of {@code StringUtils.approximatePTransformName}
-   * for this {@code PTransform}'s class.
-   *
-   * <p>The caller is responsible for ensuring that names of applied
-   * {@code PTransform}s are unique, e.g., by adding a uniquifying
-   * suffix when needed.
-   */
-  protected String getKindString() {
-    return getClass().isAnonymousClass()
-        ? "AnonymousTransform"
-        : StringUtils.approximatePTransformName(getClass());
-  }
-
-  /**
-   * Serialization hook that deliberately writes nothing.
-   *
-   * <p>We don't really want to be serializing this object, but we often have
-   * serializable anonymous DoFns nested within a PTransform.
-   */
-  private void writeObject(ObjectOutputStream out) {
-    // Deliberately empty.
-  }
-
-  /**
-   * Deserialization hook that deliberately reads nothing.
-   *
-   * <p>We don't really want to be serializing this object, but we often have
-   * serializable anonymous DoFns nested within a PTransform.
-   */
-  private void readObject(ObjectInputStream in) {
-    // Deliberately empty.
-  }
-
-  /**
-   * Returns the default {@code Coder} to use for the output of this
-   * single-output {@code PTransform}.
-   *
-   * <p>By default, always throws {@code CannotProvideCoderException}.
-   *
-   * @throws CannotProvideCoderException if no coder can be inferred
-   */
-  protected Coder<?> getDefaultOutputCoder() throws CannotProvideCoderException {
-    throw new CannotProvideCoderException(
-        "PTransform.getDefaultOutputCoder called.");
-  }
-
-  /**
-   * Returns the default {@code Coder} to use for the output of this
-   * single-output {@code PTransform} when applied to the given input.
-   *
-   * <p>By default, ignores the input and delegates to
-   * {@link #getDefaultOutputCoder()}.
-   *
-   * @throws CannotProvideCoderException if none can be inferred.
-   */
-  protected Coder<?> getDefaultOutputCoder(@SuppressWarnings("unused") InputT input)
-      throws CannotProvideCoderException {
-    return getDefaultOutputCoder();
-  }
-
-  /**
-   * Returns the default {@code Coder} to use for the given output of
-   * this single-output {@code PTransform} when applied to the given input.
-   *
-   * <p>By default, ignores the output and returns the result of
-   * {@link #getDefaultOutputCoder(PInput)} for the input, cast without
-   * checking to {@code Coder<T>}.
-   *
-   * @throws CannotProvideCoderException if none can be inferred.
-   */
-  public <T> Coder<T> getDefaultOutputCoder(
-      InputT input, @SuppressWarnings("unused") TypedPValue<T> output)
-      throws CannotProvideCoderException {
-    // Unchecked: nothing here verifies that the input-based coder encodes T.
-    @SuppressWarnings("unchecked")
-    Coder<T> coder = (Coder<T>) getDefaultOutputCoder(input);
-    return coder;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * <p>By default, does not register any display data. Implementors may override this method
-   * to provide their own display metadata.
-   */
-  @Override
-  public void populateDisplayData(Builder builder) {
-    // Nothing to register by default.
-  }
-}