You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/03/15 19:48:06 UTC

[09/23] incubator-beam git commit: [BEAM-11] Spark runner directory structure and pom setup.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
deleted file mode 100644
index 58b1924..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
+++ /dev/null
@@ -1,800 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import com.google.api.client.util.Maps;
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.io.AvroIO;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.View;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-import com.google.cloud.dataflow.sdk.util.AssignWindowsDoFn;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
-import com.google.cloud.dataflow.sdk.values.PCollectionTuple;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.collect.ImmutableMap;
-
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapreduce.AvroJob;
-import org.apache.avro.mapreduce.AvroKeyInputFormat;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaRDDLike;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import scala.Tuple2;
-
-import static com.cloudera.dataflow.spark.ShardNameBuilder.getOutputDirectory;
-import static com.cloudera.dataflow.spark.ShardNameBuilder.getOutputFilePrefix;
-import static com.cloudera.dataflow.spark.ShardNameBuilder.getOutputFileTemplate;
-import static com.cloudera.dataflow.spark.ShardNameBuilder.replaceShardCount;
-
-import com.cloudera.dataflow.hadoop.HadoopIO;
-
-/**
- * Supports translation between a DataFlow transform, and Spark's operations on RDDs.
- */
-public final class TransformTranslator {
-
-  private TransformTranslator() {
-  }
-
-  public static class FieldGetter {
-    private final Map<String, Field> fields;
-
-    public FieldGetter(Class<?> clazz) {
-      this.fields = Maps.newHashMap();
-      for (Field f : clazz.getDeclaredFields()) {
-        f.setAccessible(true);
-        this.fields.put(f.getName(), f);
-      }
-    }
-
-    public <T> T get(String fieldname, Object value) {
-      try {
-        @SuppressWarnings("unchecked")
-        T fieldValue = (T) fields.get(fieldname).get(value);
-        return fieldValue;
-      } catch (IllegalAccessException e) {
-        throw new IllegalStateException(e);
-      }
-    }
-  }
-
-  private static <T> TransformEvaluator<Flatten.FlattenPCollectionList<T>> flattenPColl() {
-    return new TransformEvaluator<Flatten.FlattenPCollectionList<T>>() {
-      @SuppressWarnings("unchecked")
-      @Override
-      public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) {
-        PCollectionList<T> pcs = context.getInput(transform);
-        JavaRDD<WindowedValue<T>>[] rdds = new JavaRDD[pcs.size()];
-        for (int i = 0; i < rdds.length; i++) {
-          rdds[i] = (JavaRDD<WindowedValue<T>>) context.getRDD(pcs.get(i));
-        }
-        JavaRDD<WindowedValue<T>> rdd = context.getSparkContext().union(rdds);
-        context.setOutputRDD(transform, rdd);
-      }
-    };
-  }
-
-  private static <K, V> TransformEvaluator<GroupByKey.GroupByKeyOnly<K, V>> gbk() {
-    return new TransformEvaluator<GroupByKey.GroupByKeyOnly<K, V>>() {
-      @Override
-      public void evaluate(GroupByKey.GroupByKeyOnly<K, V> transform, EvaluationContext context) {
-        @SuppressWarnings("unchecked")
-        JavaRDDLike<WindowedValue<KV<K, V>>, ?> inRDD =
-            (JavaRDDLike<WindowedValue<KV<K, V>>, ?>) context.getInputRDD(transform);
-        @SuppressWarnings("unchecked")
-        KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder();
-        Coder<K> keyCoder = coder.getKeyCoder();
-        Coder<V> valueCoder = coder.getValueCoder();
-
-        // Use coders to convert objects in the PCollection to byte arrays, so they
-        // can be transferred over the network for the shuffle.
-        JavaRDDLike<WindowedValue<KV<K, Iterable<V>>>, ?> outRDD = fromPair(
-              toPair(inRDD.map(WindowingHelpers.<KV<K, V>>unwindowFunction()))
-            .mapToPair(CoderHelpers.toByteFunction(keyCoder, valueCoder))
-            .groupByKey()
-            .mapToPair(CoderHelpers.fromByteFunctionIterable(keyCoder, valueCoder)))
-            // empty windows are OK here, see GroupByKey#evaluateHelper in the SDK
-            .map(WindowingHelpers.<KV<K, Iterable<V>>>windowFunction());
-        context.setOutputRDD(transform, outRDD);
-      }
-    };
-  }
-
-  private static final FieldGetter GROUPED_FG = new FieldGetter(Combine.GroupedValues.class);
-
-  private static <K, VI, VO> TransformEvaluator<Combine.GroupedValues<K, VI, VO>> grouped() {
-    return new TransformEvaluator<Combine.GroupedValues<K, VI, VO>>() {
-      @Override
-      public void evaluate(Combine.GroupedValues<K, VI, VO> transform, EvaluationContext context) {
-        Combine.KeyedCombineFn<K, VI, ?, VO> keyed = GROUPED_FG.get("fn", transform);
-        @SuppressWarnings("unchecked")
-        JavaRDDLike<WindowedValue<KV<K, Iterable<VI>>>, ?> inRDD =
-            (JavaRDDLike<WindowedValue<KV<K, Iterable<VI>>>, ?>) context.getInputRDD(transform);
-        context.setOutputRDD(transform,
-            inRDD.map(new KVFunction<>(keyed)));
-      }
-    };
-  }
-
-  private static final FieldGetter COMBINE_GLOBALLY_FG = new FieldGetter(Combine.Globally.class);
-
-  private static <I, A, O> TransformEvaluator<Combine.Globally<I, O>> combineGlobally() {
-    return new TransformEvaluator<Combine.Globally<I, O>>() {
-
-      @Override
-      public void evaluate(Combine.Globally<I, O> transform, EvaluationContext context) {
-        final Combine.CombineFn<I, A, O> globally = COMBINE_GLOBALLY_FG.get("fn", transform);
-
-        @SuppressWarnings("unchecked")
-        JavaRDDLike<WindowedValue<I>, ?> inRdd =
-            (JavaRDDLike<WindowedValue<I>, ?>) context.getInputRDD(transform);
-
-        final Coder<I> iCoder = context.getInput(transform).getCoder();
-        final Coder<A> aCoder;
-        try {
-          aCoder = globally.getAccumulatorCoder(
-              context.getPipeline().getCoderRegistry(), iCoder);
-        } catch (CannotProvideCoderException e) {
-          throw new IllegalStateException("Could not determine coder for accumulator", e);
-        }
-
-        // Use coders to convert objects in the PCollection to byte arrays, so they
-        // can be transferred over the network for the shuffle.
-        JavaRDD<byte[]> inRddBytes = inRdd
-            .map(WindowingHelpers.<I>unwindowFunction())
-            .map(CoderHelpers.toByteFunction(iCoder));
-
-        /*A*/ byte[] acc = inRddBytes.aggregate(
-            CoderHelpers.toByteArray(globally.createAccumulator(), aCoder),
-            new Function2</*A*/ byte[], /*I*/ byte[], /*A*/ byte[]>() {
-              @Override
-              public /*A*/ byte[] call(/*A*/ byte[] ab, /*I*/ byte[] ib) throws Exception {
-                A a = CoderHelpers.fromByteArray(ab, aCoder);
-                I i = CoderHelpers.fromByteArray(ib, iCoder);
-                return CoderHelpers.toByteArray(globally.addInput(a, i), aCoder);
-              }
-            },
-            new Function2</*A*/ byte[], /*A*/ byte[], /*A*/ byte[]>() {
-              @Override
-              public /*A*/ byte[] call(/*A*/ byte[] a1b, /*A*/ byte[] a2b) throws Exception {
-                A a1 = CoderHelpers.fromByteArray(a1b, aCoder);
-                A a2 = CoderHelpers.fromByteArray(a2b, aCoder);
-                // don't use Guava's ImmutableList.of as values may be null
-                List<A> accumulators = Collections.unmodifiableList(Arrays.asList(a1, a2));
-                A merged = globally.mergeAccumulators(accumulators);
-                return CoderHelpers.toByteArray(merged, aCoder);
-              }
-            }
-        );
-        O output = globally.extractOutput(CoderHelpers.fromByteArray(acc, aCoder));
-
-        Coder<O> coder = context.getOutput(transform).getCoder();
-        JavaRDD<byte[]> outRdd = context.getSparkContext().parallelize(
-            // don't use Guava's ImmutableList.of as output may be null
-            CoderHelpers.toByteArrays(Collections.singleton(output), coder));
-        context.setOutputRDD(transform, outRdd.map(CoderHelpers.fromByteFunction(coder))
-            .map(WindowingHelpers.<O>windowFunction()));
-      }
-    };
-  }
-
-  private static final FieldGetter COMBINE_PERKEY_FG = new FieldGetter(Combine.PerKey.class);
-
-  private static <K, VI, VA, VO> TransformEvaluator<Combine.PerKey<K, VI, VO>> combinePerKey() {
-    return new TransformEvaluator<Combine.PerKey<K, VI, VO>>() {
-      @Override
-      public void evaluate(Combine.PerKey<K, VI, VO> transform, EvaluationContext context) {
-        final Combine.KeyedCombineFn<K, VI, VA, VO> keyed =
-            COMBINE_PERKEY_FG.get("fn", transform);
-        @SuppressWarnings("unchecked")
-        JavaRDDLike<WindowedValue<KV<K, VI>>, ?> inRdd =
-            (JavaRDDLike<WindowedValue<KV<K, VI>>, ?>) context.getInputRDD(transform);
-
-        @SuppressWarnings("unchecked")
-        KvCoder<K, VI> inputCoder = (KvCoder<K, VI>) context.getInput(transform).getCoder();
-        Coder<K> keyCoder = inputCoder.getKeyCoder();
-        Coder<VI> viCoder = inputCoder.getValueCoder();
-        Coder<VA> vaCoder;
-        try {
-          vaCoder = keyed.getAccumulatorCoder(
-              context.getPipeline().getCoderRegistry(), keyCoder, viCoder);
-        } catch (CannotProvideCoderException e) {
-          throw new IllegalStateException("Could not determine coder for accumulator", e);
-        }
-        Coder<KV<K, VI>> kviCoder = KvCoder.of(keyCoder, viCoder);
-        Coder<KV<K, VA>> kvaCoder = KvCoder.of(keyCoder, vaCoder);
-
-        // We need to duplicate K as both the key of the JavaPairRDD as well as inside the value,
-        // since the functions passed to combineByKey don't receive the associated key of each
-        // value, and we need to map back into methods in Combine.KeyedCombineFn, which each
-        // require the key in addition to the VI's and VA's being merged/accumulated. Once Spark
-        // provides a way to include keys in the arguments of combine/merge functions, we won't
-        // need to duplicate the keys anymore.
-
-        // Key has to bw windowed in order to group by window as well
-        JavaPairRDD<WindowedValue<K>, WindowedValue<KV<K, VI>>> inRddDuplicatedKeyPair =
-            inRdd.mapToPair(
-                new PairFunction<WindowedValue<KV<K, VI>>, WindowedValue<K>,
-                    WindowedValue<KV<K, VI>>>() {
-                  @Override
-                  public Tuple2<WindowedValue<K>,
-                      WindowedValue<KV<K, VI>>> call(WindowedValue<KV<K, VI>> kv) {
-                    WindowedValue<K> wk = WindowedValue.of(kv.getValue().getKey(),
-                        kv.getTimestamp(), kv.getWindows(), kv.getPane());
-                    return new Tuple2<>(wk, kv);
-                  }
-                });
-        //-- windowed coders
-        final WindowedValue.FullWindowedValueCoder<K> wkCoder =
-                WindowedValue.FullWindowedValueCoder.of(keyCoder,
-                context.getInput(transform).getWindowingStrategy().getWindowFn().windowCoder());
-        final WindowedValue.FullWindowedValueCoder<KV<K, VI>> wkviCoder =
-                WindowedValue.FullWindowedValueCoder.of(kviCoder,
-                context.getInput(transform).getWindowingStrategy().getWindowFn().windowCoder());
-        final WindowedValue.FullWindowedValueCoder<KV<K, VA>> wkvaCoder =
-                WindowedValue.FullWindowedValueCoder.of(kvaCoder,
-                context.getInput(transform).getWindowingStrategy().getWindowFn().windowCoder());
-
-        // Use coders to convert objects in the PCollection to byte arrays, so they
-        // can be transferred over the network for the shuffle.
-        JavaPairRDD<ByteArray, byte[]> inRddDuplicatedKeyPairBytes = inRddDuplicatedKeyPair
-            .mapToPair(CoderHelpers.toByteFunction(wkCoder, wkviCoder));
-
-        // The output of combineByKey will be "VA" (accumulator) types rather than "VO" (final
-        // output types) since Combine.CombineFn only provides ways to merge VAs, and no way
-        // to merge VOs.
-        JavaPairRDD</*K*/ ByteArray, /*KV<K, VA>*/ byte[]> accumulatedBytes =
-            inRddDuplicatedKeyPairBytes.combineByKey(
-            new Function</*KV<K, VI>*/ byte[], /*KV<K, VA>*/ byte[]>() {
-              @Override
-              public /*KV<K, VA>*/ byte[] call(/*KV<K, VI>*/ byte[] input) {
-                WindowedValue<KV<K, VI>> wkvi = CoderHelpers.fromByteArray(input, wkviCoder);
-                VA va = keyed.createAccumulator(wkvi.getValue().getKey());
-                va = keyed.addInput(wkvi.getValue().getKey(), va, wkvi.getValue().getValue());
-                WindowedValue<KV<K, VA>> wkva =
-                    WindowedValue.of(KV.of(wkvi.getValue().getKey(), va), wkvi.getTimestamp(),
-                    wkvi.getWindows(), wkvi.getPane());
-                return CoderHelpers.toByteArray(wkva, wkvaCoder);
-              }
-            },
-            new Function2</*KV<K, VA>*/ byte[], /*KV<K, VI>*/ byte[], /*KV<K, VA>*/ byte[]>() {
-              @Override
-              public /*KV<K, VA>*/ byte[] call(/*KV<K, VA>*/ byte[] acc,
-                  /*KV<K, VI>*/ byte[] input) {
-                WindowedValue<KV<K, VA>> wkva = CoderHelpers.fromByteArray(acc, wkvaCoder);
-                WindowedValue<KV<K, VI>> wkvi = CoderHelpers.fromByteArray(input, wkviCoder);
-                VA va = keyed.addInput(wkva.getValue().getKey(), wkva.getValue().getValue(),
-                    wkvi.getValue().getValue());
-                wkva = WindowedValue.of(KV.of(wkva.getValue().getKey(), va), wkva.getTimestamp(),
-                    wkva.getWindows(), wkva.getPane());
-                return CoderHelpers.toByteArray(wkva, wkvaCoder);
-              }
-            },
-            new Function2</*KV<K, VA>*/ byte[], /*KV<K, VA>*/ byte[], /*KV<K, VA>*/ byte[]>() {
-              @Override
-              public /*KV<K, VA>*/ byte[] call(/*KV<K, VA>*/ byte[] acc1,
-                  /*KV<K, VA>*/ byte[] acc2) {
-                WindowedValue<KV<K, VA>> wkva1 = CoderHelpers.fromByteArray(acc1, wkvaCoder);
-                WindowedValue<KV<K, VA>> wkva2 = CoderHelpers.fromByteArray(acc2, wkvaCoder);
-                VA va = keyed.mergeAccumulators(wkva1.getValue().getKey(),
-                    // don't use Guava's ImmutableList.of as values may be null
-                    Collections.unmodifiableList(Arrays.asList(wkva1.getValue().getValue(),
-                    wkva2.getValue().getValue())));
-                WindowedValue<KV<K, VA>> wkva = WindowedValue.of(KV.of(wkva1.getValue().getKey(),
-                    va), wkva1.getTimestamp(), wkva1.getWindows(), wkva1.getPane());
-                return CoderHelpers.toByteArray(wkva, wkvaCoder);
-              }
-            });
-
-        JavaPairRDD<WindowedValue<K>, WindowedValue<VO>> extracted = accumulatedBytes
-            .mapToPair(CoderHelpers.fromByteFunction(wkCoder, wkvaCoder))
-            .mapValues(
-                new Function<WindowedValue<KV<K, VA>>, WindowedValue<VO>>() {
-                  @Override
-                  public WindowedValue<VO> call(WindowedValue<KV<K, VA>> acc) {
-                    return WindowedValue.of(keyed.extractOutput(acc.getValue().getKey(),
-                        acc.getValue().getValue()), acc.getTimestamp(),
-                        acc.getWindows(), acc.getPane());
-                  }
-                });
-
-        context.setOutputRDD(transform,
-            fromPair(extracted)
-            .map(new Function<KV<WindowedValue<K>, WindowedValue<VO>>, WindowedValue<KV<K, VO>>>() {
-              @Override
-              public WindowedValue<KV<K, VO>> call(KV<WindowedValue<K>, WindowedValue<VO>> kwvo)
-                  throws Exception {
-                WindowedValue<VO> wvo = kwvo.getValue();
-                KV<K, VO> kvo = KV.of(kwvo.getKey().getValue(), wvo.getValue());
-                return WindowedValue.of(kvo, wvo.getTimestamp(), wvo.getWindows(), wvo.getPane());
-              }
-            }));
-      }
-    };
-  }
-
-  private static final class KVFunction<K, VI, VO>
-      implements Function<WindowedValue<KV<K, Iterable<VI>>>, WindowedValue<KV<K, VO>>> {
-    private final Combine.KeyedCombineFn<K, VI, ?, VO> keyed;
-
-     KVFunction(Combine.KeyedCombineFn<K, VI, ?, VO> keyed) {
-      this.keyed = keyed;
-    }
-
-    @Override
-    public WindowedValue<KV<K, VO>> call(WindowedValue<KV<K, Iterable<VI>>> windowedKv)
-        throws Exception {
-      KV<K, Iterable<VI>> kv = windowedKv.getValue();
-      return WindowedValue.of(KV.of(kv.getKey(), keyed.apply(kv.getKey(), kv.getValue())),
-          windowedKv.getTimestamp(), windowedKv.getWindows(), windowedKv.getPane());
-    }
-  }
-
-  private static <K, V> JavaPairRDD<K, V> toPair(JavaRDDLike<KV<K, V>, ?> rdd) {
-    return rdd.mapToPair(new PairFunction<KV<K, V>, K, V>() {
-      @Override
-      public Tuple2<K, V> call(KV<K, V> kv) {
-        return new Tuple2<>(kv.getKey(), kv.getValue());
-      }
-    });
-  }
-
-  private static <K, V> JavaRDDLike<KV<K, V>, ?> fromPair(JavaPairRDD<K, V> rdd) {
-    return rdd.map(new Function<Tuple2<K, V>, KV<K, V>>() {
-      @Override
-      public KV<K, V> call(Tuple2<K, V> t2) {
-        return KV.of(t2._1(), t2._2());
-      }
-    });
-  }
-
-  private static <I, O> TransformEvaluator<ParDo.Bound<I, O>> parDo() {
-    return new TransformEvaluator<ParDo.Bound<I, O>>() {
-      @Override
-      public void evaluate(ParDo.Bound<I, O> transform, EvaluationContext context) {
-        DoFnFunction<I, O> dofn =
-            new DoFnFunction<>(transform.getFn(),
-                context.getRuntimeContext(),
-                getSideInputs(transform.getSideInputs(), context));
-        @SuppressWarnings("unchecked")
-        JavaRDDLike<WindowedValue<I>, ?> inRDD =
-            (JavaRDDLike<WindowedValue<I>, ?>) context.getInputRDD(transform);
-        context.setOutputRDD(transform, inRDD.mapPartitions(dofn));
-      }
-    };
-  }
-
-  private static final FieldGetter MULTIDO_FG = new FieldGetter(ParDo.BoundMulti.class);
-
-  private static <I, O> TransformEvaluator<ParDo.BoundMulti<I, O>> multiDo() {
-    return new TransformEvaluator<ParDo.BoundMulti<I, O>>() {
-      @Override
-      public void evaluate(ParDo.BoundMulti<I, O> transform, EvaluationContext context) {
-        TupleTag<O> mainOutputTag = MULTIDO_FG.get("mainOutputTag", transform);
-        MultiDoFnFunction<I, O> multifn = new MultiDoFnFunction<>(
-            transform.getFn(),
-            context.getRuntimeContext(),
-            mainOutputTag,
-            getSideInputs(transform.getSideInputs(), context));
-
-        @SuppressWarnings("unchecked")
-        JavaRDDLike<WindowedValue<I>, ?> inRDD =
-            (JavaRDDLike<WindowedValue<I>, ?>) context.getInputRDD(transform);
-        JavaPairRDD<TupleTag<?>, WindowedValue<?>> all = inRDD
-            .mapPartitionsToPair(multifn)
-            .cache();
-
-        PCollectionTuple pct = context.getOutput(transform);
-        for (Map.Entry<TupleTag<?>, PCollection<?>> e : pct.getAll().entrySet()) {
-          @SuppressWarnings("unchecked")
-          JavaPairRDD<TupleTag<?>, WindowedValue<?>> filtered =
-              all.filter(new TupleTagFilter(e.getKey()));
-          @SuppressWarnings("unchecked")
-          // Object is the best we can do since different outputs can have different tags
-          JavaRDD<WindowedValue<Object>> values =
-              (JavaRDD<WindowedValue<Object>>) (JavaRDD<?>) filtered.values();
-          context.setRDD(e.getValue(), values);
-        }
-      }
-    };
-  }
-
-
-  private static <T> TransformEvaluator<TextIO.Read.Bound<T>> readText() {
-    return new TransformEvaluator<TextIO.Read.Bound<T>>() {
-      @Override
-      public void evaluate(TextIO.Read.Bound<T> transform, EvaluationContext context) {
-        String pattern = transform.getFilepattern();
-        JavaRDD<WindowedValue<String>> rdd = context.getSparkContext().textFile(pattern)
-                .map(WindowingHelpers.<String>windowFunction());
-        context.setOutputRDD(transform, rdd);
-      }
-    };
-  }
-
-  private static <T> TransformEvaluator<TextIO.Write.Bound<T>> writeText() {
-    return new TransformEvaluator<TextIO.Write.Bound<T>>() {
-      @Override
-      public void evaluate(TextIO.Write.Bound<T> transform, EvaluationContext context) {
-        @SuppressWarnings("unchecked")
-        JavaPairRDD<T, Void> last =
-            ((JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform))
-            .map(WindowingHelpers.<T>unwindowFunction())
-            .mapToPair(new PairFunction<T, T,
-                    Void>() {
-              @Override
-              public Tuple2<T, Void> call(T t) throws Exception {
-                return new Tuple2<>(t, null);
-              }
-            });
-        ShardTemplateInformation shardTemplateInfo =
-            new ShardTemplateInformation(transform.getNumShards(),
-                transform.getShardTemplate(), transform.getFilenamePrefix(),
-                transform.getFilenameSuffix());
-        writeHadoopFile(last, new Configuration(), shardTemplateInfo, Text.class,
-            NullWritable.class, TemplatedTextOutputFormat.class);
-      }
-    };
-  }
-
-  private static <T> TransformEvaluator<AvroIO.Read.Bound<T>> readAvro() {
-    return new TransformEvaluator<AvroIO.Read.Bound<T>>() {
-      @Override
-      public void evaluate(AvroIO.Read.Bound<T> transform, EvaluationContext context) {
-        String pattern = transform.getFilepattern();
-        JavaSparkContext jsc = context.getSparkContext();
-        @SuppressWarnings("unchecked")
-        JavaRDD<AvroKey<T>> avroFile = (JavaRDD<AvroKey<T>>) (JavaRDD<?>)
-            jsc.newAPIHadoopFile(pattern,
-                                 AvroKeyInputFormat.class,
-                                 AvroKey.class, NullWritable.class,
-                                 new Configuration()).keys();
-        JavaRDD<WindowedValue<T>> rdd = avroFile.map(
-            new Function<AvroKey<T>, T>() {
-              @Override
-              public T call(AvroKey<T> key) {
-                return key.datum();
-              }
-            }).map(WindowingHelpers.<T>windowFunction());
-        context.setOutputRDD(transform, rdd);
-      }
-    };
-  }
-
-  private static <T> TransformEvaluator<AvroIO.Write.Bound<T>> writeAvro() {
-    return new TransformEvaluator<AvroIO.Write.Bound<T>>() {
-      @Override
-      public void evaluate(AvroIO.Write.Bound<T> transform, EvaluationContext context) {
-        Job job;
-        try {
-          job = Job.getInstance();
-        } catch (IOException e) {
-          throw new IllegalStateException(e);
-        }
-        AvroJob.setOutputKeySchema(job, transform.getSchema());
-        @SuppressWarnings("unchecked")
-        JavaPairRDD<AvroKey<T>, NullWritable> last =
-            ((JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform))
-            .map(WindowingHelpers.<T>unwindowFunction())
-            .mapToPair(new PairFunction<T, AvroKey<T>, NullWritable>() {
-              @Override
-              public Tuple2<AvroKey<T>, NullWritable> call(T t) throws Exception {
-                return new Tuple2<>(new AvroKey<>(t), NullWritable.get());
-              }
-            });
-        ShardTemplateInformation shardTemplateInfo =
-            new ShardTemplateInformation(transform.getNumShards(),
-            transform.getShardTemplate(), transform.getFilenamePrefix(),
-            transform.getFilenameSuffix());
-        writeHadoopFile(last, job.getConfiguration(), shardTemplateInfo,
-            AvroKey.class, NullWritable.class, TemplatedAvroKeyOutputFormat.class);
-      }
-    };
-  }
-
-  private static <K, V> TransformEvaluator<HadoopIO.Read.Bound<K, V>> readHadoop() {
-    return new TransformEvaluator<HadoopIO.Read.Bound<K, V>>() {
-      @Override
-      public void evaluate(HadoopIO.Read.Bound<K, V> transform, EvaluationContext context) {
-        String pattern = transform.getFilepattern();
-        JavaSparkContext jsc = context.getSparkContext();
-        @SuppressWarnings ("unchecked")
-        JavaPairRDD<K, V> file = jsc.newAPIHadoopFile(pattern,
-            transform.getFormatClass(),
-            transform.getKeyClass(), transform.getValueClass(),
-            new Configuration());
-        JavaRDD<WindowedValue<KV<K, V>>> rdd =
-            file.map(new Function<Tuple2<K, V>, KV<K, V>>() {
-          @Override
-          public KV<K, V> call(Tuple2<K, V> t2) throws Exception {
-            return KV.of(t2._1(), t2._2());
-          }
-        }).map(WindowingHelpers.<KV<K, V>>windowFunction());
-        context.setOutputRDD(transform, rdd);
-      }
-    };
-  }
-
-  private static <K, V> TransformEvaluator<HadoopIO.Write.Bound<K, V>> writeHadoop() {
-    return new TransformEvaluator<HadoopIO.Write.Bound<K, V>>() {
-      @Override
-      public void evaluate(HadoopIO.Write.Bound<K, V> transform, EvaluationContext context) {
-        @SuppressWarnings("unchecked")
-        JavaPairRDD<K, V> last = ((JavaRDDLike<WindowedValue<KV<K, V>>, ?>) context
-            .getInputRDD(transform))
-            .map(WindowingHelpers.<KV<K, V>>unwindowFunction())
-            .mapToPair(new PairFunction<KV<K, V>, K, V>() {
-              @Override
-              public Tuple2<K, V> call(KV<K, V> t) throws Exception {
-                return new Tuple2<>(t.getKey(), t.getValue());
-              }
-            });
-        ShardTemplateInformation shardTemplateInfo =
-            new ShardTemplateInformation(transform.getNumShards(),
-                transform.getShardTemplate(), transform.getFilenamePrefix(),
-                transform.getFilenameSuffix());
-        Configuration conf = new Configuration();
-        for (Map.Entry<String, String> e : transform.getConfigurationProperties().entrySet()) {
-          conf.set(e.getKey(), e.getValue());
-        }
-        writeHadoopFile(last, conf, shardTemplateInfo,
-            transform.getKeyClass(), transform.getValueClass(), transform.getFormatClass());
-      }
-    };
-  }
-
-  private static final class ShardTemplateInformation {
-    private final int numShards;
-    private final String shardTemplate;
-    private final String filenamePrefix;
-    private final String filenameSuffix;
-
-    private ShardTemplateInformation(int numShards, String shardTemplate, String
-        filenamePrefix, String filenameSuffix) {
-      this.numShards = numShards;
-      this.shardTemplate = shardTemplate;
-      this.filenamePrefix = filenamePrefix;
-      this.filenameSuffix = filenameSuffix;
-    }
-
-    int getNumShards() {
-      return numShards;
-    }
-
-    String getShardTemplate() {
-      return shardTemplate;
-    }
-
-    String getFilenamePrefix() {
-      return filenamePrefix;
-    }
-
-    String getFilenameSuffix() {
-      return filenameSuffix;
-    }
-  }
-
-  private static <K, V> void writeHadoopFile(JavaPairRDD<K, V> rdd, Configuration conf,
-      ShardTemplateInformation shardTemplateInfo, Class<?> keyClass, Class<?> valueClass,
-      Class<? extends FileOutputFormat> formatClass) {
-    int numShards = shardTemplateInfo.getNumShards();
-    String shardTemplate = shardTemplateInfo.getShardTemplate();
-    String filenamePrefix = shardTemplateInfo.getFilenamePrefix();
-    String filenameSuffix = shardTemplateInfo.getFilenameSuffix();
-    if (numShards != 0) {
-      // number of shards was set explicitly, so repartition
-      rdd = rdd.repartition(numShards);
-    }
-    int actualNumShards = rdd.partitions().size();
-    String template = replaceShardCount(shardTemplate, actualNumShards);
-    String outputDir = getOutputDirectory(filenamePrefix, template);
-    String filePrefix = getOutputFilePrefix(filenamePrefix, template);
-    String fileTemplate = getOutputFileTemplate(filenamePrefix, template);
-
-    conf.set(ShardNameTemplateHelper.OUTPUT_FILE_PREFIX, filePrefix);
-    conf.set(ShardNameTemplateHelper.OUTPUT_FILE_TEMPLATE, fileTemplate);
-    conf.set(ShardNameTemplateHelper.OUTPUT_FILE_SUFFIX, filenameSuffix);
-    rdd.saveAsNewAPIHadoopFile(outputDir, keyClass, valueClass, formatClass, conf);
-  }
-
-  private static final FieldGetter WINDOW_FG = new FieldGetter(Window.Bound.class);
-
-  private static <T, W extends BoundedWindow> TransformEvaluator<Window.Bound<T>> window() {
-    return new TransformEvaluator<Window.Bound<T>>() {
-      @Override
-      public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
-        @SuppressWarnings("unchecked")
-        JavaRDDLike<WindowedValue<T>, ?> inRDD =
-            (JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform);
-        WindowFn<? super T, W> windowFn = WINDOW_FG.get("windowFn", transform);
-        if (windowFn instanceof GlobalWindows) {
-          context.setOutputRDD(transform, inRDD);
-        } else {
-          @SuppressWarnings("unchecked")
-          DoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
-          DoFnFunction<T, T> dofn =
-                  new DoFnFunction<>(addWindowsDoFn, context.getRuntimeContext(), null);
-          context.setOutputRDD(transform, inRDD.mapPartitions(dofn));
-        }
-      }
-    };
-  }
-
-  private static <T> TransformEvaluator<Create.Values<T>> create() {
-    return new TransformEvaluator<Create.Values<T>>() {
-      @Override
-      public void evaluate(Create.Values<T> transform, EvaluationContext context) {
-        Iterable<T> elems = transform.getElements();
-        // Use a coder to convert the objects in the PCollection to byte arrays, so they
-        // can be transferred over the network.
-        Coder<T> coder = context.getOutput(transform).getCoder();
-        context.setOutputRDDFromValues(transform, elems, coder);
-      }
-    };
-  }
-
-  private static <T> TransformEvaluator<View.AsSingleton<T>> viewAsSingleton() {
-    return new TransformEvaluator<View.AsSingleton<T>>() {
-      @Override
-      public void evaluate(View.AsSingleton<T> transform, EvaluationContext context) {
-        Iterable<? extends WindowedValue<?>> iter =
-                context.getWindowedValues(context.getInput(transform));
-        context.setPView(context.getOutput(transform), iter);
-      }
-    };
-  }
-
-  private static <T> TransformEvaluator<View.AsIterable<T>> viewAsIter() {
-    return new TransformEvaluator<View.AsIterable<T>>() {
-      @Override
-      public void evaluate(View.AsIterable<T> transform, EvaluationContext context) {
-        Iterable<? extends WindowedValue<?>> iter =
-                context.getWindowedValues(context.getInput(transform));
-        context.setPView(context.getOutput(transform), iter);
-      }
-    };
-  }
-
-  private static <R, W> TransformEvaluator<View.CreatePCollectionView<R, W>> createPCollView() {
-    return new TransformEvaluator<View.CreatePCollectionView<R, W>>() {
-      @Override
-      public void evaluate(View.CreatePCollectionView<R, W> transform, EvaluationContext context) {
-        Iterable<? extends WindowedValue<?>> iter =
-            context.getWindowedValues(context.getInput(transform));
-        context.setPView(context.getOutput(transform), iter);
-      }
-    };
-  }
-
-  private static final class TupleTagFilter<V>
-      implements Function<Tuple2<TupleTag<V>, WindowedValue<?>>, Boolean> {
-
-    private final TupleTag<V> tag;
-
-    private TupleTagFilter(TupleTag<V> tag) {
-      this.tag = tag;
-    }
-
-    @Override
-    public Boolean call(Tuple2<TupleTag<V>, WindowedValue<?>> input) {
-      return tag.equals(input._1());
-    }
-  }
-
-  private static Map<TupleTag<?>, BroadcastHelper<?>> getSideInputs(
-      List<PCollectionView<?>> views,
-      EvaluationContext context) {
-    if (views == null) {
-      return ImmutableMap.of();
-    } else {
-      Map<TupleTag<?>, BroadcastHelper<?>> sideInputs = Maps.newHashMap();
-      for (PCollectionView<?> view : views) {
-        Iterable<? extends WindowedValue<?>> collectionView = context.getPCollectionView(view);
-        Coder<Iterable<WindowedValue<?>>> coderInternal = view.getCoderInternal();
-        @SuppressWarnings("unchecked")
-        BroadcastHelper<?> helper =
-            BroadcastHelper.create((Iterable<WindowedValue<?>>) collectionView, coderInternal);
-        //broadcast side inputs
-        helper.broadcast(context.getSparkContext());
-        sideInputs.put(view.getTagInternal(), helper);
-      }
-      return sideInputs;
-    }
-  }
-
-  private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS = Maps
-      .newHashMap();
-
-  static {
-    EVALUATORS.put(TextIO.Read.Bound.class, readText());
-    EVALUATORS.put(TextIO.Write.Bound.class, writeText());
-    EVALUATORS.put(AvroIO.Read.Bound.class, readAvro());
-    EVALUATORS.put(AvroIO.Write.Bound.class, writeAvro());
-    EVALUATORS.put(HadoopIO.Read.Bound.class, readHadoop());
-    EVALUATORS.put(HadoopIO.Write.Bound.class, writeHadoop());
-    EVALUATORS.put(ParDo.Bound.class, parDo());
-    EVALUATORS.put(ParDo.BoundMulti.class, multiDo());
-    EVALUATORS.put(GroupByKey.GroupByKeyOnly.class, gbk());
-    EVALUATORS.put(Combine.GroupedValues.class, grouped());
-    EVALUATORS.put(Combine.Globally.class, combineGlobally());
-    EVALUATORS.put(Combine.PerKey.class, combinePerKey());
-    EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl());
-    EVALUATORS.put(Create.Values.class, create());
-    EVALUATORS.put(View.AsSingleton.class, viewAsSingleton());
-    EVALUATORS.put(View.AsIterable.class, viewAsIter());
-    EVALUATORS.put(View.CreatePCollectionView.class, createPCollView());
-    EVALUATORS.put(Window.Bound.class, window());
-  }
-
-  public static <PT extends PTransform<?, ?>> TransformEvaluator<PT>
-  getTransformEvaluator(Class<PT> clazz) {
-    @SuppressWarnings("unchecked")
-    TransformEvaluator<PT> transform = (TransformEvaluator<PT>) EVALUATORS.get(clazz);
-    if (transform == null) {
-      throw new IllegalStateException("No TransformEvaluator registered for " + clazz);
-    }
-    return transform;
-  }
-
-  /**
-   * Translator matches Dataflow transformation with the appropriate evaluator.
-   */
-  public static class Translator implements SparkPipelineTranslator {
-
-    @Override
-    public boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz) {
-      return EVALUATORS.containsKey(clazz);
-    }
-
-    @Override
-    public <PT extends PTransform<?, ?>> TransformEvaluator<PT> translate(Class<PT> clazz) {
-      return getTransformEvaluator(clazz);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/WindowingHelpers.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/WindowingHelpers.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/WindowingHelpers.java
deleted file mode 100644
index 90600b2..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/WindowingHelpers.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import org.apache.spark.api.java.function.Function;
-
-/**
- * Helper functions for working with windows.
- */
-public final class WindowingHelpers {
-  private WindowingHelpers() {
-  }
-
-  /**
-   * A function for converting a value to a {@link WindowedValue}. The resulting
-   * {@link WindowedValue} will be in no windows, and will have the default timestamp
-   * and pane.
-   *
-   * @param <T>   The type of the object.
-   * @return A function that accepts an object and returns its {@link WindowedValue}.
-   */
-  public static <T> Function<T, WindowedValue<T>> windowFunction() {
-    return new Function<T, WindowedValue<T>>() {
-      @Override
-      public WindowedValue<T> call(T t) {
-        return WindowedValue.valueInEmptyWindows(t);
-      }
-    };
-  }
-
-  /**
-   * A function for extracting the value from a {@link WindowedValue}.
-   *
-   * @param <T>   The type of the object.
-   * @return A function that accepts a {@link WindowedValue} and returns its value.
-   */
-  public static <T> Function<WindowedValue<T>, T> unwindowFunction() {
-    return new Function<WindowedValue<T>, T>() {
-      @Override
-      public T call(WindowedValue<T> t) {
-        return t.getValue();
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/aggregators/AggAccumParam.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/aggregators/AggAccumParam.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/aggregators/AggAccumParam.java
deleted file mode 100644
index a3055a2..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/aggregators/AggAccumParam.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark.aggregators;
-
-import org.apache.spark.AccumulatorParam;
-
-public class AggAccumParam implements AccumulatorParam<NamedAggregators> {
-  @Override
-  public NamedAggregators addAccumulator(NamedAggregators current, NamedAggregators added) {
-    return current.merge(added);
-  }
-
-  @Override
-  public NamedAggregators addInPlace(NamedAggregators current, NamedAggregators added) {
-    return addAccumulator(current, added);
-  }
-
-  @Override
-  public NamedAggregators zero(NamedAggregators initialValue) {
-    return new NamedAggregators();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/aggregators/NamedAggregators.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/aggregators/NamedAggregators.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/aggregators/NamedAggregators.java
deleted file mode 100644
index d51e404..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/aggregators/NamedAggregators.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark.aggregators;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.util.Map;
-import java.util.TreeMap;
-
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.common.collect.ImmutableList;
-
-import com.cloudera.dataflow.spark.SparkRuntimeContext;
-
-/**
- * This class wraps a map of named aggregators. Spark expects that all accumulators be declared
- * before a job is launched. Dataflow allows aggregators to be used and incremented on the fly.
- * We create a map of named aggregators and instantiate in the the spark context before the job
- * is launched. We can then add aggregators on the fly in Spark.
- */
-public class NamedAggregators implements Serializable {
-  /**
-   * Map from aggregator name to current state.
-   */
-  private final Map<String, State<?, ?, ?>> mNamedAggregators = new TreeMap<>();
-
-  /**
-   * Constructs a new NamedAggregators instance.
-   */
-  public NamedAggregators() {
-  }
-
-  /**
-   * Constructs a new named aggregators instance that contains a mapping from the specified
-   * `named` to the associated initial state.
-   *
-   * @param name  Name of aggregator.
-   * @param state Associated State.
-   */
-  public NamedAggregators(String name, State<?, ?, ?> state) {
-    this.mNamedAggregators.put(name, state);
-  }
-
-  /**
-   * @param name      Name of aggregator to retrieve.
-   * @param typeClass Type class to cast the value to.
-   * @param <T>       Type to be returned.
-   * @return the value of the aggregator associated with the specified name
-   */
-  public <T> T getValue(String name, Class<T> typeClass) {
-    return typeClass.cast(mNamedAggregators.get(name).render());
-  }
-
-  /**
-   * Merges another NamedAggregators instance with this instance.
-   *
-   * @param other The other instance of named aggregators ot merge.
-   * @return This instance of Named aggregators with associated states updated to reflect the
-   * other instance's aggregators.
-   */
-  public NamedAggregators merge(NamedAggregators other) {
-    for (Map.Entry<String, State<?, ?, ?>> e : other.mNamedAggregators.entrySet()) {
-      String key = e.getKey();
-      State<?, ?, ?> otherValue = e.getValue();
-      State<?, ?, ?> value = mNamedAggregators.get(key);
-      if (value == null) {
-        mNamedAggregators.put(key, otherValue);
-      } else {
-        mNamedAggregators.put(key, merge(value, otherValue));
-      }
-    }
-    return this;
-  }
-
-  /**
-   * Helper method to merge States whose generic types aren't provably the same,
-   * so require some casting.
-   */
-  @SuppressWarnings("unchecked")
-  private static <A, B, C> State<A, B, C> merge(State<?, ?, ?> s1, State<?, ?, ?> s2) {
-    return ((State<A, B, C>) s1).merge((State<A, B, C>) s2);
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    for (Map.Entry<String, State<?, ?, ?>> e : mNamedAggregators.entrySet()) {
-      sb.append(e.getKey()).append(": ").append(e.getValue().render());
-    }
-    return sb.toString();
-  }
-
-  /**
-   * @param <IN>    Input data type
-   * @param <INTER> Intermediate data type (useful for averages)
-   * @param <OUT>   Output data type
-   */
-  public interface State<IN, INTER, OUT> extends Serializable {
-    /**
-     * @param element new element to update state
-     */
-    void update(IN element);
-
-    State<IN, INTER, OUT> merge(State<IN, INTER, OUT> other);
-
-    INTER current();
-
-    OUT render();
-
-    Combine.CombineFn<IN, INTER, OUT> getCombineFn();
-  }
-
-  /**
-   * =&gt; combineFunction in data flow.
-   */
-  public static class CombineFunctionState<IN, INTER, OUT> implements State<IN, INTER, OUT> {
-
-    private Combine.CombineFn<IN, INTER, OUT> combineFn;
-    private Coder<IN> inCoder;
-    private SparkRuntimeContext ctxt;
-    private transient INTER state;
-
-    public CombineFunctionState(
-        Combine.CombineFn<IN, INTER, OUT> combineFn,
-        Coder<IN> inCoder,
-        SparkRuntimeContext ctxt) {
-      this.combineFn = combineFn;
-      this.inCoder = inCoder;
-      this.ctxt = ctxt;
-      this.state = combineFn.createAccumulator();
-    }
-
-    @Override
-    public void update(IN element) {
-      combineFn.addInput(state, element);
-    }
-
-    @Override
-    public State<IN, INTER, OUT> merge(State<IN, INTER, OUT> other) {
-      this.state = combineFn.mergeAccumulators(ImmutableList.of(current(), other.current()));
-      return this;
-    }
-
-    @Override
-    public INTER current() {
-      return state;
-    }
-
-    @Override
-    public OUT render() {
-      return combineFn.extractOutput(state);
-    }
-
-    @Override
-    public Combine.CombineFn<IN, INTER, OUT> getCombineFn() {
-      return combineFn;
-    }
-
-    private void writeObject(ObjectOutputStream oos) throws IOException {
-      oos.writeObject(ctxt);
-      oos.writeObject(combineFn);
-      oos.writeObject(inCoder);
-      try {
-        combineFn.getAccumulatorCoder(ctxt.getCoderRegistry(), inCoder)
-            .encode(state, oos, Coder.Context.NESTED);
-      } catch (CannotProvideCoderException e) {
-        throw new IllegalStateException("Could not determine coder for accumulator", e);
-      }
-    }
-
-    @SuppressWarnings("unchecked")
-    private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
-      ctxt = (SparkRuntimeContext) ois.readObject();
-      combineFn = (Combine.CombineFn<IN, INTER, OUT>) ois.readObject();
-      inCoder = (Coder<IN>) ois.readObject();
-      try {
-        state = combineFn.getAccumulatorCoder(ctxt.getCoderRegistry(), inCoder)
-            .decode(ois, Coder.Context.NESTED);
-      } catch (CannotProvideCoderException e) {
-        throw new IllegalStateException("Could not determine coder for accumulator", e);
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptions.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptions.java
deleted file mode 100644
index 57253f0..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptions.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.dataflow.spark.streaming;
-
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-
-import com.cloudera.dataflow.spark.SparkPipelineOptions;
-
-/**
- * Options used to configure Spark streaming.
- */
-public interface SparkStreamingPipelineOptions extends SparkPipelineOptions {
-  @Description("Timeout to wait (in msec) for the streaming execution so stop, -1 runs until " +
-          "execution is stopped")
-  @Default.Long(-1)
-  Long getTimeout();
-
-  void setTimeout(Long batchInterval);
-
-  @Override
-  @Default.Boolean(true)
-  boolean isStreaming();
-
-  @Override
-  @Default.String("spark streaming dataflow pipeline job")
-  String getAppName();
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsFactory.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsFactory.java
deleted file mode 100644
index 3b568af..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsFactory.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.dataflow.spark.streaming;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-
-public final class SparkStreamingPipelineOptionsFactory {
-
-  private SparkStreamingPipelineOptionsFactory() {
-  }
-
-  public static SparkStreamingPipelineOptions create() {
-    return PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java
deleted file mode 100644
index 01c4375..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.dataflow.spark.streaming;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar;
-import com.google.common.collect.ImmutableList;
-
-public class SparkStreamingPipelineOptionsRegistrar implements PipelineOptionsRegistrar {
-
-  @Override
-  public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
-    return ImmutableList.<Class<? extends PipelineOptions>>of(SparkStreamingPipelineOptions
-            .class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java
deleted file mode 100644
index 5ecd562..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.dataflow.spark.streaming;
-
-
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.cloud.dataflow.sdk.values.PValue;
-
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaRDDLike;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaDStreamLike;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
-
-import com.cloudera.dataflow.spark.EvaluationContext;
-import com.cloudera.dataflow.spark.SparkRuntimeContext;
-
-/**
- * Streaming evaluation context helps to handle streaming.
- */
-public class StreamingEvaluationContext extends EvaluationContext {
-
-  private final JavaStreamingContext jssc;
-  private final long timeout;
-  private final Map<PValue, DStreamHolder<?>> pstreams = new LinkedHashMap<>();
-  private final Set<DStreamHolder<?>> leafStreams = new LinkedHashSet<>();
-
-  public StreamingEvaluationContext(JavaSparkContext jsc, Pipeline pipeline,
-      JavaStreamingContext jssc, long timeout) {
-    super(jsc, pipeline);
-    this.jssc = jssc;
-    this.timeout = timeout;
-  }
-
-  /**
-   * DStream holder Can also crate a DStream from a supplied queue of values, but mainly for
-   * testing.
-   */
-  private class DStreamHolder<T> {
-
-    private Iterable<Iterable<T>> values;
-    private Coder<T> coder;
-    private JavaDStream<WindowedValue<T>> dStream;
-
-    DStreamHolder(Iterable<Iterable<T>> values, Coder<T> coder) {
-      this.values = values;
-      this.coder = coder;
-    }
-
-    DStreamHolder(JavaDStream<WindowedValue<T>> dStream) {
-      this.dStream = dStream;
-    }
-
-    @SuppressWarnings("unchecked")
-    JavaDStream<WindowedValue<T>> getDStream() {
-      if (dStream == null) {
-        // create the DStream from values
-        Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>();
-        for (Iterable<T> v : values) {
-          setOutputRDDFromValues(currentTransform.getTransform(), v, coder);
-          rddQueue.offer((JavaRDD<WindowedValue<T>>) getOutputRDD(currentTransform.getTransform()));
-        }
-        // create dstream from queue, one at a time, no defaults
-        // mainly for unit test so no reason to have this configurable
-        dStream = jssc.queueStream(rddQueue, true);
-      }
-      return dStream;
-    }
-  }
-
-  <T> void setDStreamFromQueue(
-      PTransform<?, ?> transform, Iterable<Iterable<T>> values, Coder<T> coder) {
-    pstreams.put((PValue) getOutput(transform), new DStreamHolder<>(values, coder));
-  }
-
-  <T> void setStream(PTransform<?, ?> transform, JavaDStream<WindowedValue<T>> dStream) {
-    PValue pvalue = (PValue) getOutput(transform);
-    DStreamHolder<T> dStreamHolder = new DStreamHolder<>(dStream);
-    pstreams.put(pvalue, dStreamHolder);
-    leafStreams.add(dStreamHolder);
-  }
-
-  boolean hasStream(PTransform<?, ?> transform) {
-    PValue pvalue = (PValue) getInput(transform);
-    return pstreams.containsKey(pvalue);
-  }
-
-  JavaDStreamLike<?, ?, ?> getStream(PTransform<?, ?> transform) {
-    return getStream((PValue) getInput(transform));
-  }
-
-  JavaDStreamLike<?, ?, ?> getStream(PValue pvalue) {
-    DStreamHolder<?> dStreamHolder = pstreams.get(pvalue);
-    JavaDStreamLike<?, ?, ?> dStream = dStreamHolder.getDStream();
-    leafStreams.remove(dStreamHolder);
-    return dStream;
-  }
-
-  // used to set the RDD from the DStream in the RDDHolder for transformation
-  <T> void setInputRDD(
-      PTransform<? extends PInput, ?> transform, JavaRDDLike<WindowedValue<T>, ?> rdd) {
-    setRDD((PValue) getInput(transform), rdd);
-  }
-
-  // used to get the RDD transformation output and use it as the DStream transformation output
-  JavaRDDLike<?, ?> getOutputRDD(PTransform<?, ?> transform) {
-    return getRDD((PValue) getOutput(transform));
-  }
-
-  public JavaStreamingContext getStreamingContext() {
-    return jssc;
-  }
-
-  @Override
-  protected void computeOutputs() {
-    for (DStreamHolder<?> streamHolder : leafStreams) {
-      computeOutput(streamHolder);
-    }
-  }
-
-  private static <T> void computeOutput(DStreamHolder<T> streamHolder) {
-    streamHolder.getDStream().foreachRDD(new Function<JavaRDD<WindowedValue<T>>, Void>() {
-      @Override
-      public Void call(JavaRDD<WindowedValue<T>> rdd) throws Exception {
-        rdd.rdd().cache();
-        rdd.count();
-        return null;
-      }
-    }); // force a DStream action
-  }
-
-  @Override
-  public void close() {
-    if (timeout > 0) {
-      jssc.awaitTerminationOrTimeout(timeout);
-    } else {
-      jssc.awaitTermination();
-    }
-    //TODO: stop gracefully ?
-    jssc.stop(false, false);
-    state = State.DONE;
-    super.close();
-  }
-
-  private State state = State.RUNNING;
-
-  @Override
-  public State getState() {
-    return state;
-  }
-
-  //---------------- override in order to expose in package
-  @Override
-  protected <I extends PInput> I getInput(PTransform<I, ?> transform) {
-    return super.getInput(transform);
-  }
-  @Override
-  protected <O extends POutput> O getOutput(PTransform<?, O> transform) {
-    return super.getOutput(transform);
-  }
-
-  @Override
-  protected JavaSparkContext getSparkContext() {
-    return super.getSparkContext();
-  }
-
-  @Override
-  protected SparkRuntimeContext getRuntimeContext() {
-    return super.getRuntimeContext();
-  }
-
-  @Override
-  protected void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
-    super.setCurrentTransform(transform);
-  }
-
-  @Override
-  protected AppliedPTransform<?, ?, ?> getCurrentTransform() {
-    return super.getCurrentTransform();
-  }
-
-  @Override
-  protected <T> void setOutputRDD(PTransform<?, ?> transform,
-      JavaRDDLike<WindowedValue<T>, ?> rdd) {
-    super.setOutputRDD(transform, rdd);
-  }
-
-  @Override
-  protected <T> void setOutputRDDFromValues(PTransform<?, ?> transform, Iterable<T> values,
-      Coder<T> coder) {
-    super.setOutputRDDFromValues(transform, values, coder);
-  }
-
-  @Override
-  protected boolean hasOutputRDD(PTransform<? extends PInput, ?> transform) {
-    return super.hasOutputRDD(transform);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java
deleted file mode 100644
index d8ae5e8..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java
+++ /dev/null
@@ -1,414 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.dataflow.spark.streaming;
-
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.api.client.util.Lists;
-import com.google.api.client.util.Maps;
-import com.google.api.client.util.Sets;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.VoidCoder;
-import com.google.cloud.dataflow.sdk.io.AvroIO;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.repackaged.com.google.common.reflect.TypeToken;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-import com.google.cloud.dataflow.sdk.util.AssignWindowsDoFn;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
-import com.google.cloud.dataflow.sdk.values.PDone;
-
-import kafka.serializer.Decoder;
-
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.Durations;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaDStreamLike;
-import org.apache.spark.streaming.api.java.JavaPairInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.kafka.KafkaUtils;
-
-import scala.Tuple2;
-
-import com.cloudera.dataflow.hadoop.HadoopIO;
-import com.cloudera.dataflow.io.ConsoleIO;
-import com.cloudera.dataflow.io.CreateStream;
-import com.cloudera.dataflow.io.KafkaIO;
-import com.cloudera.dataflow.spark.DoFnFunction;
-import com.cloudera.dataflow.spark.EvaluationContext;
-import com.cloudera.dataflow.spark.SparkPipelineTranslator;
-import com.cloudera.dataflow.spark.TransformEvaluator;
-import com.cloudera.dataflow.spark.TransformTranslator;
-import com.cloudera.dataflow.spark.WindowingHelpers;
-
-/**
- * Supports translation between a DataFlow transform, and Spark's operations on DStreams.
- */
-public final class StreamingTransformTranslator {
-
-  private StreamingTransformTranslator() {
-  }
-
-  private static <T> TransformEvaluator<ConsoleIO.Write.Unbound<T>> print() {
-    return new TransformEvaluator<ConsoleIO.Write.Unbound<T>>() {
-      @Override
-      public void evaluate(ConsoleIO.Write.Unbound<T> transform, EvaluationContext context) {
-        @SuppressWarnings("unchecked")
-        JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream =
-            (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>)
-            ((StreamingEvaluationContext) context).getStream(transform);
-        dstream.map(WindowingHelpers.<T>unwindowFunction()).print(transform.getNum());
-      }
-    };
-  }
-
-  private static <K, V> TransformEvaluator<KafkaIO.Read.Unbound<K, V>> kafka() {
-    return new TransformEvaluator<KafkaIO.Read.Unbound<K, V>>() {
-      @Override
-      public void evaluate(KafkaIO.Read.Unbound<K, V> transform, EvaluationContext context) {
-        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
-        JavaStreamingContext jssc = sec.getStreamingContext();
-        Class<K> keyClazz = transform.getKeyClass();
-        Class<V> valueClazz = transform.getValueClass();
-        Class<? extends Decoder<K>> keyDecoderClazz = transform.getKeyDecoderClass();
-        Class<? extends Decoder<V>> valueDecoderClazz = transform.getValueDecoderClass();
-        Map<String, String> kafkaParams = transform.getKafkaParams();
-        Set<String> topics = transform.getTopics();
-        JavaPairInputDStream<K, V> inputPairStream = KafkaUtils.createDirectStream(jssc, keyClazz,
-                valueClazz, keyDecoderClazz, valueDecoderClazz, kafkaParams, topics);
-        JavaDStream<WindowedValue<KV<K, V>>> inputStream =
-            inputPairStream.map(new Function<Tuple2<K, V>, KV<K, V>>() {
-          @Override
-          public KV<K, V> call(Tuple2<K, V> t2) throws Exception {
-            return KV.of(t2._1(), t2._2());
-          }
-        }).map(WindowingHelpers.<KV<K, V>>windowFunction());
-        sec.setStream(transform, inputStream);
-      }
-    };
-  }
-
-  private static <T> TransformEvaluator<Create.Values<T>> create() {
-    return new TransformEvaluator<Create.Values<T>>() {
-      @SuppressWarnings("unchecked")
-      @Override
-      public void evaluate(Create.Values<T> transform, EvaluationContext context) {
-        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
-        Iterable<T> elems = transform.getElements();
-        Coder<T> coder = sec.getOutput(transform).getCoder();
-        if (coder != VoidCoder.of()) {
-          // actual create
-          sec.setOutputRDDFromValues(transform, elems, coder);
-        } else {
-          // fake create as an input
-          // creates a stream with a single batch containing a single null element
-          // to invoke following transformations once
-          // to support DataflowAssert
-          sec.setDStreamFromQueue(transform,
-              Collections.<Iterable<Void>>singletonList(Collections.singletonList((Void) null)),
-              (Coder<Void>) coder);
-        }
-      }
-    };
-  }
-
-  private static <T> TransformEvaluator<CreateStream.QueuedValues<T>> createFromQueue() {
-    return new TransformEvaluator<CreateStream.QueuedValues<T>>() {
-      @Override
-      public void evaluate(CreateStream.QueuedValues<T> transform, EvaluationContext context) {
-        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
-        Iterable<Iterable<T>> values = transform.getQueuedValues();
-        Coder<T> coder = sec.getOutput(transform).getCoder();
-        sec.setDStreamFromQueue(transform, values, coder);
-      }
-    };
-  }
-
-  private static <T> TransformEvaluator<Flatten.FlattenPCollectionList<T>> flattenPColl() {
-    return new TransformEvaluator<Flatten.FlattenPCollectionList<T>>() {
-      @SuppressWarnings("unchecked")
-      @Override
-      public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) {
-        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
-        PCollectionList<T> pcs = sec.getInput(transform);
-        JavaDStream<WindowedValue<T>> first =
-            (JavaDStream<WindowedValue<T>>) sec.getStream(pcs.get(0));
-        List<JavaDStream<WindowedValue<T>>> rest = Lists.newArrayListWithCapacity(pcs.size() - 1);
-        for (int i = 1; i < pcs.size(); i++) {
-          rest.add((JavaDStream<WindowedValue<T>>) sec.getStream(pcs.get(i)));
-        }
-        JavaDStream<WindowedValue<T>> dstream = sec.getStreamingContext().union(first, rest);
-        sec.setStream(transform, dstream);
-      }
-    };
-  }
-
-  private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> rddTransform(
-      final SparkPipelineTranslator rddTranslator) {
-    return new TransformEvaluator<PT>() {
-      @SuppressWarnings("unchecked")
-      @Override
-      public void evaluate(PT transform, EvaluationContext context) {
-        TransformEvaluator<PT> rddEvaluator =
-            rddTranslator.translate((Class<PT>) transform.getClass());
-
-        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
-        if (sec.hasStream(transform)) {
-          JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>> dStream =
-              (JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>>)
-              sec.getStream(transform);
-
-          sec.setStream(transform, dStream
-              .transform(new RDDTransform<>(sec, rddEvaluator, transform)));
-        } else {
-          // if the transformation requires direct access to RDD (not in stream)
-          // this is used for "fake" transformations like with DataflowAssert
-          rddEvaluator.evaluate(transform, context);
-        }
-      }
-    };
-  }
-
-  /**
-   * RDD transform function If the transformation function doesn't have an input, create a fake one
-   * as an empty RDD.
-   *
-   * @param <PT> PTransform type
-   */
-  private static final class RDDTransform<PT extends PTransform<?, ?>>
-      implements Function<JavaRDD<WindowedValue<Object>>, JavaRDD<WindowedValue<Object>>> {
-
-    private final StreamingEvaluationContext context;
-    private final AppliedPTransform<?, ?, ?> appliedPTransform;
-    private final TransformEvaluator<PT> rddEvaluator;
-    private final PT transform;
-
-
-    private RDDTransform(StreamingEvaluationContext context, TransformEvaluator<PT> rddEvaluator,
-        PT transform) {
-      this.context = context;
-      this.appliedPTransform = context.getCurrentTransform();
-      this.rddEvaluator = rddEvaluator;
-      this.transform = transform;
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public JavaRDD<WindowedValue<Object>>
-        call(JavaRDD<WindowedValue<Object>> rdd) throws Exception {
-      AppliedPTransform<?, ?, ?> existingAPT = context.getCurrentTransform();
-      context.setCurrentTransform(appliedPTransform);
-      context.setInputRDD(transform, rdd);
-      rddEvaluator.evaluate(transform, context);
-      if (!context.hasOutputRDD(transform)) {
-        // fake RDD as output
-        context.setOutputRDD(transform,
-            context.getSparkContext().<WindowedValue<Object>>emptyRDD());
-      }
-      JavaRDD<WindowedValue<Object>> outRDD =
-          (JavaRDD<WindowedValue<Object>>) context.getOutputRDD(transform);
-      context.setCurrentTransform(existingAPT);
-      return outRDD;
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> foreachRDD(
-      final SparkPipelineTranslator rddTranslator) {
-    return new TransformEvaluator<PT>() {
-      @Override
-      public void evaluate(PT transform, EvaluationContext context) {
-        TransformEvaluator<PT> rddEvaluator =
-            rddTranslator.translate((Class<PT>) transform.getClass());
-
-        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
-        if (sec.hasStream(transform)) {
-          JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>> dStream =
-              (JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>>)
-              sec.getStream(transform);
-
-          dStream.foreachRDD(new RDDOutputOperator<>(sec, rddEvaluator, transform));
-        } else {
-          rddEvaluator.evaluate(transform, context);
-        }
-      }
-    };
-  }
-
-  /**
-   * RDD output function.
-   *
-   * @param <PT> PTransform type
-   */
-  private static final class RDDOutputOperator<PT extends PTransform<?, ?>>
-      implements Function<JavaRDD<WindowedValue<Object>>, Void> {
-
-    private final StreamingEvaluationContext context;
-    private final AppliedPTransform<?, ?, ?> appliedPTransform;
-    private final TransformEvaluator<PT> rddEvaluator;
-    private final PT transform;
-
-
-    private RDDOutputOperator(StreamingEvaluationContext context,
-        TransformEvaluator<PT> rddEvaluator, PT transform) {
-      this.context = context;
-      this.appliedPTransform = context.getCurrentTransform();
-      this.rddEvaluator = rddEvaluator;
-      this.transform = transform;
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public Void call(JavaRDD<WindowedValue<Object>> rdd) throws Exception {
-      AppliedPTransform<?, ?, ?> existingAPT = context.getCurrentTransform();
-      context.setCurrentTransform(appliedPTransform);
-      context.setInputRDD(transform, rdd);
-      rddEvaluator.evaluate(transform, context);
-      context.setCurrentTransform(existingAPT);
-      return null;
-    }
-  }
-
-  private static final TransformTranslator.FieldGetter WINDOW_FG =
-      new TransformTranslator.FieldGetter(Window.Bound.class);
-
-  private static <T, W extends BoundedWindow> TransformEvaluator<Window.Bound<T>> window() {
-    return new TransformEvaluator<Window.Bound<T>>() {
-      @Override
-      public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
-        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
-        //--- first we apply windowing to the stream
-        WindowFn<? super T, W> windowFn = WINDOW_FG.get("windowFn", transform);
-        @SuppressWarnings("unchecked")
-        JavaDStream<WindowedValue<T>> dStream =
-            (JavaDStream<WindowedValue<T>>) sec.getStream(transform);
-        if (windowFn instanceof FixedWindows) {
-          Duration windowDuration = Durations.milliseconds(((FixedWindows) windowFn).getSize()
-              .getMillis());
-          sec.setStream(transform, dStream.window(windowDuration));
-        } else if (windowFn instanceof SlidingWindows) {
-          Duration windowDuration = Durations.milliseconds(((SlidingWindows) windowFn).getSize()
-              .getMillis());
-          Duration slideDuration = Durations.milliseconds(((SlidingWindows) windowFn).getPeriod()
-              .getMillis());
-          sec.setStream(transform, dStream.window(windowDuration, slideDuration));
-        }
-        //--- then we apply windowing to the elements
-        DoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
-        DoFnFunction<T, T> dofn = new DoFnFunction<>(addWindowsDoFn,
-            ((StreamingEvaluationContext)context).getRuntimeContext(), null);
-        @SuppressWarnings("unchecked")
-        JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream =
-            (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>)
-            sec.getStream(transform);
-        sec.setStream(transform, dstream.mapPartitions(dofn));
-      }
-    };
-  }
-
-  private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS = Maps
-      .newHashMap();
-
-  static {
-    EVALUATORS.put(ConsoleIO.Write.Unbound.class, print());
-    EVALUATORS.put(CreateStream.QueuedValues.class, createFromQueue());
-    EVALUATORS.put(Create.Values.class, create());
-    EVALUATORS.put(KafkaIO.Read.Unbound.class, kafka());
-    EVALUATORS.put(Window.Bound.class, window());
-    EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl());
-  }
-
-  private static final Set<Class<? extends PTransform>> UNSUPPORTED_EVALUATORS = Sets
-      .newHashSet();
-
-  static {
-    //TODO - add support for the following
-    UNSUPPORTED_EVALUATORS.add(TextIO.Read.Bound.class);
-    UNSUPPORTED_EVALUATORS.add(TextIO.Write.Bound.class);
-    UNSUPPORTED_EVALUATORS.add(AvroIO.Read.Bound.class);
-    UNSUPPORTED_EVALUATORS.add(AvroIO.Write.Bound.class);
-    UNSUPPORTED_EVALUATORS.add(HadoopIO.Read.Bound.class);
-    UNSUPPORTED_EVALUATORS.add(HadoopIO.Write.Bound.class);
-  }
-
-  @SuppressWarnings("unchecked")
-  private static <PT extends PTransform<?, ?>> TransformEvaluator<PT>
-      getTransformEvaluator(Class<PT> clazz, SparkPipelineTranslator rddTranslator) {
-    TransformEvaluator<PT> transform = (TransformEvaluator<PT>) EVALUATORS.get(clazz);
-    if (transform == null) {
-      if (UNSUPPORTED_EVALUATORS.contains(clazz)) {
-        throw new UnsupportedOperationException("Dataflow transformation " + clazz
-          .getCanonicalName()
-          + " is currently unsupported by the Spark streaming pipeline");
-      }
-      // DStream transformations will transform an RDD into another RDD
-      // Actions will create output
-      // In Dataflow it depends on the PTransform's Input and Output class
-      Class<?> pTOutputClazz = getPTransformOutputClazz(clazz);
-      if (PDone.class.equals(pTOutputClazz)) {
-        return foreachRDD(rddTranslator);
-      } else {
-        return rddTransform(rddTranslator);
-      }
-    }
-    return transform;
-  }
-
-  private static <PT extends PTransform<?, ?>> Class<?> getPTransformOutputClazz(Class<PT> clazz) {
-    Type[] types = ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments();
-    return TypeToken.of(clazz).resolveType(types[1]).getRawType();
-  }
-
-  /**
-   * Translator matches Dataflow transformation with the appropriate Spark streaming evaluator.
-   * rddTranslator uses Spark evaluators in transform/foreachRDD to evaluate the transformation
-   */
-  public static class Translator implements SparkPipelineTranslator {
-
-    private final SparkPipelineTranslator rddTranslator;
-
-    public Translator(SparkPipelineTranslator rddTranslator) {
-      this.rddTranslator = rddTranslator;
-    }
-
-    @Override
-    public boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz) {
-      // streaming includes rdd transformations as well
-      return EVALUATORS.containsKey(clazz) || rddTranslator.hasTranslation(clazz);
-    }
-
-    @Override
-    public <PT extends PTransform<?, ?>> TransformEvaluator<PT> translate(Class<PT> clazz) {
-      return getTransformEvaluator(clazz, rddTranslator);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingWindowPipelineDetector.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingWindowPipelineDetector.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingWindowPipelineDetector.java
deleted file mode 100644
index 406dfcc..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingWindowPipelineDetector.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark.streaming;
-
-import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.Durations;
-
-import com.cloudera.dataflow.spark.SparkPipelineRunner;
-import com.cloudera.dataflow.spark.SparkPipelineTranslator;
-import com.cloudera.dataflow.spark.TransformTranslator;
-
-/**
- * Pipeline {@link SparkPipelineRunner.Evaluator} to detect windowing.
- */
-public final class StreamingWindowPipelineDetector extends SparkPipelineRunner.Evaluator {
-
-  // Currently, Spark streaming recommends batches no smaller then 500 msec
-  private static final Duration SPARK_MIN_WINDOW = Durations.milliseconds(500);
-
-  private boolean windowing;
-  private Duration batchDuration;
-
-  public StreamingWindowPipelineDetector(SparkPipelineTranslator translator) {
-    super(translator);
-  }
-
-  private static final TransformTranslator.FieldGetter WINDOW_FG =
-      new TransformTranslator.FieldGetter(Window.Bound.class);
-
-  // Use the smallest window (fixed or sliding) as Spark streaming's batch duration
-  @Override
-  protected <PT extends PTransform<? super PInput, POutput>> void
-      doVisitTransform(TransformTreeNode node) {
-    @SuppressWarnings("unchecked")
-    PT transform = (PT) node.getTransform();
-    @SuppressWarnings("unchecked")
-    Class<PT> transformClass = (Class<PT>) (Class<?>) transform.getClass();
-    if (transformClass.isAssignableFrom(Window.Bound.class)) {
-      WindowFn<?, ?> windowFn = WINDOW_FG.get("windowFn", transform);
-      if (windowFn instanceof FixedWindows) {
-        setBatchDuration(((FixedWindows) windowFn).getSize());
-      } else if (windowFn instanceof SlidingWindows) {
-        if (((SlidingWindows) windowFn).getOffset().getMillis() > 0) {
-          throw new UnsupportedOperationException("Spark does not support window offsets");
-        }
-        // Sliding window size might as well set the batch duration. Applying the transformation
-        // will add the "slide"
-        setBatchDuration(((SlidingWindows) windowFn).getSize());
-      } else if (!(windowFn instanceof GlobalWindows)) {
-        throw new IllegalStateException("Windowing function not supported: " + windowFn);
-      }
-    }
-  }
-
-  private void setBatchDuration(org.joda.time.Duration duration) {
-    Long durationMillis = duration.getMillis();
-    // validate window size
-    if (durationMillis < SPARK_MIN_WINDOW.milliseconds()) {
-      throw new IllegalArgumentException("Windowing of size " + durationMillis +
-          "msec is not supported!");
-    }
-    // choose the smallest duration to be Spark's batch duration, larger ones will be handled
-    // as window functions  over the batched-stream
-    if (!windowing || this.batchDuration.milliseconds() > durationMillis) {
-      this.batchDuration = Durations.milliseconds(durationMillis);
-    }
-    windowing = true;
-  }
-
-  public boolean isWindowing() {
-    return windowing;
-  }
-
-  public Duration getBatchDuration() {
-    return batchDuration;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/DoFnFunction.java
new file mode 100644
index 0000000..6af829f
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/DoFnFunction.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.spark.api.java.function.FlatMapFunction;
+
+/**
+ * Dataflow's Do functions correspond to Spark's FlatMap functions.
+ *
+ * @param <I> Input element type.
+ * @param <O> Output element type.
+ */
+public class DoFnFunction<I, O> implements FlatMapFunction<Iterator<WindowedValue<I>>,
+    WindowedValue<O>> {
+  private final DoFn<I, O> mFunction;
+  private final SparkRuntimeContext mRuntimeContext;
+  private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
+
+  /**
+   * @param fn         DoFunction to be wrapped.
+   * @param runtime    Runtime to apply function in.
+   * @param sideInputs Side inputs used in DoFunction.
+   */
+  public DoFnFunction(DoFn<I, O> fn,
+               SparkRuntimeContext runtime,
+               Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
+    this.mFunction = fn;
+    this.mRuntimeContext = runtime;
+    this.mSideInputs = sideInputs;
+  }
+
+  @Override
+  public Iterable<WindowedValue<O>> call(Iterator<WindowedValue<I>> iter) throws
+      Exception {
+    ProcCtxt ctxt = new ProcCtxt(mFunction, mRuntimeContext, mSideInputs);
+    ctxt.setup();
+    mFunction.startBundle(ctxt);
+    return ctxt.getOutputIterable(iter, mFunction);
+  }
+
+  private class ProcCtxt extends SparkProcessContext<I, O, WindowedValue<O>> {
+
+    private final List<WindowedValue<O>> outputs = new LinkedList<>();
+
+    ProcCtxt(DoFn<I, O> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>,
+        BroadcastHelper<?>> sideInputs) {
+      super(fn, runtimeContext, sideInputs);
+    }
+
+    @Override
+    public synchronized void output(O o) {
+      outputs.add(windowedValue != null ? windowedValue.withValue(o) :
+          WindowedValue.valueInEmptyWindows(o));
+    }
+
+    @Override
+    public synchronized void output(WindowedValue<O> o) {
+      outputs.add(o);
+    }
+
+    @Override
+    protected void clearOutput() {
+      outputs.clear();
+    }
+
+    @Override
+    protected Iterator<WindowedValue<O>> getOutputIterator() {
+      return outputs.iterator();
+    }
+  }
+
+}