Posted to commits@beam.apache.org by am...@apache.org on 2016/03/15 19:48:06 UTC
[09/23] incubator-beam git commit: [BEAM-11] Spark runner directory
structure and pom setup.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
deleted file mode 100644
index 58b1924..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
+++ /dev/null
@@ -1,800 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import com.google.api.client.util.Maps;
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.io.AvroIO;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.View;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-import com.google.cloud.dataflow.sdk.util.AssignWindowsDoFn;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
-import com.google.cloud.dataflow.sdk.values.PCollectionTuple;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.collect.ImmutableMap;
-
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapreduce.AvroJob;
-import org.apache.avro.mapreduce.AvroKeyInputFormat;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaRDDLike;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import scala.Tuple2;
-
-import static com.cloudera.dataflow.spark.ShardNameBuilder.getOutputDirectory;
-import static com.cloudera.dataflow.spark.ShardNameBuilder.getOutputFilePrefix;
-import static com.cloudera.dataflow.spark.ShardNameBuilder.getOutputFileTemplate;
-import static com.cloudera.dataflow.spark.ShardNameBuilder.replaceShardCount;
-
-import com.cloudera.dataflow.hadoop.HadoopIO;
-
-/**
- * Supports translation between a Dataflow transform and Spark's operations on RDDs.
- */
-public final class TransformTranslator {
-
- private TransformTranslator() {
- }
-
- public static class FieldGetter {
- private final Map<String, Field> fields;
-
- public FieldGetter(Class<?> clazz) {
- this.fields = Maps.newHashMap();
- for (Field f : clazz.getDeclaredFields()) {
- f.setAccessible(true);
- this.fields.put(f.getName(), f);
- }
- }
-
- public <T> T get(String fieldname, Object value) {
- try {
- @SuppressWarnings("unchecked")
- T fieldValue = (T) fields.get(fieldname).get(value);
- return fieldValue;
- } catch (IllegalAccessException e) {
- throw new IllegalStateException(e);
- }
- }
- }
-
- private static <T> TransformEvaluator<Flatten.FlattenPCollectionList<T>> flattenPColl() {
- return new TransformEvaluator<Flatten.FlattenPCollectionList<T>>() {
- @SuppressWarnings("unchecked")
- @Override
- public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) {
- PCollectionList<T> pcs = context.getInput(transform);
- JavaRDD<WindowedValue<T>>[] rdds = new JavaRDD[pcs.size()];
- for (int i = 0; i < rdds.length; i++) {
- rdds[i] = (JavaRDD<WindowedValue<T>>) context.getRDD(pcs.get(i));
- }
- JavaRDD<WindowedValue<T>> rdd = context.getSparkContext().union(rdds);
- context.setOutputRDD(transform, rdd);
- }
- };
- }
-
- private static <K, V> TransformEvaluator<GroupByKey.GroupByKeyOnly<K, V>> gbk() {
- return new TransformEvaluator<GroupByKey.GroupByKeyOnly<K, V>>() {
- @Override
- public void evaluate(GroupByKey.GroupByKeyOnly<K, V> transform, EvaluationContext context) {
- @SuppressWarnings("unchecked")
- JavaRDDLike<WindowedValue<KV<K, V>>, ?> inRDD =
- (JavaRDDLike<WindowedValue<KV<K, V>>, ?>) context.getInputRDD(transform);
- @SuppressWarnings("unchecked")
- KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder();
- Coder<K> keyCoder = coder.getKeyCoder();
- Coder<V> valueCoder = coder.getValueCoder();
-
- // Use coders to convert objects in the PCollection to byte arrays, so they
- // can be transferred over the network for the shuffle.
- JavaRDDLike<WindowedValue<KV<K, Iterable<V>>>, ?> outRDD = fromPair(
- toPair(inRDD.map(WindowingHelpers.<KV<K, V>>unwindowFunction()))
- .mapToPair(CoderHelpers.toByteFunction(keyCoder, valueCoder))
- .groupByKey()
- .mapToPair(CoderHelpers.fromByteFunctionIterable(keyCoder, valueCoder)))
- // empty windows are OK here, see GroupByKey#evaluateHelper in the SDK
- .map(WindowingHelpers.<KV<K, Iterable<V>>>windowFunction());
- context.setOutputRDD(transform, outRDD);
- }
- };
- }
-
- private static final FieldGetter GROUPED_FG = new FieldGetter(Combine.GroupedValues.class);
-
- private static <K, VI, VO> TransformEvaluator<Combine.GroupedValues<K, VI, VO>> grouped() {
- return new TransformEvaluator<Combine.GroupedValues<K, VI, VO>>() {
- @Override
- public void evaluate(Combine.GroupedValues<K, VI, VO> transform, EvaluationContext context) {
- Combine.KeyedCombineFn<K, VI, ?, VO> keyed = GROUPED_FG.get("fn", transform);
- @SuppressWarnings("unchecked")
- JavaRDDLike<WindowedValue<KV<K, Iterable<VI>>>, ?> inRDD =
- (JavaRDDLike<WindowedValue<KV<K, Iterable<VI>>>, ?>) context.getInputRDD(transform);
- context.setOutputRDD(transform,
- inRDD.map(new KVFunction<>(keyed)));
- }
- };
- }
-
- private static final FieldGetter COMBINE_GLOBALLY_FG = new FieldGetter(Combine.Globally.class);
-
- private static <I, A, O> TransformEvaluator<Combine.Globally<I, O>> combineGlobally() {
- return new TransformEvaluator<Combine.Globally<I, O>>() {
-
- @Override
- public void evaluate(Combine.Globally<I, O> transform, EvaluationContext context) {
- final Combine.CombineFn<I, A, O> globally = COMBINE_GLOBALLY_FG.get("fn", transform);
-
- @SuppressWarnings("unchecked")
- JavaRDDLike<WindowedValue<I>, ?> inRdd =
- (JavaRDDLike<WindowedValue<I>, ?>) context.getInputRDD(transform);
-
- final Coder<I> iCoder = context.getInput(transform).getCoder();
- final Coder<A> aCoder;
- try {
- aCoder = globally.getAccumulatorCoder(
- context.getPipeline().getCoderRegistry(), iCoder);
- } catch (CannotProvideCoderException e) {
- throw new IllegalStateException("Could not determine coder for accumulator", e);
- }
-
- // Use coders to convert objects in the PCollection to byte arrays, so they
- // can be transferred over the network for the shuffle.
- JavaRDD<byte[]> inRddBytes = inRdd
- .map(WindowingHelpers.<I>unwindowFunction())
- .map(CoderHelpers.toByteFunction(iCoder));
-
- /*A*/ byte[] acc = inRddBytes.aggregate(
- CoderHelpers.toByteArray(globally.createAccumulator(), aCoder),
- new Function2</*A*/ byte[], /*I*/ byte[], /*A*/ byte[]>() {
- @Override
- public /*A*/ byte[] call(/*A*/ byte[] ab, /*I*/ byte[] ib) throws Exception {
- A a = CoderHelpers.fromByteArray(ab, aCoder);
- I i = CoderHelpers.fromByteArray(ib, iCoder);
- return CoderHelpers.toByteArray(globally.addInput(a, i), aCoder);
- }
- },
- new Function2</*A*/ byte[], /*A*/ byte[], /*A*/ byte[]>() {
- @Override
- public /*A*/ byte[] call(/*A*/ byte[] a1b, /*A*/ byte[] a2b) throws Exception {
- A a1 = CoderHelpers.fromByteArray(a1b, aCoder);
- A a2 = CoderHelpers.fromByteArray(a2b, aCoder);
- // don't use Guava's ImmutableList.of as values may be null
- List<A> accumulators = Collections.unmodifiableList(Arrays.asList(a1, a2));
- A merged = globally.mergeAccumulators(accumulators);
- return CoderHelpers.toByteArray(merged, aCoder);
- }
- }
- );
- O output = globally.extractOutput(CoderHelpers.fromByteArray(acc, aCoder));
-
- Coder<O> coder = context.getOutput(transform).getCoder();
- JavaRDD<byte[]> outRdd = context.getSparkContext().parallelize(
- // don't use Guava's ImmutableList.of as output may be null
- CoderHelpers.toByteArrays(Collections.singleton(output), coder));
- context.setOutputRDD(transform, outRdd.map(CoderHelpers.fromByteFunction(coder))
- .map(WindowingHelpers.<O>windowFunction()));
- }
- };
- }
-
- private static final FieldGetter COMBINE_PERKEY_FG = new FieldGetter(Combine.PerKey.class);
-
- private static <K, VI, VA, VO> TransformEvaluator<Combine.PerKey<K, VI, VO>> combinePerKey() {
- return new TransformEvaluator<Combine.PerKey<K, VI, VO>>() {
- @Override
- public void evaluate(Combine.PerKey<K, VI, VO> transform, EvaluationContext context) {
- final Combine.KeyedCombineFn<K, VI, VA, VO> keyed =
- COMBINE_PERKEY_FG.get("fn", transform);
- @SuppressWarnings("unchecked")
- JavaRDDLike<WindowedValue<KV<K, VI>>, ?> inRdd =
- (JavaRDDLike<WindowedValue<KV<K, VI>>, ?>) context.getInputRDD(transform);
-
- @SuppressWarnings("unchecked")
- KvCoder<K, VI> inputCoder = (KvCoder<K, VI>) context.getInput(transform).getCoder();
- Coder<K> keyCoder = inputCoder.getKeyCoder();
- Coder<VI> viCoder = inputCoder.getValueCoder();
- Coder<VA> vaCoder;
- try {
- vaCoder = keyed.getAccumulatorCoder(
- context.getPipeline().getCoderRegistry(), keyCoder, viCoder);
- } catch (CannotProvideCoderException e) {
- throw new IllegalStateException("Could not determine coder for accumulator", e);
- }
- Coder<KV<K, VI>> kviCoder = KvCoder.of(keyCoder, viCoder);
- Coder<KV<K, VA>> kvaCoder = KvCoder.of(keyCoder, vaCoder);
-
- // We need to duplicate K as both the key of the JavaPairRDD as well as inside the value,
- // since the functions passed to combineByKey don't receive the associated key of each
- // value, and we need to map back into methods in Combine.KeyedCombineFn, which each
- // require the key in addition to the VI's and VA's being merged/accumulated. Once Spark
- // provides a way to include keys in the arguments of combine/merge functions, we won't
- // need to duplicate the keys anymore.
-
- // Key has to be windowed in order to group by window as well
- JavaPairRDD<WindowedValue<K>, WindowedValue<KV<K, VI>>> inRddDuplicatedKeyPair =
- inRdd.mapToPair(
- new PairFunction<WindowedValue<KV<K, VI>>, WindowedValue<K>,
- WindowedValue<KV<K, VI>>>() {
- @Override
- public Tuple2<WindowedValue<K>,
- WindowedValue<KV<K, VI>>> call(WindowedValue<KV<K, VI>> kv) {
- WindowedValue<K> wk = WindowedValue.of(kv.getValue().getKey(),
- kv.getTimestamp(), kv.getWindows(), kv.getPane());
- return new Tuple2<>(wk, kv);
- }
- });
- //-- windowed coders
- final WindowedValue.FullWindowedValueCoder<K> wkCoder =
- WindowedValue.FullWindowedValueCoder.of(keyCoder,
- context.getInput(transform).getWindowingStrategy().getWindowFn().windowCoder());
- final WindowedValue.FullWindowedValueCoder<KV<K, VI>> wkviCoder =
- WindowedValue.FullWindowedValueCoder.of(kviCoder,
- context.getInput(transform).getWindowingStrategy().getWindowFn().windowCoder());
- final WindowedValue.FullWindowedValueCoder<KV<K, VA>> wkvaCoder =
- WindowedValue.FullWindowedValueCoder.of(kvaCoder,
- context.getInput(transform).getWindowingStrategy().getWindowFn().windowCoder());
-
- // Use coders to convert objects in the PCollection to byte arrays, so they
- // can be transferred over the network for the shuffle.
- JavaPairRDD<ByteArray, byte[]> inRddDuplicatedKeyPairBytes = inRddDuplicatedKeyPair
- .mapToPair(CoderHelpers.toByteFunction(wkCoder, wkviCoder));
-
- // The output of combineByKey will be "VA" (accumulator) types rather than "VO" (final
- // output types) since Combine.CombineFn only provides ways to merge VAs, and no way
- // to merge VOs.
- JavaPairRDD</*K*/ ByteArray, /*KV<K, VA>*/ byte[]> accumulatedBytes =
- inRddDuplicatedKeyPairBytes.combineByKey(
- new Function</*KV<K, VI>*/ byte[], /*KV<K, VA>*/ byte[]>() {
- @Override
- public /*KV<K, VA>*/ byte[] call(/*KV<K, VI>*/ byte[] input) {
- WindowedValue<KV<K, VI>> wkvi = CoderHelpers.fromByteArray(input, wkviCoder);
- VA va = keyed.createAccumulator(wkvi.getValue().getKey());
- va = keyed.addInput(wkvi.getValue().getKey(), va, wkvi.getValue().getValue());
- WindowedValue<KV<K, VA>> wkva =
- WindowedValue.of(KV.of(wkvi.getValue().getKey(), va), wkvi.getTimestamp(),
- wkvi.getWindows(), wkvi.getPane());
- return CoderHelpers.toByteArray(wkva, wkvaCoder);
- }
- },
- new Function2</*KV<K, VA>*/ byte[], /*KV<K, VI>*/ byte[], /*KV<K, VA>*/ byte[]>() {
- @Override
- public /*KV<K, VA>*/ byte[] call(/*KV<K, VA>*/ byte[] acc,
- /*KV<K, VI>*/ byte[] input) {
- WindowedValue<KV<K, VA>> wkva = CoderHelpers.fromByteArray(acc, wkvaCoder);
- WindowedValue<KV<K, VI>> wkvi = CoderHelpers.fromByteArray(input, wkviCoder);
- VA va = keyed.addInput(wkva.getValue().getKey(), wkva.getValue().getValue(),
- wkvi.getValue().getValue());
- wkva = WindowedValue.of(KV.of(wkva.getValue().getKey(), va), wkva.getTimestamp(),
- wkva.getWindows(), wkva.getPane());
- return CoderHelpers.toByteArray(wkva, wkvaCoder);
- }
- },
- new Function2</*KV<K, VA>*/ byte[], /*KV<K, VA>*/ byte[], /*KV<K, VA>*/ byte[]>() {
- @Override
- public /*KV<K, VA>*/ byte[] call(/*KV<K, VA>*/ byte[] acc1,
- /*KV<K, VA>*/ byte[] acc2) {
- WindowedValue<KV<K, VA>> wkva1 = CoderHelpers.fromByteArray(acc1, wkvaCoder);
- WindowedValue<KV<K, VA>> wkva2 = CoderHelpers.fromByteArray(acc2, wkvaCoder);
- VA va = keyed.mergeAccumulators(wkva1.getValue().getKey(),
- // don't use Guava's ImmutableList.of as values may be null
- Collections.unmodifiableList(Arrays.asList(wkva1.getValue().getValue(),
- wkva2.getValue().getValue())));
- WindowedValue<KV<K, VA>> wkva = WindowedValue.of(KV.of(wkva1.getValue().getKey(),
- va), wkva1.getTimestamp(), wkva1.getWindows(), wkva1.getPane());
- return CoderHelpers.toByteArray(wkva, wkvaCoder);
- }
- });
-
- JavaPairRDD<WindowedValue<K>, WindowedValue<VO>> extracted = accumulatedBytes
- .mapToPair(CoderHelpers.fromByteFunction(wkCoder, wkvaCoder))
- .mapValues(
- new Function<WindowedValue<KV<K, VA>>, WindowedValue<VO>>() {
- @Override
- public WindowedValue<VO> call(WindowedValue<KV<K, VA>> acc) {
- return WindowedValue.of(keyed.extractOutput(acc.getValue().getKey(),
- acc.getValue().getValue()), acc.getTimestamp(),
- acc.getWindows(), acc.getPane());
- }
- });
-
- context.setOutputRDD(transform,
- fromPair(extracted)
- .map(new Function<KV<WindowedValue<K>, WindowedValue<VO>>, WindowedValue<KV<K, VO>>>() {
- @Override
- public WindowedValue<KV<K, VO>> call(KV<WindowedValue<K>, WindowedValue<VO>> kwvo)
- throws Exception {
- WindowedValue<VO> wvo = kwvo.getValue();
- KV<K, VO> kvo = KV.of(kwvo.getKey().getValue(), wvo.getValue());
- return WindowedValue.of(kvo, wvo.getTimestamp(), wvo.getWindows(), wvo.getPane());
- }
- }));
- }
- };
- }
-
- private static final class KVFunction<K, VI, VO>
- implements Function<WindowedValue<KV<K, Iterable<VI>>>, WindowedValue<KV<K, VO>>> {
- private final Combine.KeyedCombineFn<K, VI, ?, VO> keyed;
-
- KVFunction(Combine.KeyedCombineFn<K, VI, ?, VO> keyed) {
- this.keyed = keyed;
- }
-
- @Override
- public WindowedValue<KV<K, VO>> call(WindowedValue<KV<K, Iterable<VI>>> windowedKv)
- throws Exception {
- KV<K, Iterable<VI>> kv = windowedKv.getValue();
- return WindowedValue.of(KV.of(kv.getKey(), keyed.apply(kv.getKey(), kv.getValue())),
- windowedKv.getTimestamp(), windowedKv.getWindows(), windowedKv.getPane());
- }
- }
-
- private static <K, V> JavaPairRDD<K, V> toPair(JavaRDDLike<KV<K, V>, ?> rdd) {
- return rdd.mapToPair(new PairFunction<KV<K, V>, K, V>() {
- @Override
- public Tuple2<K, V> call(KV<K, V> kv) {
- return new Tuple2<>(kv.getKey(), kv.getValue());
- }
- });
- }
-
- private static <K, V> JavaRDDLike<KV<K, V>, ?> fromPair(JavaPairRDD<K, V> rdd) {
- return rdd.map(new Function<Tuple2<K, V>, KV<K, V>>() {
- @Override
- public KV<K, V> call(Tuple2<K, V> t2) {
- return KV.of(t2._1(), t2._2());
- }
- });
- }
-
- private static <I, O> TransformEvaluator<ParDo.Bound<I, O>> parDo() {
- return new TransformEvaluator<ParDo.Bound<I, O>>() {
- @Override
- public void evaluate(ParDo.Bound<I, O> transform, EvaluationContext context) {
- DoFnFunction<I, O> dofn =
- new DoFnFunction<>(transform.getFn(),
- context.getRuntimeContext(),
- getSideInputs(transform.getSideInputs(), context));
- @SuppressWarnings("unchecked")
- JavaRDDLike<WindowedValue<I>, ?> inRDD =
- (JavaRDDLike<WindowedValue<I>, ?>) context.getInputRDD(transform);
- context.setOutputRDD(transform, inRDD.mapPartitions(dofn));
- }
- };
- }
-
- private static final FieldGetter MULTIDO_FG = new FieldGetter(ParDo.BoundMulti.class);
-
- private static <I, O> TransformEvaluator<ParDo.BoundMulti<I, O>> multiDo() {
- return new TransformEvaluator<ParDo.BoundMulti<I, O>>() {
- @Override
- public void evaluate(ParDo.BoundMulti<I, O> transform, EvaluationContext context) {
- TupleTag<O> mainOutputTag = MULTIDO_FG.get("mainOutputTag", transform);
- MultiDoFnFunction<I, O> multifn = new MultiDoFnFunction<>(
- transform.getFn(),
- context.getRuntimeContext(),
- mainOutputTag,
- getSideInputs(transform.getSideInputs(), context));
-
- @SuppressWarnings("unchecked")
- JavaRDDLike<WindowedValue<I>, ?> inRDD =
- (JavaRDDLike<WindowedValue<I>, ?>) context.getInputRDD(transform);
- JavaPairRDD<TupleTag<?>, WindowedValue<?>> all = inRDD
- .mapPartitionsToPair(multifn)
- .cache();
-
- PCollectionTuple pct = context.getOutput(transform);
- for (Map.Entry<TupleTag<?>, PCollection<?>> e : pct.getAll().entrySet()) {
- @SuppressWarnings("unchecked")
- JavaPairRDD<TupleTag<?>, WindowedValue<?>> filtered =
- all.filter(new TupleTagFilter(e.getKey()));
- @SuppressWarnings("unchecked")
- // Object is the best we can do since different outputs can have different tags
- JavaRDD<WindowedValue<Object>> values =
- (JavaRDD<WindowedValue<Object>>) (JavaRDD<?>) filtered.values();
- context.setRDD(e.getValue(), values);
- }
- }
- };
- }
-
-
- private static <T> TransformEvaluator<TextIO.Read.Bound<T>> readText() {
- return new TransformEvaluator<TextIO.Read.Bound<T>>() {
- @Override
- public void evaluate(TextIO.Read.Bound<T> transform, EvaluationContext context) {
- String pattern = transform.getFilepattern();
- JavaRDD<WindowedValue<String>> rdd = context.getSparkContext().textFile(pattern)
- .map(WindowingHelpers.<String>windowFunction());
- context.setOutputRDD(transform, rdd);
- }
- };
- }
-
- private static <T> TransformEvaluator<TextIO.Write.Bound<T>> writeText() {
- return new TransformEvaluator<TextIO.Write.Bound<T>>() {
- @Override
- public void evaluate(TextIO.Write.Bound<T> transform, EvaluationContext context) {
- @SuppressWarnings("unchecked")
- JavaPairRDD<T, Void> last =
- ((JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform))
- .map(WindowingHelpers.<T>unwindowFunction())
- .mapToPair(new PairFunction<T, T,
- Void>() {
- @Override
- public Tuple2<T, Void> call(T t) throws Exception {
- return new Tuple2<>(t, null);
- }
- });
- ShardTemplateInformation shardTemplateInfo =
- new ShardTemplateInformation(transform.getNumShards(),
- transform.getShardTemplate(), transform.getFilenamePrefix(),
- transform.getFilenameSuffix());
- writeHadoopFile(last, new Configuration(), shardTemplateInfo, Text.class,
- NullWritable.class, TemplatedTextOutputFormat.class);
- }
- };
- }
-
- private static <T> TransformEvaluator<AvroIO.Read.Bound<T>> readAvro() {
- return new TransformEvaluator<AvroIO.Read.Bound<T>>() {
- @Override
- public void evaluate(AvroIO.Read.Bound<T> transform, EvaluationContext context) {
- String pattern = transform.getFilepattern();
- JavaSparkContext jsc = context.getSparkContext();
- @SuppressWarnings("unchecked")
- JavaRDD<AvroKey<T>> avroFile = (JavaRDD<AvroKey<T>>) (JavaRDD<?>)
- jsc.newAPIHadoopFile(pattern,
- AvroKeyInputFormat.class,
- AvroKey.class, NullWritable.class,
- new Configuration()).keys();
- JavaRDD<WindowedValue<T>> rdd = avroFile.map(
- new Function<AvroKey<T>, T>() {
- @Override
- public T call(AvroKey<T> key) {
- return key.datum();
- }
- }).map(WindowingHelpers.<T>windowFunction());
- context.setOutputRDD(transform, rdd);
- }
- };
- }
-
- private static <T> TransformEvaluator<AvroIO.Write.Bound<T>> writeAvro() {
- return new TransformEvaluator<AvroIO.Write.Bound<T>>() {
- @Override
- public void evaluate(AvroIO.Write.Bound<T> transform, EvaluationContext context) {
- Job job;
- try {
- job = Job.getInstance();
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- AvroJob.setOutputKeySchema(job, transform.getSchema());
- @SuppressWarnings("unchecked")
- JavaPairRDD<AvroKey<T>, NullWritable> last =
- ((JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform))
- .map(WindowingHelpers.<T>unwindowFunction())
- .mapToPair(new PairFunction<T, AvroKey<T>, NullWritable>() {
- @Override
- public Tuple2<AvroKey<T>, NullWritable> call(T t) throws Exception {
- return new Tuple2<>(new AvroKey<>(t), NullWritable.get());
- }
- });
- ShardTemplateInformation shardTemplateInfo =
- new ShardTemplateInformation(transform.getNumShards(),
- transform.getShardTemplate(), transform.getFilenamePrefix(),
- transform.getFilenameSuffix());
- writeHadoopFile(last, job.getConfiguration(), shardTemplateInfo,
- AvroKey.class, NullWritable.class, TemplatedAvroKeyOutputFormat.class);
- }
- };
- }
-
- private static <K, V> TransformEvaluator<HadoopIO.Read.Bound<K, V>> readHadoop() {
- return new TransformEvaluator<HadoopIO.Read.Bound<K, V>>() {
- @Override
- public void evaluate(HadoopIO.Read.Bound<K, V> transform, EvaluationContext context) {
- String pattern = transform.getFilepattern();
- JavaSparkContext jsc = context.getSparkContext();
- @SuppressWarnings("unchecked")
- JavaPairRDD<K, V> file = jsc.newAPIHadoopFile(pattern,
- transform.getFormatClass(),
- transform.getKeyClass(), transform.getValueClass(),
- new Configuration());
- JavaRDD<WindowedValue<KV<K, V>>> rdd =
- file.map(new Function<Tuple2<K, V>, KV<K, V>>() {
- @Override
- public KV<K, V> call(Tuple2<K, V> t2) throws Exception {
- return KV.of(t2._1(), t2._2());
- }
- }).map(WindowingHelpers.<KV<K, V>>windowFunction());
- context.setOutputRDD(transform, rdd);
- }
- };
- }
-
- private static <K, V> TransformEvaluator<HadoopIO.Write.Bound<K, V>> writeHadoop() {
- return new TransformEvaluator<HadoopIO.Write.Bound<K, V>>() {
- @Override
- public void evaluate(HadoopIO.Write.Bound<K, V> transform, EvaluationContext context) {
- @SuppressWarnings("unchecked")
- JavaPairRDD<K, V> last = ((JavaRDDLike<WindowedValue<KV<K, V>>, ?>) context
- .getInputRDD(transform))
- .map(WindowingHelpers.<KV<K, V>>unwindowFunction())
- .mapToPair(new PairFunction<KV<K, V>, K, V>() {
- @Override
- public Tuple2<K, V> call(KV<K, V> t) throws Exception {
- return new Tuple2<>(t.getKey(), t.getValue());
- }
- });
- ShardTemplateInformation shardTemplateInfo =
- new ShardTemplateInformation(transform.getNumShards(),
- transform.getShardTemplate(), transform.getFilenamePrefix(),
- transform.getFilenameSuffix());
- Configuration conf = new Configuration();
- for (Map.Entry<String, String> e : transform.getConfigurationProperties().entrySet()) {
- conf.set(e.getKey(), e.getValue());
- }
- writeHadoopFile(last, conf, shardTemplateInfo,
- transform.getKeyClass(), transform.getValueClass(), transform.getFormatClass());
- }
- };
- }
-
- private static final class ShardTemplateInformation {
- private final int numShards;
- private final String shardTemplate;
- private final String filenamePrefix;
- private final String filenameSuffix;
-
- private ShardTemplateInformation(int numShards, String shardTemplate, String
- filenamePrefix, String filenameSuffix) {
- this.numShards = numShards;
- this.shardTemplate = shardTemplate;
- this.filenamePrefix = filenamePrefix;
- this.filenameSuffix = filenameSuffix;
- }
-
- int getNumShards() {
- return numShards;
- }
-
- String getShardTemplate() {
- return shardTemplate;
- }
-
- String getFilenamePrefix() {
- return filenamePrefix;
- }
-
- String getFilenameSuffix() {
- return filenameSuffix;
- }
- }
-
- private static <K, V> void writeHadoopFile(JavaPairRDD<K, V> rdd, Configuration conf,
- ShardTemplateInformation shardTemplateInfo, Class<?> keyClass, Class<?> valueClass,
- Class<? extends FileOutputFormat> formatClass) {
- int numShards = shardTemplateInfo.getNumShards();
- String shardTemplate = shardTemplateInfo.getShardTemplate();
- String filenamePrefix = shardTemplateInfo.getFilenamePrefix();
- String filenameSuffix = shardTemplateInfo.getFilenameSuffix();
- if (numShards != 0) {
- // number of shards was set explicitly, so repartition
- rdd = rdd.repartition(numShards);
- }
- int actualNumShards = rdd.partitions().size();
- String template = replaceShardCount(shardTemplate, actualNumShards);
- String outputDir = getOutputDirectory(filenamePrefix, template);
- String filePrefix = getOutputFilePrefix(filenamePrefix, template);
- String fileTemplate = getOutputFileTemplate(filenamePrefix, template);
-
- conf.set(ShardNameTemplateHelper.OUTPUT_FILE_PREFIX, filePrefix);
- conf.set(ShardNameTemplateHelper.OUTPUT_FILE_TEMPLATE, fileTemplate);
- conf.set(ShardNameTemplateHelper.OUTPUT_FILE_SUFFIX, filenameSuffix);
- rdd.saveAsNewAPIHadoopFile(outputDir, keyClass, valueClass, formatClass, conf);
- }
-
- private static final FieldGetter WINDOW_FG = new FieldGetter(Window.Bound.class);
-
- private static <T, W extends BoundedWindow> TransformEvaluator<Window.Bound<T>> window() {
- return new TransformEvaluator<Window.Bound<T>>() {
- @Override
- public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
- @SuppressWarnings("unchecked")
- JavaRDDLike<WindowedValue<T>, ?> inRDD =
- (JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform);
- WindowFn<? super T, W> windowFn = WINDOW_FG.get("windowFn", transform);
- if (windowFn instanceof GlobalWindows) {
- context.setOutputRDD(transform, inRDD);
- } else {
- @SuppressWarnings("unchecked")
- DoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
- DoFnFunction<T, T> dofn =
- new DoFnFunction<>(addWindowsDoFn, context.getRuntimeContext(), null);
- context.setOutputRDD(transform, inRDD.mapPartitions(dofn));
- }
- }
- };
- }
-
- private static <T> TransformEvaluator<Create.Values<T>> create() {
- return new TransformEvaluator<Create.Values<T>>() {
- @Override
- public void evaluate(Create.Values<T> transform, EvaluationContext context) {
- Iterable<T> elems = transform.getElements();
- // Use a coder to convert the objects in the PCollection to byte arrays, so they
- // can be transferred over the network.
- Coder<T> coder = context.getOutput(transform).getCoder();
- context.setOutputRDDFromValues(transform, elems, coder);
- }
- };
- }
-
- private static <T> TransformEvaluator<View.AsSingleton<T>> viewAsSingleton() {
- return new TransformEvaluator<View.AsSingleton<T>>() {
- @Override
- public void evaluate(View.AsSingleton<T> transform, EvaluationContext context) {
- Iterable<? extends WindowedValue<?>> iter =
- context.getWindowedValues(context.getInput(transform));
- context.setPView(context.getOutput(transform), iter);
- }
- };
- }
-
- private static <T> TransformEvaluator<View.AsIterable<T>> viewAsIter() {
- return new TransformEvaluator<View.AsIterable<T>>() {
- @Override
- public void evaluate(View.AsIterable<T> transform, EvaluationContext context) {
- Iterable<? extends WindowedValue<?>> iter =
- context.getWindowedValues(context.getInput(transform));
- context.setPView(context.getOutput(transform), iter);
- }
- };
- }
-
- private static <R, W> TransformEvaluator<View.CreatePCollectionView<R, W>> createPCollView() {
- return new TransformEvaluator<View.CreatePCollectionView<R, W>>() {
- @Override
- public void evaluate(View.CreatePCollectionView<R, W> transform, EvaluationContext context) {
- Iterable<? extends WindowedValue<?>> iter =
- context.getWindowedValues(context.getInput(transform));
- context.setPView(context.getOutput(transform), iter);
- }
- };
- }
-
- private static final class TupleTagFilter<V>
- implements Function<Tuple2<TupleTag<V>, WindowedValue<?>>, Boolean> {
-
- private final TupleTag<V> tag;
-
- private TupleTagFilter(TupleTag<V> tag) {
- this.tag = tag;
- }
-
- @Override
- public Boolean call(Tuple2<TupleTag<V>, WindowedValue<?>> input) {
- return tag.equals(input._1());
- }
- }
-
- private static Map<TupleTag<?>, BroadcastHelper<?>> getSideInputs(
- List<PCollectionView<?>> views,
- EvaluationContext context) {
- if (views == null) {
- return ImmutableMap.of();
- } else {
- Map<TupleTag<?>, BroadcastHelper<?>> sideInputs = Maps.newHashMap();
- for (PCollectionView<?> view : views) {
- Iterable<? extends WindowedValue<?>> collectionView = context.getPCollectionView(view);
- Coder<Iterable<WindowedValue<?>>> coderInternal = view.getCoderInternal();
- @SuppressWarnings("unchecked")
- BroadcastHelper<?> helper =
- BroadcastHelper.create((Iterable<WindowedValue<?>>) collectionView, coderInternal);
- //broadcast side inputs
- helper.broadcast(context.getSparkContext());
- sideInputs.put(view.getTagInternal(), helper);
- }
- return sideInputs;
- }
- }
-
- private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS = Maps
- .newHashMap();
-
- static {
- EVALUATORS.put(TextIO.Read.Bound.class, readText());
- EVALUATORS.put(TextIO.Write.Bound.class, writeText());
- EVALUATORS.put(AvroIO.Read.Bound.class, readAvro());
- EVALUATORS.put(AvroIO.Write.Bound.class, writeAvro());
- EVALUATORS.put(HadoopIO.Read.Bound.class, readHadoop());
- EVALUATORS.put(HadoopIO.Write.Bound.class, writeHadoop());
- EVALUATORS.put(ParDo.Bound.class, parDo());
- EVALUATORS.put(ParDo.BoundMulti.class, multiDo());
- EVALUATORS.put(GroupByKey.GroupByKeyOnly.class, gbk());
- EVALUATORS.put(Combine.GroupedValues.class, grouped());
- EVALUATORS.put(Combine.Globally.class, combineGlobally());
- EVALUATORS.put(Combine.PerKey.class, combinePerKey());
- EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl());
- EVALUATORS.put(Create.Values.class, create());
- EVALUATORS.put(View.AsSingleton.class, viewAsSingleton());
- EVALUATORS.put(View.AsIterable.class, viewAsIter());
- EVALUATORS.put(View.CreatePCollectionView.class, createPCollView());
- EVALUATORS.put(Window.Bound.class, window());
- }
-
- public static <PT extends PTransform<?, ?>> TransformEvaluator<PT>
- getTransformEvaluator(Class<PT> clazz) {
- @SuppressWarnings("unchecked")
- TransformEvaluator<PT> transform = (TransformEvaluator<PT>) EVALUATORS.get(clazz);
- if (transform == null) {
- throw new IllegalStateException("No TransformEvaluator registered for " + clazz);
- }
- return transform;
- }
-
- /**
- * Translator that matches a Dataflow transformation with the appropriate evaluator.
- */
- public static class Translator implements SparkPipelineTranslator {
-
- @Override
- public boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz) {
- return EVALUATORS.containsKey(clazz);
- }
-
- @Override
- public <PT extends PTransform<?, ?>> TransformEvaluator<PT> translate(Class<PT> clazz) {
- return getTransformEvaluator(clazz);
- }
- }
-}
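
The comment that recurs three times above ("Use coders to convert objects in the
PCollection to byte arrays, so they can be transferred over the network for the
shuffle") is the file's central trick: only bytes cross the wire during a shuffle,
with a coder encoding on one side and decoding on the other. A minimal sketch of
that round trip, using plain Java serialization in place of a Dataflow Coder (all
names here are illustrative, not the runner's API):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.util.Arrays;

    public final class CoderRoundTripSketch {

      // Encode side: the role CoderHelpers.toByteFunction plays with a real Coder.
      static byte[] toByteArray(Object value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
          oos.writeObject(value);
        }
        return bos.toByteArray();
      }

      // Decode side: the role CoderHelpers.fromByteFunction plays with a real Coder.
      static Object fromByteArray(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
          return ois.readObject();
        }
      }

      public static void main(String[] args) throws Exception {
        byte[] onTheWire = toByteArray(Arrays.asList("a", "b")); // before the shuffle
        System.out.println(fromByteArray(onTheWire));            // after: [a, b]
      }
    }

In the file above, gbk(), combineGlobally() and combinePerKey() wrap their
groupByKey/aggregate/combineByKey calls in exactly this encode/decode pair.
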
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/WindowingHelpers.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/WindowingHelpers.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/WindowingHelpers.java
deleted file mode 100644
index 90600b2..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/WindowingHelpers.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import org.apache.spark.api.java.function.Function;
-
-/**
- * Helper functions for working with windows.
- */
-public final class WindowingHelpers {
- private WindowingHelpers() {
- }
-
- /**
- * A function for converting a value to a {@link WindowedValue}. The resulting
- * {@link WindowedValue} will be in no windows, and will have the default timestamp
- * and pane.
- *
- * @param <T> The type of the object.
- * @return A function that accepts an object and returns its {@link WindowedValue}.
- */
- public static <T> Function<T, WindowedValue<T>> windowFunction() {
- return new Function<T, WindowedValue<T>>() {
- @Override
- public WindowedValue<T> call(T t) {
- return WindowedValue.valueInEmptyWindows(t);
- }
- };
- }
-
- /**
- * A function for extracting the value from a {@link WindowedValue}.
- *
- * @param <T> The type of the object.
- * @return A function that accepts a {@link WindowedValue} and returns its value.
- */
- public static <T> Function<WindowedValue<T>, T> unwindowFunction() {
- return new Function<WindowedValue<T>, T>() {
- @Override
- public T call(WindowedValue<T> t) {
- return t.getValue();
- }
- };
- }
-}
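
A hedged usage sketch of the two helpers above, assuming the class is on the
classpath: wrapping supplies default windowing metadata and unwrapping discards
it, so the pair composes to the identity.

    import org.apache.spark.api.java.function.Function;

    import com.google.cloud.dataflow.sdk.util.WindowedValue;
    import com.cloudera.dataflow.spark.WindowingHelpers;

    public final class WindowRoundTripSketch {
      public static void main(String[] args) throws Exception {
        Function<String, WindowedValue<String>> wrap = WindowingHelpers.windowFunction();
        Function<WindowedValue<String>, String> unwrap = WindowingHelpers.unwindowFunction();
        // unwrap(wrap(x)) == x: metadata is defaulted on the way in and dropped
        // on the way out, which is what lets plain Spark code and window-aware
        // code operate on the same collections.
        System.out.println(unwrap.call(wrap.call("beam"))); // prints "beam"
      }
    }
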
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/aggregators/AggAccumParam.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/aggregators/AggAccumParam.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/aggregators/AggAccumParam.java
deleted file mode 100644
index a3055a2..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/aggregators/AggAccumParam.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark.aggregators;
-
-import org.apache.spark.AccumulatorParam;
-
-public class AggAccumParam implements AccumulatorParam<NamedAggregators> {
- @Override
- public NamedAggregators addAccumulator(NamedAggregators current, NamedAggregators added) {
- return current.merge(added);
- }
-
- @Override
- public NamedAggregators addInPlace(NamedAggregators current, NamedAggregators added) {
- return addAccumulator(current, added);
- }
-
- @Override
- public NamedAggregators zero(NamedAggregators initialValue) {
- return new NamedAggregators();
- }
-}
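
A hedged sketch of wiring this param into a Spark context (the local master and
app name are placeholders): Spark folds worker-side partials together with the
addInPlace/addAccumulator methods above, so a single accumulator can carry every
named aggregator.

    import org.apache.spark.Accumulator;
    import org.apache.spark.api.java.JavaSparkContext;

    import com.cloudera.dataflow.spark.aggregators.AggAccumParam;
    import com.cloudera.dataflow.spark.aggregators.NamedAggregators;

    public final class AccumUsageSketch {
      public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext("local[2]", "accum-sketch");
        Accumulator<NamedAggregators> accum =
            jsc.accumulator(new NamedAggregators(), new AggAccumParam());
        // Tasks call accum.add(partial) on the workers; Spark merges the
        // partials on the driver through the param's merge logic.
        System.out.println(accum.value());
        jsc.stop();
      }
    }
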
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/aggregators/NamedAggregators.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/aggregators/NamedAggregators.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/aggregators/NamedAggregators.java
deleted file mode 100644
index d51e404..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/aggregators/NamedAggregators.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark.aggregators;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.util.Map;
-import java.util.TreeMap;
-
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.common.collect.ImmutableList;
-
-import com.cloudera.dataflow.spark.SparkRuntimeContext;
-
-/**
- * This class wraps a map of named aggregators. Spark expects that all accumulators be declared
- * before a job is launched. Dataflow allows aggregators to be used and incremented on the fly.
- * We create a map of named aggregators and instantiate it in the Spark context before the job
- * is launched. We can then add aggregators on the fly in Spark.
- */
-public class NamedAggregators implements Serializable {
- /**
- * Map from aggregator name to current state.
- */
- private final Map<String, State<?, ?, ?>> mNamedAggregators = new TreeMap<>();
-
- /**
- * Constructs a new NamedAggregators instance.
- */
- public NamedAggregators() {
- }
-
- /**
- * Constructs a new named aggregators instance that contains a mapping from the specified
- * name to the associated initial state.
- *
- * @param name Name of aggregator.
- * @param state Associated State.
- */
- public NamedAggregators(String name, State<?, ?, ?> state) {
- this.mNamedAggregators.put(name, state);
- }
-
- /**
- * @param name Name of aggregator to retrieve.
- * @param typeClass Type class to cast the value to.
- * @param <T> Type to be returned.
- * @return the value of the aggregator associated with the specified name
- */
- public <T> T getValue(String name, Class<T> typeClass) {
- return typeClass.cast(mNamedAggregators.get(name).render());
- }
-
- /**
- * Merges another NamedAggregators instance with this instance.
- *
- * @param other The other instance of named aggregators to merge.
- * @return This instance of Named aggregators with associated states updated to reflect the
- * other instance's aggregators.
- */
- public NamedAggregators merge(NamedAggregators other) {
- for (Map.Entry<String, State<?, ?, ?>> e : other.mNamedAggregators.entrySet()) {
- String key = e.getKey();
- State<?, ?, ?> otherValue = e.getValue();
- State<?, ?, ?> value = mNamedAggregators.get(key);
- if (value == null) {
- mNamedAggregators.put(key, otherValue);
- } else {
- mNamedAggregators.put(key, merge(value, otherValue));
- }
- }
- return this;
- }
-
- /**
- * Helper method to merge States whose generic types aren't provably the same,
- * so some casting is required.
- */
- @SuppressWarnings("unchecked")
- private static <A, B, C> State<A, B, C> merge(State<?, ?, ?> s1, State<?, ?, ?> s2) {
- return ((State<A, B, C>) s1).merge((State<A, B, C>) s2);
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- for (Map.Entry<String, State<?, ?, ?>> e : mNamedAggregators.entrySet()) {
- sb.append(e.getKey()).append(": ").append(e.getValue().render());
- }
- return sb.toString();
- }
-
- /**
- * @param <IN> Input data type
- * @param <INTER> Intermediate data type (useful for averages)
- * @param <OUT> Output data type
- */
- public interface State<IN, INTER, OUT> extends Serializable {
- /**
- * @param element new element to update state
- */
- void update(IN element);
-
- State<IN, INTER, OUT> merge(State<IN, INTER, OUT> other);
-
- INTER current();
-
- OUT render();
-
- Combine.CombineFn<IN, INTER, OUT> getCombineFn();
- }
-
- /**
- * Corresponds to a Dataflow combine function.
- */
- public static class CombineFunctionState<IN, INTER, OUT> implements State<IN, INTER, OUT> {
-
- private Combine.CombineFn<IN, INTER, OUT> combineFn;
- private Coder<IN> inCoder;
- private SparkRuntimeContext ctxt;
- private transient INTER state;
-
- public CombineFunctionState(
- Combine.CombineFn<IN, INTER, OUT> combineFn,
- Coder<IN> inCoder,
- SparkRuntimeContext ctxt) {
- this.combineFn = combineFn;
- this.inCoder = inCoder;
- this.ctxt = ctxt;
- this.state = combineFn.createAccumulator();
- }
-
- @Override
- public void update(IN element) {
- combineFn.addInput(state, element);
- }
-
- @Override
- public State<IN, INTER, OUT> merge(State<IN, INTER, OUT> other) {
- this.state = combineFn.mergeAccumulators(ImmutableList.of(current(), other.current()));
- return this;
- }
-
- @Override
- public INTER current() {
- return state;
- }
-
- @Override
- public OUT render() {
- return combineFn.extractOutput(state);
- }
-
- @Override
- public Combine.CombineFn<IN, INTER, OUT> getCombineFn() {
- return combineFn;
- }
-
- private void writeObject(ObjectOutputStream oos) throws IOException {
- oos.writeObject(ctxt);
- oos.writeObject(combineFn);
- oos.writeObject(inCoder);
- try {
- combineFn.getAccumulatorCoder(ctxt.getCoderRegistry(), inCoder)
- .encode(state, oos, Coder.Context.NESTED);
- } catch (CannotProvideCoderException e) {
- throw new IllegalStateException("Could not determine coder for accumulator", e);
- }
- }
-
- @SuppressWarnings("unchecked")
- private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
- ctxt = (SparkRuntimeContext) ois.readObject();
- combineFn = (Combine.CombineFn<IN, INTER, OUT>) ois.readObject();
- inCoder = (Coder<IN>) ois.readObject();
- try {
- state = combineFn.getAccumulatorCoder(ctxt.getCoderRegistry(), inCoder)
- .decode(ois, Coder.Context.NESTED);
- } catch (CannotProvideCoderException e) {
- throw new IllegalStateException("Could not determine coder for accumulator", e);
- }
- }
- }
-
-}
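
The merge method above does a name-keyed fold: adopt the other side's state when
this side has none, otherwise combine the two. The same logic on a toy scale,
with Integer sums standing in for the State objects (illustrative only):

    import java.util.Map;
    import java.util.TreeMap;

    public final class MergeSketch {

      // Name-keyed merge, mirroring NamedAggregators.merge with Integer::sum
      // standing in for State.merge.
      static Map<String, Integer> merge(Map<String, Integer> mine,
                                        Map<String, Integer> other) {
        for (Map.Entry<String, Integer> e : other.entrySet()) {
          mine.merge(e.getKey(), e.getValue(), Integer::sum);
        }
        return mine;
      }

      public static void main(String[] args) {
        Map<String, Integer> a = new TreeMap<>();
        a.put("records", 10);
        Map<String, Integer> b = new TreeMap<>();
        b.put("records", 5);
        b.put("errors", 1);
        System.out.println(merge(a, b)); // {errors=1, records=15}
      }
    }
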
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptions.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptions.java
deleted file mode 100644
index 57253f0..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptions.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.dataflow.spark.streaming;
-
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-
-import com.cloudera.dataflow.spark.SparkPipelineOptions;
-
-/**
- * Options used to configure Spark streaming.
- */
-public interface SparkStreamingPipelineOptions extends SparkPipelineOptions {
- @Description("Timeout to wait (in msec) for the streaming execution so stop, -1 runs until " +
- "execution is stopped")
- @Default.Long(-1)
- Long getTimeout();
-
- void setTimeout(Long timeout);
-
- @Override
- @Default.Boolean(true)
- boolean isStreaming();
-
- @Override
- @Default.String("spark streaming dataflow pipeline job")
- String getAppName();
-}
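
A hedged usage sketch, assuming the interface above is on the classpath. The
factory call mirrors the SparkStreamingPipelineOptionsFactory that follows in
this commit, and the timeout value drives awaitTerminationOrTimeout in
StreamingEvaluationContext further down.

    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

    import com.cloudera.dataflow.spark.streaming.SparkStreamingPipelineOptions;

    public final class StreamingOptionsSketch {
      public static void main(String[] args) {
        SparkStreamingPipelineOptions options =
            PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
        // Default is -1 (run until stopped); a positive value bounds the run.
        options.setTimeout(30000L);
        System.out.println(options.getAppName() + " / " + options.getTimeout());
      }
    }
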
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsFactory.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsFactory.java
deleted file mode 100644
index 3b568af..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsFactory.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.dataflow.spark.streaming;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-
-public final class SparkStreamingPipelineOptionsFactory {
-
- private SparkStreamingPipelineOptionsFactory() {
- }
-
- public static SparkStreamingPipelineOptions create() {
- return PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java
deleted file mode 100644
index 01c4375..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.dataflow.spark.streaming;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar;
-import com.google.common.collect.ImmutableList;
-
-public class SparkStreamingPipelineOptionsRegistrar implements PipelineOptionsRegistrar {
-
- @Override
- public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
- return ImmutableList.<Class<? extends PipelineOptions>>of(SparkStreamingPipelineOptions
- .class);
- }
-}
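
Registrars like this one are discovered through Java's ServiceLoader (the jar
ships a META-INF/services entry naming the implementation class). A sketch of
the lookup side, roughly what PipelineOptionsFactory does internally
(illustrative, not the SDK's actual code):

    import java.util.ServiceLoader;

    import com.google.cloud.dataflow.sdk.options.PipelineOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar;

    public final class RegistrarLookupSketch {
      public static void main(String[] args) {
        for (PipelineOptionsRegistrar registrar
            : ServiceLoader.load(PipelineOptionsRegistrar.class)) {
          for (Class<? extends PipelineOptions> opts : registrar.getPipelineOptions()) {
            System.out.println("registered: " + opts.getName());
          }
        }
      }
    }
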
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java
deleted file mode 100644
index 5ecd562..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.dataflow.spark.streaming;
-
-
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.cloud.dataflow.sdk.values.PValue;
-
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaRDDLike;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaDStreamLike;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
-
-import com.cloudera.dataflow.spark.EvaluationContext;
-import com.cloudera.dataflow.spark.SparkRuntimeContext;
-
-/**
- * Evaluation context for streaming pipelines; tracks DStreams alongside the batch context's RDDs.
- */
-public class StreamingEvaluationContext extends EvaluationContext {
-
- private final JavaStreamingContext jssc;
- private final long timeout;
- private final Map<PValue, DStreamHolder<?>> pstreams = new LinkedHashMap<>();
- private final Set<DStreamHolder<?>> leafStreams = new LinkedHashSet<>();
-
- public StreamingEvaluationContext(JavaSparkContext jsc, Pipeline pipeline,
- JavaStreamingContext jssc, long timeout) {
- super(jsc, pipeline);
- this.jssc = jssc;
- this.timeout = timeout;
- }
-
- /**
- * Holder for a DStream. It can also create a DStream from a supplied queue of values,
- * but this is mainly for testing.
- */
- private class DStreamHolder<T> {
-
- private Iterable<Iterable<T>> values;
- private Coder<T> coder;
- private JavaDStream<WindowedValue<T>> dStream;
-
- DStreamHolder(Iterable<Iterable<T>> values, Coder<T> coder) {
- this.values = values;
- this.coder = coder;
- }
-
- DStreamHolder(JavaDStream<WindowedValue<T>> dStream) {
- this.dStream = dStream;
- }
-
- @SuppressWarnings("unchecked")
- JavaDStream<WindowedValue<T>> getDStream() {
- if (dStream == null) {
- // create the DStream from values
- Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>();
- for (Iterable<T> v : values) {
- setOutputRDDFromValues(currentTransform.getTransform(), v, coder);
- rddQueue.offer((JavaRDD<WindowedValue<T>>) getOutputRDD(currentTransform.getTransform()));
- }
- // create dstream from queue, one at a time, no defaults
- // mainly for unit tests, so no reason to make this configurable
- dStream = jssc.queueStream(rddQueue, true);
- }
- return dStream;
- }
- }
-
- <T> void setDStreamFromQueue(
- PTransform<?, ?> transform, Iterable<Iterable<T>> values, Coder<T> coder) {
- pstreams.put((PValue) getOutput(transform), new DStreamHolder<>(values, coder));
- }
-
- <T> void setStream(PTransform<?, ?> transform, JavaDStream<WindowedValue<T>> dStream) {
- PValue pvalue = (PValue) getOutput(transform);
- DStreamHolder<T> dStreamHolder = new DStreamHolder<>(dStream);
- pstreams.put(pvalue, dStreamHolder);
- leafStreams.add(dStreamHolder);
- }
-
- boolean hasStream(PTransform<?, ?> transform) {
- PValue pvalue = (PValue) getInput(transform);
- return pstreams.containsKey(pvalue);
- }
-
- JavaDStreamLike<?, ?, ?> getStream(PTransform<?, ?> transform) {
- return getStream((PValue) getInput(transform));
- }
-
- JavaDStreamLike<?, ?, ?> getStream(PValue pvalue) {
- DStreamHolder<?> dStreamHolder = pstreams.get(pvalue);
- JavaDStreamLike<?, ?, ?> dStream = dStreamHolder.getDStream();
- leafStreams.remove(dStreamHolder);
- return dStream;
- }
-
- // used to set the RDD from the DStream in the RDDHolder for transformation
- <T> void setInputRDD(
- PTransform<? extends PInput, ?> transform, JavaRDDLike<WindowedValue<T>, ?> rdd) {
- setRDD((PValue) getInput(transform), rdd);
- }
-
- // used to get the RDD transformation output and use it as the DStream transformation output
- JavaRDDLike<?, ?> getOutputRDD(PTransform<?, ?> transform) {
- return getRDD((PValue) getOutput(transform));
- }
-
- public JavaStreamingContext getStreamingContext() {
- return jssc;
- }
-
- @Override
- protected void computeOutputs() {
- for (DStreamHolder<?> streamHolder : leafStreams) {
- computeOutput(streamHolder);
- }
- }
-
- private static <T> void computeOutput(DStreamHolder<T> streamHolder) {
- streamHolder.getDStream().foreachRDD(new Function<JavaRDD<WindowedValue<T>>, Void>() {
- @Override
- public Void call(JavaRDD<WindowedValue<T>> rdd) throws Exception {
- rdd.rdd().cache();
- rdd.count();
- return null;
- }
- }); // force a DStream action
- }
-
- @Override
- public void close() {
- if (timeout > 0) {
- jssc.awaitTerminationOrTimeout(timeout);
- } else {
- jssc.awaitTermination();
- }
- //TODO: stop gracefully ?
- jssc.stop(false, false);
- state = State.DONE;
- super.close();
- }
-
- private State state = State.RUNNING;
-
- @Override
- public State getState() {
- return state;
- }
-
- //---------------- override in order to expose in package
- @Override
- protected <I extends PInput> I getInput(PTransform<I, ?> transform) {
- return super.getInput(transform);
- }
- @Override
- protected <O extends POutput> O getOutput(PTransform<?, O> transform) {
- return super.getOutput(transform);
- }
-
- @Override
- protected JavaSparkContext getSparkContext() {
- return super.getSparkContext();
- }
-
- @Override
- protected SparkRuntimeContext getRuntimeContext() {
- return super.getRuntimeContext();
- }
-
- @Override
- protected void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
- super.setCurrentTransform(transform);
- }
-
- @Override
- protected AppliedPTransform<?, ?, ?> getCurrentTransform() {
- return super.getCurrentTransform();
- }
-
- @Override
- protected <T> void setOutputRDD(PTransform<?, ?> transform,
- JavaRDDLike<WindowedValue<T>, ?> rdd) {
- super.setOutputRDD(transform, rdd);
- }
-
- @Override
- protected <T> void setOutputRDDFromValues(PTransform<?, ?> transform, Iterable<T> values,
- Coder<T> coder) {
- super.setOutputRDDFromValues(transform, values, coder);
- }
-
- @Override
- protected boolean hasOutputRDD(PTransform<? extends PInput, ?> transform) {
- return super.hasOutputRDD(transform);
- }
-}
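
The context above forces evaluation of every leaf DStream (one with no downstream
consumer) by attaching a foreachRDD action that caches and counts each micro-batch.
A minimal standalone sketch of that pattern, assuming only the Spark 1.x streaming
API already used in this commit (LeafStreamForcer and forceAction are illustrative
names, not part of the commit):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;

final class LeafStreamForcer {
  // Spark evaluates DStreams lazily: a stream that nothing consumes never runs.
  // Caching the micro-batch RDD and then counting it forces the computation
  // exactly once, while keeping the data available to any later consumer.
  static <T> void forceAction(JavaDStream<T> stream) {
    stream.foreachRDD(new Function<JavaRDD<T>, Void>() {
      @Override
      public Void call(JavaRDD<T> rdd) throws Exception {
        rdd.rdd().cache(); // cache the underlying RDD for potential reuse
        rdd.count();       // count() is an action, triggering evaluation
        return null;
      }
    });
  }
}
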
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java
deleted file mode 100644
index d8ae5e8..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java
+++ /dev/null
@@ -1,414 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.dataflow.spark.streaming;
-
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.api.client.util.Lists;
-import com.google.api.client.util.Maps;
-import com.google.api.client.util.Sets;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.VoidCoder;
-import com.google.cloud.dataflow.sdk.io.AvroIO;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.repackaged.com.google.common.reflect.TypeToken;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-import com.google.cloud.dataflow.sdk.util.AssignWindowsDoFn;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
-import com.google.cloud.dataflow.sdk.values.PDone;
-
-import kafka.serializer.Decoder;
-
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.Durations;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaDStreamLike;
-import org.apache.spark.streaming.api.java.JavaPairInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.kafka.KafkaUtils;
-
-import scala.Tuple2;
-
-import com.cloudera.dataflow.hadoop.HadoopIO;
-import com.cloudera.dataflow.io.ConsoleIO;
-import com.cloudera.dataflow.io.CreateStream;
-import com.cloudera.dataflow.io.KafkaIO;
-import com.cloudera.dataflow.spark.DoFnFunction;
-import com.cloudera.dataflow.spark.EvaluationContext;
-import com.cloudera.dataflow.spark.SparkPipelineTranslator;
-import com.cloudera.dataflow.spark.TransformEvaluator;
-import com.cloudera.dataflow.spark.TransformTranslator;
-import com.cloudera.dataflow.spark.WindowingHelpers;
-
-/**
- * Supports translation between a Dataflow transform and Spark's operations on DStreams.
- */
-public final class StreamingTransformTranslator {
-
- private StreamingTransformTranslator() {
- }
-
- private static <T> TransformEvaluator<ConsoleIO.Write.Unbound<T>> print() {
- return new TransformEvaluator<ConsoleIO.Write.Unbound<T>>() {
- @Override
- public void evaluate(ConsoleIO.Write.Unbound<T> transform, EvaluationContext context) {
- @SuppressWarnings("unchecked")
- JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream =
- (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>)
- ((StreamingEvaluationContext) context).getStream(transform);
- dstream.map(WindowingHelpers.<T>unwindowFunction()).print(transform.getNum());
- }
- };
- }
-
- private static <K, V> TransformEvaluator<KafkaIO.Read.Unbound<K, V>> kafka() {
- return new TransformEvaluator<KafkaIO.Read.Unbound<K, V>>() {
- @Override
- public void evaluate(KafkaIO.Read.Unbound<K, V> transform, EvaluationContext context) {
- StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
- JavaStreamingContext jssc = sec.getStreamingContext();
- Class<K> keyClazz = transform.getKeyClass();
- Class<V> valueClazz = transform.getValueClass();
- Class<? extends Decoder<K>> keyDecoderClazz = transform.getKeyDecoderClass();
- Class<? extends Decoder<V>> valueDecoderClazz = transform.getValueDecoderClass();
- Map<String, String> kafkaParams = transform.getKafkaParams();
- Set<String> topics = transform.getTopics();
- JavaPairInputDStream<K, V> inputPairStream = KafkaUtils.createDirectStream(jssc, keyClazz,
- valueClazz, keyDecoderClazz, valueDecoderClazz, kafkaParams, topics);
- JavaDStream<WindowedValue<KV<K, V>>> inputStream =
- inputPairStream.map(new Function<Tuple2<K, V>, KV<K, V>>() {
- @Override
- public KV<K, V> call(Tuple2<K, V> t2) throws Exception {
- return KV.of(t2._1(), t2._2());
- }
- }).map(WindowingHelpers.<KV<K, V>>windowFunction());
- sec.setStream(transform, inputStream);
- }
- };
- }
-
- private static <T> TransformEvaluator<Create.Values<T>> create() {
- return new TransformEvaluator<Create.Values<T>>() {
- @SuppressWarnings("unchecked")
- @Override
- public void evaluate(Create.Values<T> transform, EvaluationContext context) {
- StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
- Iterable<T> elems = transform.getElements();
- Coder<T> coder = sec.getOutput(transform).getCoder();
- if (coder != VoidCoder.of()) {
- // actual create
- sec.setOutputRDDFromValues(transform, elems, coder);
- } else {
- // fake the create as an input: build a stream with a single batch containing a
- // single null element, so that subsequent transformations are invoked exactly once
- // (needed to support DataflowAssert)
- sec.setDStreamFromQueue(transform,
- Collections.<Iterable<Void>>singletonList(Collections.singletonList((Void) null)),
- (Coder<Void>) coder);
- }
- }
- };
- }
-
- private static <T> TransformEvaluator<CreateStream.QueuedValues<T>> createFromQueue() {
- return new TransformEvaluator<CreateStream.QueuedValues<T>>() {
- @Override
- public void evaluate(CreateStream.QueuedValues<T> transform, EvaluationContext context) {
- StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
- Iterable<Iterable<T>> values = transform.getQueuedValues();
- Coder<T> coder = sec.getOutput(transform).getCoder();
- sec.setDStreamFromQueue(transform, values, coder);
- }
- };
- }
-
- private static <T> TransformEvaluator<Flatten.FlattenPCollectionList<T>> flattenPColl() {
- return new TransformEvaluator<Flatten.FlattenPCollectionList<T>>() {
- @SuppressWarnings("unchecked")
- @Override
- public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) {
- StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
- PCollectionList<T> pcs = sec.getInput(transform);
- JavaDStream<WindowedValue<T>> first =
- (JavaDStream<WindowedValue<T>>) sec.getStream(pcs.get(0));
- List<JavaDStream<WindowedValue<T>>> rest = Lists.newArrayListWithCapacity(pcs.size() - 1);
- for (int i = 1; i < pcs.size(); i++) {
- rest.add((JavaDStream<WindowedValue<T>>) sec.getStream(pcs.get(i)));
- }
- JavaDStream<WindowedValue<T>> dstream = sec.getStreamingContext().union(first, rest);
- sec.setStream(transform, dstream);
- }
- };
- }
-
- private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> rddTransform(
- final SparkPipelineTranslator rddTranslator) {
- return new TransformEvaluator<PT>() {
- @SuppressWarnings("unchecked")
- @Override
- public void evaluate(PT transform, EvaluationContext context) {
- TransformEvaluator<PT> rddEvaluator =
- rddTranslator.translate((Class<PT>) transform.getClass());
-
- StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
- if (sec.hasStream(transform)) {
- JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>> dStream =
- (JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>>)
- sec.getStream(transform);
-
- sec.setStream(transform, dStream
- .transform(new RDDTransform<>(sec, rddEvaluator, transform)));
- } else {
- // if the transformation requires direct access to an RDD (not part of a stream);
- // this is used for "fake" transformations such as those backing DataflowAssert
- rddEvaluator.evaluate(transform, context);
- }
- }
- };
- }
-
- /**
- * RDD transform function. If the transformation function doesn't have an input, creates a
- * fake one as an empty RDD.
- *
- * @param <PT> PTransform type
- */
- private static final class RDDTransform<PT extends PTransform<?, ?>>
- implements Function<JavaRDD<WindowedValue<Object>>, JavaRDD<WindowedValue<Object>>> {
-
- private final StreamingEvaluationContext context;
- private final AppliedPTransform<?, ?, ?> appliedPTransform;
- private final TransformEvaluator<PT> rddEvaluator;
- private final PT transform;
-
-
- private RDDTransform(StreamingEvaluationContext context, TransformEvaluator<PT> rddEvaluator,
- PT transform) {
- this.context = context;
- this.appliedPTransform = context.getCurrentTransform();
- this.rddEvaluator = rddEvaluator;
- this.transform = transform;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public JavaRDD<WindowedValue<Object>>
- call(JavaRDD<WindowedValue<Object>> rdd) throws Exception {
- AppliedPTransform<?, ?, ?> existingAPT = context.getCurrentTransform();
- context.setCurrentTransform(appliedPTransform);
- context.setInputRDD(transform, rdd);
- rddEvaluator.evaluate(transform, context);
- if (!context.hasOutputRDD(transform)) {
- // fake RDD as output
- context.setOutputRDD(transform,
- context.getSparkContext().<WindowedValue<Object>>emptyRDD());
- }
- JavaRDD<WindowedValue<Object>> outRDD =
- (JavaRDD<WindowedValue<Object>>) context.getOutputRDD(transform);
- context.setCurrentTransform(existingAPT);
- return outRDD;
- }
- }
-
- @SuppressWarnings("unchecked")
- private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> foreachRDD(
- final SparkPipelineTranslator rddTranslator) {
- return new TransformEvaluator<PT>() {
- @Override
- public void evaluate(PT transform, EvaluationContext context) {
- TransformEvaluator<PT> rddEvaluator =
- rddTranslator.translate((Class<PT>) transform.getClass());
-
- StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
- if (sec.hasStream(transform)) {
- JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>> dStream =
- (JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>>)
- sec.getStream(transform);
-
- dStream.foreachRDD(new RDDOutputOperator<>(sec, rddEvaluator, transform));
- } else {
- rddEvaluator.evaluate(transform, context);
- }
- }
- };
- }
-
- /**
- * RDD output function.
- *
- * @param <PT> PTransform type
- */
- private static final class RDDOutputOperator<PT extends PTransform<?, ?>>
- implements Function<JavaRDD<WindowedValue<Object>>, Void> {
-
- private final StreamingEvaluationContext context;
- private final AppliedPTransform<?, ?, ?> appliedPTransform;
- private final TransformEvaluator<PT> rddEvaluator;
- private final PT transform;
-
-
- private RDDOutputOperator(StreamingEvaluationContext context,
- TransformEvaluator<PT> rddEvaluator, PT transform) {
- this.context = context;
- this.appliedPTransform = context.getCurrentTransform();
- this.rddEvaluator = rddEvaluator;
- this.transform = transform;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public Void call(JavaRDD<WindowedValue<Object>> rdd) throws Exception {
- AppliedPTransform<?, ?, ?> existingAPT = context.getCurrentTransform();
- context.setCurrentTransform(appliedPTransform);
- context.setInputRDD(transform, rdd);
- rddEvaluator.evaluate(transform, context);
- context.setCurrentTransform(existingAPT);
- return null;
- }
- }
-
- private static final TransformTranslator.FieldGetter WINDOW_FG =
- new TransformTranslator.FieldGetter(Window.Bound.class);
-
- private static <T, W extends BoundedWindow> TransformEvaluator<Window.Bound<T>> window() {
- return new TransformEvaluator<Window.Bound<T>>() {
- @Override
- public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
- StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
- //--- first we apply windowing to the stream
- WindowFn<? super T, W> windowFn = WINDOW_FG.get("windowFn", transform);
- @SuppressWarnings("unchecked")
- JavaDStream<WindowedValue<T>> dStream =
- (JavaDStream<WindowedValue<T>>) sec.getStream(transform);
- if (windowFn instanceof FixedWindows) {
- Duration windowDuration = Durations.milliseconds(((FixedWindows) windowFn).getSize()
- .getMillis());
- sec.setStream(transform, dStream.window(windowDuration));
- } else if (windowFn instanceof SlidingWindows) {
- Duration windowDuration = Durations.milliseconds(((SlidingWindows) windowFn).getSize()
- .getMillis());
- Duration slideDuration = Durations.milliseconds(((SlidingWindows) windowFn).getPeriod()
- .getMillis());
- sec.setStream(transform, dStream.window(windowDuration, slideDuration));
- }
- //--- then we apply windowing to the elements
- DoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
- DoFnFunction<T, T> dofn =
- new DoFnFunction<>(addWindowsDoFn, sec.getRuntimeContext(), null);
- @SuppressWarnings("unchecked")
- JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream =
- (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>)
- sec.getStream(transform);
- sec.setStream(transform, dstream.mapPartitions(dofn));
- }
- };
- }
-
- private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS =
- Maps.newHashMap();
-
- static {
- EVALUATORS.put(ConsoleIO.Write.Unbound.class, print());
- EVALUATORS.put(CreateStream.QueuedValues.class, createFromQueue());
- EVALUATORS.put(Create.Values.class, create());
- EVALUATORS.put(KafkaIO.Read.Unbound.class, kafka());
- EVALUATORS.put(Window.Bound.class, window());
- EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl());
- }
-
- private static final Set<Class<? extends PTransform>> UNSUPPORTED_EVALUATORS =
- Sets.newHashSet();
-
- static {
- //TODO - add support for the following
- UNSUPPORTED_EVALUATORS.add(TextIO.Read.Bound.class);
- UNSUPPORTED_EVALUATORS.add(TextIO.Write.Bound.class);
- UNSUPPORTED_EVALUATORS.add(AvroIO.Read.Bound.class);
- UNSUPPORTED_EVALUATORS.add(AvroIO.Write.Bound.class);
- UNSUPPORTED_EVALUATORS.add(HadoopIO.Read.Bound.class);
- UNSUPPORTED_EVALUATORS.add(HadoopIO.Write.Bound.class);
- }
-
- @SuppressWarnings("unchecked")
- private static <PT extends PTransform<?, ?>> TransformEvaluator<PT>
- getTransformEvaluator(Class<PT> clazz, SparkPipelineTranslator rddTranslator) {
- TransformEvaluator<PT> transform = (TransformEvaluator<PT>) EVALUATORS.get(clazz);
- if (transform == null) {
- if (UNSUPPORTED_EVALUATORS.contains(clazz)) {
- throw new UnsupportedOperationException("Dataflow transformation " + clazz
- .getCanonicalName()
- + " is currently unsupported by the Spark streaming pipeline");
- }
- // DStream transformations transform one RDD into another, while actions create output.
- // In Dataflow terms, which of the two applies depends on the PTransform's output type:
- // transforms ending in PDone are actions, everything else is a transformation
- Class<?> pTOutputClazz = getPTransformOutputClazz(clazz);
- if (PDone.class.equals(pTOutputClazz)) {
- return foreachRDD(rddTranslator);
- } else {
- return rddTransform(rddTranslator);
- }
- }
- return transform;
- }
-
- private static <PT extends PTransform<?, ?>> Class<?> getPTransformOutputClazz(Class<PT> clazz) {
- Type[] types = ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments();
- return TypeToken.of(clazz).resolveType(types[1]).getRawType();
- }
-
- /**
- * Translator that matches a Dataflow transformation with the appropriate Spark streaming
- * evaluator. rddTranslator supplies Spark (batch) evaluators, applied inside
- * transform/foreachRDD, for transformations without a streaming-specific evaluator.
- */
- public static class Translator implements SparkPipelineTranslator {
-
- private final SparkPipelineTranslator rddTranslator;
-
- public Translator(SparkPipelineTranslator rddTranslator) {
- this.rddTranslator = rddTranslator;
- }
-
- @Override
- public boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz) {
- // streaming includes rdd transformations as well
- return EVALUATORS.containsKey(clazz) || rddTranslator.hasTranslation(clazz);
- }
-
- @Override
- public <PT extends PTransform<?, ?>> TransformEvaluator<PT> translate(Class<PT> clazz) {
- return getTransformEvaluator(clazz, rddTranslator);
- }
- }
-}
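
The least obvious step above is getPTransformOutputClazz, which decides between
foreachRDD (for transforms ending in PDone) and a DStream transform by reflecting
on the PTransform's output type parameter. A minimal standalone sketch of that
resolution, assuming plain Guava's TypeToken on the classpath; Xform and
WordCountXform are hypothetical stand-ins for the SDK's PTransform types:

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

import com.google.common.reflect.TypeToken;

abstract class Xform<I, O> {}
class WordCountXform extends Xform<String, Integer> {}

final class OutputTypeResolver {
  // Mirrors getPTransformOutputClazz above: read the generic superclass's type
  // arguments and let TypeToken resolve the second one (the output type)
  // against the concrete subclass.
  static Class<?> outputClassOf(Class<?> clazz) {
    Type[] types = ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments();
    return TypeToken.of(clazz).resolveType(types[1]).getRawType();
  }

  public static void main(String[] args) {
    System.out.println(outputClassOf(WordCountXform.class)); // class java.lang.Integer
  }
}
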
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingWindowPipelineDetector.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingWindowPipelineDetector.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingWindowPipelineDetector.java
deleted file mode 100644
index 406dfcc..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingWindowPipelineDetector.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark.streaming;
-
-import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.Durations;
-
-import com.cloudera.dataflow.spark.SparkPipelineRunner;
-import com.cloudera.dataflow.spark.SparkPipelineTranslator;
-import com.cloudera.dataflow.spark.TransformTranslator;
-
-/**
- * Pipeline {@link SparkPipelineRunner.Evaluator} to detect windowing.
- */
-public final class StreamingWindowPipelineDetector extends SparkPipelineRunner.Evaluator {
-
- // Currently, Spark streaming recommends batches no smaller than 500 msec
- private static final Duration SPARK_MIN_WINDOW = Durations.milliseconds(500);
-
- private boolean windowing;
- private Duration batchDuration;
-
- public StreamingWindowPipelineDetector(SparkPipelineTranslator translator) {
- super(translator);
- }
-
- private static final TransformTranslator.FieldGetter WINDOW_FG =
- new TransformTranslator.FieldGetter(Window.Bound.class);
-
- // Use the smallest window (fixed or sliding) as Spark streaming's batch duration
- @Override
- protected <PT extends PTransform<? super PInput, POutput>> void
- doVisitTransform(TransformTreeNode node) {
- @SuppressWarnings("unchecked")
- PT transform = (PT) node.getTransform();
- @SuppressWarnings("unchecked")
- Class<PT> transformClass = (Class<PT>) (Class<?>) transform.getClass();
- if (Window.Bound.class.isAssignableFrom(transformClass)) {
- WindowFn<?, ?> windowFn = WINDOW_FG.get("windowFn", transform);
- if (windowFn instanceof FixedWindows) {
- setBatchDuration(((FixedWindows) windowFn).getSize());
- } else if (windowFn instanceof SlidingWindows) {
- if (((SlidingWindows) windowFn).getOffset().getMillis() > 0) {
- throw new UnsupportedOperationException("Spark does not support window offsets");
- }
- // A sliding window's size can also set the batch duration; applying the
- // transformation later adds the "slide"
- setBatchDuration(((SlidingWindows) windowFn).getSize());
- } else if (!(windowFn instanceof GlobalWindows)) {
- throw new IllegalStateException("Windowing function not supported: " + windowFn);
- }
- }
- }
-
- private void setBatchDuration(org.joda.time.Duration duration) {
- long durationMillis = duration.getMillis();
- // validate window size
- if (durationMillis < SPARK_MIN_WINDOW.milliseconds()) {
- throw new IllegalArgumentException("Windowing of size " + durationMillis +
- " msec is not supported!");
- }
- // choose the smallest duration to be Spark's batch duration; larger ones will be handled
- // as window functions over the batched stream
- if (!windowing || this.batchDuration.milliseconds() > durationMillis) {
- this.batchDuration = Durations.milliseconds(durationMillis);
- }
- windowing = true;
- }
-
- public boolean isWindowing() {
- return windowing;
- }
-
- public Duration getBatchDuration() {
- return batchDuration;
- }
-}
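
The detector above picks Spark's batch duration as the smallest fixed or sliding
window size found in the pipeline, rejecting anything under the 500 msec floor;
larger windows are realized afterwards as window operations over the batched
stream. A minimal standalone sketch of that selection rule, using plain
millisecond longs in place of the Duration types (BatchDurationPicker is an
illustrative name, not part of the commit):

final class BatchDurationPicker {
  // Spark streaming recommends batches no smaller than 500 msec
  private static final long SPARK_MIN_WINDOW_MS = 500;

  private boolean windowing;
  private long batchDurationMs;

  void offerWindowSize(long windowMs) {
    if (windowMs < SPARK_MIN_WINDOW_MS) {
      throw new IllegalArgumentException(
          "Windowing of size " + windowMs + " msec is not supported!");
    }
    // the smallest window wins; larger windows are applied later as
    // window operations over the batched stream
    if (!windowing || batchDurationMs > windowMs) {
      batchDurationMs = windowMs;
    }
    windowing = true;
  }

  boolean isWindowing() { return windowing; }
  long batchDurationMs() { return batchDurationMs; }
}

Offering 2000 msec and then 1000 msec windows, for example, leaves a 1000 msec
batch duration; the 2000 msec window is then applied as dStream.window(...) over
those 1000 msec batches.
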
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/DoFnFunction.java
new file mode 100644
index 0000000..6af829f
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/DoFnFunction.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.spark.api.java.function.FlatMapFunction;
+
+/**
+ * Dataflow's Do functions correspond to Spark's FlatMap functions.
+ *
+ * @param <I> Input element type.
+ * @param <O> Output element type.
+ */
+public class DoFnFunction<I, O> implements FlatMapFunction<Iterator<WindowedValue<I>>,
+ WindowedValue<O>> {
+ private final DoFn<I, O> mFunction;
+ private final SparkRuntimeContext mRuntimeContext;
+ private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
+
+ /**
+ * @param fn DoFn to be wrapped.
+ * @param runtime Runtime context in which to apply the function.
+ * @param sideInputs Side inputs used by the DoFn.
+ */
+ public DoFnFunction(DoFn<I, O> fn,
+ SparkRuntimeContext runtime,
+ Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
+ this.mFunction = fn;
+ this.mRuntimeContext = runtime;
+ this.mSideInputs = sideInputs;
+ }
+
+ @Override
+ public Iterable<WindowedValue<O>> call(Iterator<WindowedValue<I>> iter)
+ throws Exception {
+ ProcCtxt ctxt = new ProcCtxt(mFunction, mRuntimeContext, mSideInputs);
+ ctxt.setup();
+ mFunction.startBundle(ctxt);
+ return ctxt.getOutputIterable(iter, mFunction);
+ }
+
+ private class ProcCtxt extends SparkProcessContext<I, O, WindowedValue<O>> {
+
+ private final List<WindowedValue<O>> outputs = new LinkedList<>();
+
+ ProcCtxt(DoFn<I, O> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>,
+ BroadcastHelper<?>> sideInputs) {
+ super(fn, runtimeContext, sideInputs);
+ }
+
+ @Override
+ public synchronized void output(O o) {
+ outputs.add(windowedValue != null ? windowedValue.withValue(o) :
+ WindowedValue.valueInEmptyWindows(o));
+ }
+
+ @Override
+ public synchronized void output(WindowedValue<O> o) {
+ outputs.add(o);
+ }
+
+ @Override
+ protected void clearOutput() {
+ outputs.clear();
+ }
+
+ @Override
+ protected Iterator<WindowedValue<O>> getOutputIterator() {
+ return outputs.iterator();
+ }
+ }
+
+}
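
As the window() evaluator in the deleted StreamingTransformTranslator shows
(dstream.mapPartitions(dofn)), a DoFnFunction is applied once per partition, so
per-bundle work such as startBundle runs per partition rather than per element.
A minimal, runnable sketch of that mapPartitions shape, assuming a local Spark
1.x context; the upper-casing function and all names are illustrative only:

import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

final class MapPartitionsDemo {
  public static void main(String[] args) {
    JavaSparkContext jsc = new JavaSparkContext("local[2]", "demo");
    JavaRDD<String> lines = jsc.parallelize(Arrays.asList("a", "b", "c"), 2);
    JavaRDD<String> upper = lines.mapPartitions(
        new FlatMapFunction<Iterator<String>, String>() {
          @Override
          public Iterable<String> call(Iterator<String> iter) {
            // per-partition setup would go here (cf. startBundle above)
            List<String> out = new LinkedList<>();
            while (iter.hasNext()) {
              out.add(iter.next().toUpperCase());
            }
            return out;
          }
        });
    System.out.println(upper.collect()); // [A, B, C]
    jsc.stop();
  }
}
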