Posted to commits@beam.apache.org by ie...@apache.org on 2017/04/19 13:09:25 UTC
[16/18] beam git commit: [BEAM-1994] Remove Flink examples package
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
deleted file mode 100644
index 18343ef..0000000
--- a/runners/flink/runner/pom.xml
+++ /dev/null
@@ -1,330 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-runners-flink-parent</artifactId>
- <version>0.7.0-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
-
- <artifactId>beam-runners-flink_2.10</artifactId>
-
- <name>Apache Beam :: Runners :: Flink :: Core</name>
-
- <packaging>jar</packaging>
-
- <profiles>
- <profile>
- <id>local-validates-runner-tests</id>
- <activation><activeByDefault>false</activeByDefault></activation>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <executions>
-
- <!-- This configures the inherited validates-runner-tests
- execution to execute with a local Flink instance. -->
- <execution>
- <id>validates-runner-tests</id>
- <phase>integration-test</phase>
- <goals>
- <goal>test</goal>
- </goals>
- <configuration>
- <groups>org.apache.beam.sdk.testing.ValidatesRunner</groups>
- <excludedGroups>
- org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
- org.apache.beam.sdk.testing.UsesSplittableParDo,
- org.apache.beam.sdk.testing.UsesAttemptedMetrics,
- org.apache.beam.sdk.testing.UsesCommittedMetrics,
- org.apache.beam.sdk.testing.UsesTestStream
- </excludedGroups>
- <parallel>none</parallel>
- <failIfNoTests>true</failIfNoTests>
- <dependenciesToScan>
- <dependency>org.apache.beam:beam-sdks-java-core</dependency>
- </dependenciesToScan>
- <systemPropertyVariables>
- <beamTestPipelineOptions>
- [
- "--runner=TestFlinkRunner",
- "--streaming=false"
- ]
- </beamTestPipelineOptions>
- </systemPropertyVariables>
- </configuration>
- </execution>
-
- <!-- This second execution runs the tests in streaming mode -->
- <execution>
- <id>streaming-validates-runner-tests</id>
- <phase>integration-test</phase>
- <goals>
- <goal>test</goal>
- </goals>
- <configuration>
- <groups>org.apache.beam.sdk.testing.ValidatesRunner</groups>
- <excludedGroups>
- org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
- org.apache.beam.sdk.testing.UsesSetState,
- org.apache.beam.sdk.testing.UsesMapState,
- org.apache.beam.sdk.testing.UsesAttemptedMetrics,
- org.apache.beam.sdk.testing.UsesCommittedMetrics,
- org.apache.beam.sdk.testing.UsesTestStream,
- org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs
- </excludedGroups>
- <parallel>none</parallel>
- <failIfNoTests>true</failIfNoTests>
- <dependenciesToScan>
- <dependency>org.apache.beam:beam-sdks-java-core</dependency>
- </dependenciesToScan>
- <systemPropertyVariables>
- <beamTestPipelineOptions>
- [
- "--runner=TestFlinkRunner",
- "--streaming=true"
- ]
- </beamTestPipelineOptions>
- </systemPropertyVariables>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </profile>
- </profiles>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- </plugin>
-
- <!-- Integration Tests -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-failsafe-plugin</artifactId>
- </plugin>
-
- <!-- Unit Tests -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
-
- <dependencies>
- <!-- Flink dependencies -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_2.10</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime_2.10</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <!-- For testing -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-core</artifactId>
- <version>${flink.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime_2.10</artifactId>
- <version>${flink.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <!-- Beam -->
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-java-core</artifactId>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-jdk14</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-runners-core-java</artifactId>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-jdk14</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-runners-core-construction-java</artifactId>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-jdk14</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-annotations</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.google.code.findbugs</groupId>
- <artifactId>jsr305</artifactId>
- </dependency>
-
- <!--
- Force an upgrade of the Apache Commons version that comes from Flink, to support DEFLATE compression.
- -->
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-compress</artifactId>
- <scope>runtime</scope>
- </dependency>
-
- <!-- Test scoped -->
- <dependency>
- <groupId>com.google.apis</groupId>
- <artifactId>google-api-services-bigquery</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.hamcrest</groupId>
- <artifactId>hamcrest-all</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
- <scope>test</scope>
- </dependency>
-
- <!-- Depend on test jar to scan for ValidatesRunner tests -->
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-java-core</artifactId>
- <classifier>tests</classifier>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-jdk14</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.10</artifactId>
- <version>${flink.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_2.10</artifactId>
- <version>${flink.version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <artifactId>apacheds-jdbm1</artifactId>
- <groupId>org.apache.directory.jdbm</groupId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <!-- Optional Pipeline Registration -->
- <dependency>
- <groupId>com.google.auto.service</groupId>
- <artifactId>auto-service</artifactId>
- <optional>true</optional>
- </dependency>
-
- <!-- transitive test dependencies from beam-sdk-java-core -->
- <dependency>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-yaml</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-common-fn-api</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- </dependencies>
-</project>
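
For context on the two surefire executions above: each sets the beamTestPipelineOptions system property to a JSON array of pipeline arguments, which Beam's TestPipeline parses when a ValidatesRunner test constructs its pipeline; the same suite therefore runs once in batch mode and once in streaming mode. A minimal sketch of such a test (class and method names are illustrative, not part of this commit):

    import org.apache.beam.sdk.testing.TestPipeline;
    import org.apache.beam.sdk.testing.ValidatesRunner;
    import org.apache.beam.sdk.transforms.Create;
    import org.junit.Test;
    import org.junit.experimental.categories.Category;

    public class ExampleValidatesRunnerTest {

      @Test
      @Category(ValidatesRunner.class)
      public void runsWithConfiguredRunner() {
        // TestPipeline.create() reads -DbeamTestPipelineOptions, so this test
        // executes with "--runner=TestFlinkRunner" and whichever "--streaming"
        // flag the invoking surefire execution configured.
        TestPipeline p = TestPipeline.create();
        p.apply(Create.of(1, 2, 3));
        p.run();
      }
    }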
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
deleted file mode 100644
index b745f0b..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.options.DefaultValueFactory;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-
-/**
- * {@link DefaultValueFactory} for getting a default value for the parallelism option
- * on {@link FlinkPipelineOptions}.
- *
- * <p>This will return either the default value from {@link GlobalConfiguration} or {@code 1}.
- * A valid {@link GlobalConfiguration} is only available if the program is executed by the Flink
- * run scripts.
- */
-public class DefaultParallelismFactory implements DefaultValueFactory<Integer> {
- @Override
- public Integer create(PipelineOptions options) {
- return GlobalConfiguration.loadConfiguration()
- .getInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, 1);
- }
-}
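
The factory above is not invoked directly; it is attached to an option getter through Beam's @Default.InstanceFactory annotation, which is how FlinkPipelineOptions obtains its parallelism default. A sketch of that wiring (the interface name is illustrative):

    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;

    public interface ExampleFlinkOptions extends PipelineOptions {

      // When no value is set explicitly, Beam calls
      // DefaultParallelismFactory.create(options) to supply one.
      @Description("Parallelism of the Flink job.")
      @Default.InstanceFactory(DefaultParallelismFactory.class)
      Integer getParallelism();

      void setParallelism(Integer value);
    }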
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java
deleted file mode 100644
index 854b674..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link Pipeline.PipelineVisitor} for executing a {@link Pipeline} as a
- * Flink batch job.
- */
-class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
-
- private static final Logger LOG = LoggerFactory.getLogger(FlinkBatchPipelineTranslator.class);
-
- /**
- * The necessary context in the case of a batch job.
- */
- private final FlinkBatchTranslationContext batchContext;
-
- private int depth = 0;
-
- public FlinkBatchPipelineTranslator(ExecutionEnvironment env, PipelineOptions options) {
- this.batchContext = new FlinkBatchTranslationContext(env, options);
- }
-
- @Override
- @SuppressWarnings("rawtypes, unchecked")
- public void translate(Pipeline pipeline) {
- super.translate(pipeline);
-
- // terminate dangling DataSets
- for (DataSet<?> dataSet: batchContext.getDanglingDataSets().values()) {
- dataSet.output(new DiscardingOutputFormat());
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Pipeline Visitor Methods
- // --------------------------------------------------------------------------------------------
-
- @Override
- public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
- LOG.info("{} enterCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
- this.depth++;
-
- BatchTransformTranslator<?> translator = getTranslator(node);
-
- if (translator != null) {
- applyBatchTransform(node.getTransform(), node, translator);
- LOG.info("{} translated- {}", genSpaces(this.depth), node.getFullName());
- return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
- } else {
- return CompositeBehavior.ENTER_TRANSFORM;
- }
- }
-
- @Override
- public void leaveCompositeTransform(TransformHierarchy.Node node) {
- this.depth--;
- LOG.info("{} leaveCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
- }
-
- @Override
- public void visitPrimitiveTransform(TransformHierarchy.Node node) {
- LOG.info("{} visitPrimitiveTransform- {}", genSpaces(this.depth), node.getFullName());
-
- // get the transformation corresponding to the node we are
- // currently visiting and translate it into its Flink alternative.
- PTransform<?, ?> transform = node.getTransform();
- BatchTransformTranslator<?> translator =
- FlinkBatchTransformTranslators.getTranslator(transform);
- if (translator == null) {
- LOG.info(node.getTransform().getClass().toString());
- throw new UnsupportedOperationException("The transform " + transform
- + " is currently not supported.");
- }
- applyBatchTransform(transform, node, translator);
- }
-
- private <T extends PTransform<?, ?>> void applyBatchTransform(
- PTransform<?, ?> transform,
- TransformHierarchy.Node node,
- BatchTransformTranslator<?> translator) {
-
- @SuppressWarnings("unchecked")
- T typedTransform = (T) transform;
-
- @SuppressWarnings("unchecked")
- BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>) translator;
-
- // create the applied PTransform on the batchContext
- batchContext.setCurrentTransform(node.toAppliedPTransform());
- typedTranslator.translateNode(typedTransform, batchContext);
- }
-
- /**
- * A translator of a {@link PTransform}.
- */
- public interface BatchTransformTranslator<TransformT extends PTransform> {
- void translateNode(TransformT transform, FlinkBatchTranslationContext context);
- }
-
- /**
- * Returns a translator for the given node if possible; otherwise null.
- */
- private static BatchTransformTranslator<?> getTranslator(TransformHierarchy.Node node) {
- PTransform<?, ?> transform = node.getTransform();
-
- // Root of the graph is null
- if (transform == null) {
- return null;
- }
-
- return FlinkBatchTransformTranslators.getTranslator(transform);
- }
-}
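
For orientation, the visitor methods above are driven by the traversal in the base class; the super.translate(pipeline) call amounts to handing this visitor to the pipeline graph. A sketch of the assumed base-class behavior:

    import org.apache.beam.sdk.Pipeline;

    // Assumed behavior behind super.translate(pipeline): walk the transform
    // hierarchy in topological order, dispatching to enterCompositeTransform,
    // visitPrimitiveTransform and leaveCompositeTransform above.
    abstract class ExamplePipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
      public void translate(Pipeline pipeline) {
        pipeline.traverseTopologically(this);
      }
    }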
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
deleted file mode 100644
index ff9521c..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++ /dev/null
@@ -1,723 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
-import org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkMergingNonShuffleReduceFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkMergingPartialReduceFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkMergingReduceFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkStatefulDoFnFunction;
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.runners.flink.translation.types.KvKeySelector;
-import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.join.RawUnionValue;
-import org.apache.beam.sdk.transforms.join.UnionCoder;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.Reshuffle;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.java.operators.FlatMapOperator;
-import org.apache.flink.api.java.operators.GroupCombineOperator;
-import org.apache.flink.api.java.operators.GroupReduceOperator;
-import org.apache.flink.api.java.operators.Grouping;
-import org.apache.flink.api.java.operators.MapPartitionOperator;
-import org.apache.flink.api.java.operators.SingleInputUdfOperator;
-import org.apache.flink.util.Collector;
-
-/**
- * Translators for transforming {@link PTransform PTransforms} to
- * Flink {@link DataSet DataSets}.
- */
-class FlinkBatchTransformTranslators {
-
- // --------------------------------------------------------------------------------------------
- // Transform Translator Registry
- // --------------------------------------------------------------------------------------------
-
- @SuppressWarnings("rawtypes")
- private static final Map<
- Class<? extends PTransform>,
- FlinkBatchPipelineTranslator.BatchTransformTranslator> TRANSLATORS = new HashMap<>();
-
- static {
- TRANSLATORS.put(View.CreatePCollectionView.class, new CreatePCollectionViewTranslatorBatch());
-
- TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslatorBatch());
- TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
- TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorBatch());
-
- TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslatorBatch());
-
- TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslatorBatch());
-
- TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoTranslatorBatch());
-
- TRANSLATORS.put(Read.Bounded.class, new ReadSourceTranslatorBatch());
- }
-
-
- static FlinkBatchPipelineTranslator.BatchTransformTranslator<?> getTranslator(
- PTransform<?, ?> transform) {
- return TRANSLATORS.get(transform.getClass());
- }
-
- private static class ReadSourceTranslatorBatch<T>
- implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Read.Bounded<T>> {
-
- @Override
- public void translateNode(Read.Bounded<T> transform, FlinkBatchTranslationContext context) {
- String name = transform.getName();
- BoundedSource<T> source = transform.getSource();
- PCollection<T> output = context.getOutput(transform);
-
- TypeInformation<WindowedValue<T>> typeInformation = context.getTypeInfo(output);
-
- DataSource<WindowedValue<T>> dataSource = new DataSource<>(
- context.getExecutionEnvironment(),
- new SourceInputFormat<>(source, context.getPipelineOptions()),
- typeInformation,
- name);
-
- context.setOutputDataSet(output, dataSource);
- }
- }
-
- private static class WindowAssignTranslatorBatch<T>
- implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Window.Assign<T>> {
-
- @Override
- public void translateNode(Window.Assign<T> transform, FlinkBatchTranslationContext context) {
- PValue input = context.getInput(transform);
-
- TypeInformation<WindowedValue<T>> resultTypeInfo =
- context.getTypeInfo(context.getOutput(transform));
-
- DataSet<WindowedValue<T>> inputDataSet = context.getInputDataSet(input);
-
- @SuppressWarnings("unchecked")
- final WindowingStrategy<T, ? extends BoundedWindow> windowingStrategy =
- (WindowingStrategy<T, ? extends BoundedWindow>)
- context.getOutput(transform).getWindowingStrategy();
-
- WindowFn<T, ? extends BoundedWindow> windowFn = windowingStrategy.getWindowFn();
-
- FlinkAssignWindows<T, ? extends BoundedWindow> assignWindowsFunction =
- new FlinkAssignWindows<>(windowFn);
-
- DataSet<WindowedValue<T>> resultDataSet = inputDataSet
- .flatMap(assignWindowsFunction)
- .name(context.getOutput(transform).getName())
- .returns(resultTypeInfo);
-
- context.setOutputDataSet(context.getOutput(transform), resultDataSet);
- }
- }
-
- private static class GroupByKeyTranslatorBatch<K, InputT>
- implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey<K, InputT>> {
-
- @Override
- public void translateNode(
- GroupByKey<K, InputT> transform,
- FlinkBatchTranslationContext context) {
-
- // for now, this is copied from the Combine.PerKey translator. Once we have the new runner API
- // we can replace GroupByKey with a Combine.PerKey using the Concatenate CombineFn
-
- DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
- context.getInputDataSet(context.getInput(transform));
-
- Combine.KeyedCombineFn<K, InputT, List<InputT>, List<InputT>> combineFn =
- new Concatenate<InputT>().asKeyedFn();
-
- KvCoder<K, InputT> inputCoder =
- (KvCoder<K, InputT>) context.getInput(transform).getCoder();
-
- Coder<List<InputT>> accumulatorCoder;
-
- try {
- accumulatorCoder =
- combineFn.getAccumulatorCoder(
- context.getInput(transform).getPipeline().getCoderRegistry(),
- inputCoder.getKeyCoder(),
- inputCoder.getValueCoder());
- } catch (CannotProvideCoderException e) {
- throw new RuntimeException(e);
- }
-
- WindowingStrategy<?, ?> windowingStrategy =
- context.getInput(transform).getWindowingStrategy();
-
- TypeInformation<WindowedValue<KV<K, List<InputT>>>> partialReduceTypeInfo =
- new CoderTypeInformation<>(
- WindowedValue.getFullCoder(
- KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
- windowingStrategy.getWindowFn().windowCoder()));
-
-
- Grouping<WindowedValue<KV<K, InputT>>> inputGrouping =
- inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder()));
-
- FlinkPartialReduceFunction<K, InputT, List<InputT>, ?> partialReduceFunction;
- FlinkReduceFunction<K, List<InputT>, List<InputT>, ?> reduceFunction;
-
- if (windowingStrategy.getWindowFn().isNonMerging()) {
- @SuppressWarnings("unchecked")
- WindowingStrategy<?, BoundedWindow> boundedStrategy =
- (WindowingStrategy<?, BoundedWindow>) windowingStrategy;
-
- partialReduceFunction = new FlinkPartialReduceFunction<>(
- combineFn,
- boundedStrategy,
- Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
- context.getPipelineOptions());
-
- reduceFunction = new FlinkReduceFunction<>(
- combineFn,
- boundedStrategy,
- Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
- context.getPipelineOptions());
-
- } else {
- if (!windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
- throw new UnsupportedOperationException(
- "Merging WindowFn with windows other than IntervalWindow are not supported.");
- }
-
- @SuppressWarnings("unchecked")
- WindowingStrategy<?, IntervalWindow> intervalStrategy =
- (WindowingStrategy<?, IntervalWindow>) windowingStrategy;
-
- partialReduceFunction = new FlinkMergingPartialReduceFunction<>(
- combineFn,
- intervalStrategy,
- Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
- context.getPipelineOptions());
-
- reduceFunction = new FlinkMergingReduceFunction<>(
- combineFn,
- intervalStrategy,
- Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
- context.getPipelineOptions());
- }
-
- // Partially GroupReduce the values into the intermediate format AccumT (combine)
- GroupCombineOperator<
- WindowedValue<KV<K, InputT>>,
- WindowedValue<KV<K, List<InputT>>>> groupCombine =
- new GroupCombineOperator<>(
- inputGrouping,
- partialReduceTypeInfo,
- partialReduceFunction,
- "GroupCombine: " + transform.getName());
-
- Grouping<WindowedValue<KV<K, List<InputT>>>> intermediateGrouping =
- groupCombine.groupBy(new KvKeySelector<List<InputT>, K>(inputCoder.getKeyCoder()));
-
- // Fully reduce the values and create output format VO
- GroupReduceOperator<
- WindowedValue<KV<K, List<InputT>>>, WindowedValue<KV<K, List<InputT>>>> outputDataSet =
- new GroupReduceOperator<>(
- intermediateGrouping, partialReduceTypeInfo, reduceFunction, transform.getName());
-
- context.setOutputDataSet(context.getOutput(transform), outputDataSet);
-
- }
-
- }
-
- private static class ReshuffleTranslatorBatch<K, InputT>
- implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Reshuffle<K, InputT>> {
-
- @Override
- public void translateNode(
- Reshuffle<K, InputT> transform,
- FlinkBatchTranslationContext context) {
-
- DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
- context.getInputDataSet(context.getInput(transform));
-
- context.setOutputDataSet(context.getOutput(transform), inputDataSet.rebalance());
-
- }
-
- }
-
- /**
- * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
- *
- * <p>For internal use to translate {@link GroupByKey}. For a large {@link PCollection} this
- * is expected to crash!
- *
- * <p>This is copied from the Dataflow runner code.
- *
- * @param <T> the type of elements to concatenate.
- */
- private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
- @Override
- public List<T> createAccumulator() {
- return new ArrayList<>();
- }
-
- @Override
- public List<T> addInput(List<T> accumulator, T input) {
- accumulator.add(input);
- return accumulator;
- }
-
- @Override
- public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
- List<T> result = createAccumulator();
- for (List<T> accumulator : accumulators) {
- result.addAll(accumulator);
- }
- return result;
- }
-
- @Override
- public List<T> extractOutput(List<T> accumulator) {
- return accumulator;
- }
-
- @Override
- public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
- return ListCoder.of(inputCoder);
- }
-
- @Override
- public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
- return ListCoder.of(inputCoder);
- }
- }
-
-
- private static class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT>
- implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
- Combine.PerKey<K, InputT, OutputT>> {
-
- @Override
- @SuppressWarnings("unchecked")
- public void translateNode(
- Combine.PerKey<K, InputT, OutputT> transform,
- FlinkBatchTranslationContext context) {
- DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
- context.getInputDataSet(context.getInput(transform));
-
- CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn =
- (CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT>) transform.getFn();
-
- KvCoder<K, InputT> inputCoder =
- (KvCoder<K, InputT>) context.getInput(transform).getCoder();
-
- Coder<AccumT> accumulatorCoder;
-
- try {
- accumulatorCoder =
- combineFn.getAccumulatorCoder(
- context.getInput(transform).getPipeline().getCoderRegistry(),
- inputCoder.getKeyCoder(),
- inputCoder.getValueCoder());
- } catch (CannotProvideCoderException e) {
- throw new RuntimeException(e);
- }
-
- WindowingStrategy<?, ?> windowingStrategy =
- context.getInput(transform).getWindowingStrategy();
-
- TypeInformation<WindowedValue<KV<K, AccumT>>> partialReduceTypeInfo =
- context.getTypeInfo(
- KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
- windowingStrategy);
-
- Grouping<WindowedValue<KV<K, InputT>>> inputGrouping =
- inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder()));
-
- // construct a map from side input to WindowingStrategy so that
- // the DoFn runner can map main-input windows to side input windows
- Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
- for (PCollectionView<?> sideInput: transform.getSideInputs()) {
- sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
- }
-
- if (windowingStrategy.getWindowFn().isNonMerging()) {
- WindowingStrategy<?, BoundedWindow> boundedStrategy =
- (WindowingStrategy<?, BoundedWindow>) windowingStrategy;
-
- FlinkPartialReduceFunction<K, InputT, AccumT, ?> partialReduceFunction =
- new FlinkPartialReduceFunction<>(
- combineFn,
- boundedStrategy,
- sideInputStrategies,
- context.getPipelineOptions());
-
- FlinkReduceFunction<K, AccumT, OutputT, ?> reduceFunction =
- new FlinkReduceFunction<>(
- combineFn,
- boundedStrategy,
- sideInputStrategies,
- context.getPipelineOptions());
-
- // Partially GroupReduce the values into the intermediate format AccumT (combine)
- GroupCombineOperator<
- WindowedValue<KV<K, InputT>>,
- WindowedValue<KV<K, AccumT>>> groupCombine =
- new GroupCombineOperator<>(
- inputGrouping,
- partialReduceTypeInfo,
- partialReduceFunction,
- "GroupCombine: " + transform.getName());
-
- transformSideInputs(transform.getSideInputs(), groupCombine, context);
-
- TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo =
- context.getTypeInfo(context.getOutput(transform));
-
- Grouping<WindowedValue<KV<K, AccumT>>> intermediateGrouping =
- groupCombine.groupBy(new KvKeySelector<AccumT, K>(inputCoder.getKeyCoder()));
-
- // Fully reduce the values and create output format OutputT
- GroupReduceOperator<
- WindowedValue<KV<K, AccumT>>, WindowedValue<KV<K, OutputT>>> outputDataSet =
- new GroupReduceOperator<>(
- intermediateGrouping, reduceTypeInfo, reduceFunction, transform.getName());
-
- transformSideInputs(transform.getSideInputs(), outputDataSet, context);
-
- context.setOutputDataSet(context.getOutput(transform), outputDataSet);
-
- } else {
- if (!windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
- throw new UnsupportedOperationException(
- "Merging WindowFn with windows other than IntervalWindow are not supported.");
- }
-
- // for merging windows we can't do a pre-shuffle combine step since
- // elements would not be in their correct windows for side-input access
-
- WindowingStrategy<?, IntervalWindow> intervalStrategy =
- (WindowingStrategy<?, IntervalWindow>) windowingStrategy;
-
- FlinkMergingNonShuffleReduceFunction<K, InputT, AccumT, OutputT, ?> reduceFunction =
- new FlinkMergingNonShuffleReduceFunction<>(
- combineFn,
- intervalStrategy,
- sideInputStrategies,
- context.getPipelineOptions());
-
- TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo =
- context.getTypeInfo(context.getOutput(transform));
-
- Grouping<WindowedValue<KV<K, InputT>>> grouping =
- inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder()));
-
- // Fully reduce the values and create output format OutputT
- GroupReduceOperator<
- WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, OutputT>>> outputDataSet =
- new GroupReduceOperator<>(
- grouping, reduceTypeInfo, reduceFunction, transform.getName());
-
- transformSideInputs(transform.getSideInputs(), outputDataSet, context);
-
- context.setOutputDataSet(context.getOutput(transform), outputDataSet);
- }
-
-
- }
- }
-
- private static void rejectSplittable(DoFn<?, ?> doFn) {
- DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
- if (signature.processElement().isSplittable()) {
- throw new UnsupportedOperationException(
- String.format(
- "%s does not currently support splittable DoFn: %s",
- FlinkRunner.class.getSimpleName(), doFn));
- }
- }
-
- private static class ParDoTranslatorBatch<InputT, OutputT>
- implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
- ParDo.MultiOutput<InputT, OutputT>> {
-
- @Override
- @SuppressWarnings("unchecked")
- public void translateNode(
- ParDo.MultiOutput<InputT, OutputT> transform,
- FlinkBatchTranslationContext context) {
- DoFn<InputT, OutputT> doFn = transform.getFn();
- rejectSplittable(doFn);
- DataSet<WindowedValue<InputT>> inputDataSet =
- context.getInputDataSet(context.getInput(transform));
-
- Map<TupleTag<?>, PValue> outputs = context.getOutputs(transform);
-
- Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap();
- // put the main output at index 0; FlinkMultiOutputDoFnFunction expects this
- outputMap.put(transform.getMainOutputTag(), 0);
- int count = 1;
- for (TupleTag<?> tag : outputs.keySet()) {
- if (!outputMap.containsKey(tag)) {
- outputMap.put(tag, count++);
- }
- }
-
- // assume that the windowing strategy is the same for all outputs
- WindowingStrategy<?, ?> windowingStrategy = null;
-
- // collect all output Coders and create a UnionCoder for our tagged outputs
- List<Coder<?>> outputCoders = Lists.newArrayList();
- for (PValue taggedValue : outputs.values()) {
- checkState(
- taggedValue instanceof PCollection,
- "Within ParDo, got a non-PCollection output %s of type %s",
- taggedValue,
- taggedValue.getClass().getSimpleName());
- PCollection<?> coll = (PCollection<?>) taggedValue;
- outputCoders.add(coll.getCoder());
- windowingStrategy = coll.getWindowingStrategy();
- }
-
- if (windowingStrategy == null) {
- throw new IllegalStateException("No outputs defined.");
- }
-
- UnionCoder unionCoder = UnionCoder.of(outputCoders);
-
- TypeInformation<WindowedValue<RawUnionValue>> typeInformation =
- new CoderTypeInformation<>(
- WindowedValue.getFullCoder(
- unionCoder,
- windowingStrategy.getWindowFn().windowCoder()));
-
- List<PCollectionView<?>> sideInputs = transform.getSideInputs();
-
- // construct a map from side input to WindowingStrategy so that
- // the DoFn runner can map main-input windows to side input windows
- Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
- for (PCollectionView<?> sideInput: sideInputs) {
- sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
- }
-
- SingleInputUdfOperator<WindowedValue<InputT>, WindowedValue<RawUnionValue>, ?> outputDataSet;
- DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
- if (signature.stateDeclarations().size() > 0
- || signature.timerDeclarations().size() > 0) {
-
- // Based on the fact that the signature is stateful, DoFnSignatures ensures
- // that it is also keyed
- KvCoder<?, InputT> inputCoder =
- (KvCoder<?, InputT>) context.getInput(transform).getCoder();
-
- FlinkStatefulDoFnFunction<?, ?, OutputT> doFnWrapper = new FlinkStatefulDoFnFunction<>(
- (DoFn) doFn, windowingStrategy, sideInputStrategies, context.getPipelineOptions(),
- outputMap, transform.getMainOutputTag()
- );
-
- Grouping<WindowedValue<InputT>> grouping =
- inputDataSet.groupBy(new KvKeySelector(inputCoder.getKeyCoder()));
-
- outputDataSet =
- new GroupReduceOperator(grouping, typeInformation, doFnWrapper, transform.getName());
-
- } else {
- FlinkDoFnFunction<InputT, RawUnionValue> doFnWrapper =
- new FlinkDoFnFunction(
- doFn,
- windowingStrategy,
- sideInputStrategies,
- context.getPipelineOptions(),
- outputMap,
- transform.getMainOutputTag());
-
- outputDataSet = new MapPartitionOperator<>(
- inputDataSet, typeInformation,
- doFnWrapper, transform.getName());
-
- }
-
- transformSideInputs(sideInputs, outputDataSet, context);
-
- for (Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
- pruneOutput(
- outputDataSet,
- context,
- outputMap.get(output.getKey()),
- (PCollection) output.getValue());
- }
-
- }
-
- private <T> void pruneOutput(
- DataSet<WindowedValue<RawUnionValue>> taggedDataSet,
- FlinkBatchTranslationContext context,
- int integerTag,
- PCollection<T> collection) {
- TypeInformation<WindowedValue<T>> outputType = context.getTypeInfo(collection);
-
- FlinkMultiOutputPruningFunction<T> pruningFunction =
- new FlinkMultiOutputPruningFunction<>(integerTag);
-
- FlatMapOperator<WindowedValue<RawUnionValue>, WindowedValue<T>> pruningOperator =
- new FlatMapOperator<>(
- taggedDataSet,
- outputType,
- pruningFunction,
- collection.getName());
-
- context.setOutputDataSet(collection, pruningOperator);
- }
- }
-
- private static class FlattenPCollectionTranslatorBatch<T>
- implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
- Flatten.PCollections<T>> {
-
- @Override
- @SuppressWarnings("unchecked")
- public void translateNode(
- Flatten.PCollections<T> transform,
- FlinkBatchTranslationContext context) {
-
- Map<TupleTag<?>, PValue> allInputs = context.getInputs(transform);
- DataSet<WindowedValue<T>> result = null;
-
- if (allInputs.isEmpty()) {
-
- // create an empty dummy source to satisfy downstream operations
- // we cannot create an empty source in Flink, therefore we have to
- // add a flatMap that simply never forwards the single element
- DataSource<String> dummySource =
- context.getExecutionEnvironment().fromElements("dummy");
- result = dummySource.flatMap(new FlatMapFunction<String, WindowedValue<T>>() {
- @Override
- public void flatMap(String s, Collector<WindowedValue<T>> collector) throws Exception {
- // never return anything
- }
- }).returns(
- new CoderTypeInformation<>(
- WindowedValue.getFullCoder(
- (Coder<T>) VoidCoder.of(),
- GlobalWindow.Coder.INSTANCE)));
- } else {
- for (PValue taggedPc : allInputs.values()) {
- checkArgument(
- taggedPc instanceof PCollection,
- "Got non-PCollection input to flatten: %s of type %s",
- taggedPc,
- taggedPc.getClass().getSimpleName());
- PCollection<T> collection = (PCollection<T>) taggedPc;
- DataSet<WindowedValue<T>> current = context.getInputDataSet(collection);
- if (result == null) {
- result = current;
- } else {
- result = result.union(current);
- }
- }
- }
-
- // insert a dummy filter; there seems to be a bug in Flink
- // that produces duplicate elements after the union in some cases
- // if we don't
- result = result.filter(new FilterFunction<WindowedValue<T>>() {
- @Override
- public boolean filter(WindowedValue<T> tWindowedValue) throws Exception {
- return true;
- }
- }).name("UnionFixFilter");
- context.setOutputDataSet(context.getOutput(transform), result);
- }
- }
-
- private static class CreatePCollectionViewTranslatorBatch<ElemT, ViewT>
- implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
- View.CreatePCollectionView<ElemT, ViewT>> {
-
- @Override
- public void translateNode(
- View.CreatePCollectionView<ElemT, ViewT> transform,
- FlinkBatchTranslationContext context) {
- DataSet<WindowedValue<ElemT>> inputDataSet =
- context.getInputDataSet(context.getInput(transform));
-
- PCollectionView<ViewT> input = transform.getView();
-
- context.setSideInputDataSet(input, inputDataSet);
- }
- }
-
- private static void transformSideInputs(
- List<PCollectionView<?>> sideInputs,
- SingleInputUdfOperator<?, ?, ?> outputDataSet,
- FlinkBatchTranslationContext context) {
- // get corresponding Flink broadcast DataSets
- for (PCollectionView<?> input : sideInputs) {
- DataSet<?> broadcastSet = context.getSideInputDataSet(input);
- outputDataSet.withBroadcastSet(broadcastSet, input.getTagInternal().getId());
- }
- }
-
- private FlinkBatchTransformTranslators() {}
-
-}
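
As the comment in GroupByKeyTranslatorBatch notes, the Concatenate combiner makes GroupByKey expressible as an ordinary Combine.PerKey. A sketch of that equivalence at the SDK level (the wrapper class and method names are illustrative, and Concatenate's private visibility is ignored):

    import java.util.List;
    import org.apache.beam.sdk.transforms.Combine;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    class ConcatenateExample {
      // Per key, this yields the same elements as GroupByKey, materialized as
      // a List<V> rather than an Iterable<V>; that equivalence is what the
      // batch translation above relies on.
      static PCollection<KV<String, List<Integer>>> groupViaConcatenate(
          PCollection<KV<String, Integer>> input) {
        return input.apply(
            Combine.<String, Integer, List<Integer>>perKey(new Concatenate<Integer>()));
      }
    }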
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
deleted file mode 100644
index 98dd0fb..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import com.google.common.collect.Iterables;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * Helper for {@link FlinkBatchPipelineTranslator} and translators in
- * {@link FlinkBatchTransformTranslators}.
- */
-class FlinkBatchTranslationContext {
-
- private final Map<PValue, DataSet<?>> dataSets;
- private final Map<PCollectionView<?>, DataSet<?>> broadcastDataSets;
-
- /**
- * For keeping track of which DataSets don't have a successor. We
- * need to terminate these with a discarding sink because the Beam
- * model allows dangling operations.
- */
- private final Map<PValue, DataSet<?>> danglingDataSets;
-
- private final ExecutionEnvironment env;
- private final PipelineOptions options;
-
- private AppliedPTransform<?, ?, ?> currentTransform;
-
- // ------------------------------------------------------------------------
-
- public FlinkBatchTranslationContext(ExecutionEnvironment env, PipelineOptions options) {
- this.env = env;
- this.options = options;
- this.dataSets = new HashMap<>();
- this.broadcastDataSets = new HashMap<>();
-
- this.danglingDataSets = new HashMap<>();
- }
-
- // ------------------------------------------------------------------------
-
- public Map<PValue, DataSet<?>> getDanglingDataSets() {
- return danglingDataSets;
- }
-
- public ExecutionEnvironment getExecutionEnvironment() {
- return env;
- }
-
- public PipelineOptions getPipelineOptions() {
- return options;
- }
-
- @SuppressWarnings("unchecked")
- public <T> DataSet<WindowedValue<T>> getInputDataSet(PValue value) {
- // assume that the DataSet is used as an input if retrieved here
- danglingDataSets.remove(value);
- return (DataSet<WindowedValue<T>>) dataSets.get(value);
- }
-
- public <T> void setOutputDataSet(PValue value, DataSet<WindowedValue<T>> set) {
- if (!dataSets.containsKey(value)) {
- dataSets.put(value, set);
- danglingDataSets.put(value, set);
- }
- }
-
- /**
- * Sets the {@link AppliedPTransform} that carries the current transform's inputs and outputs.
- * @param currentTransform the transform currently being translated
- */
- public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
- this.currentTransform = currentTransform;
- }
-
- @SuppressWarnings("unchecked")
- public <T> DataSet<T> getSideInputDataSet(PCollectionView<?> value) {
- return (DataSet<T>) broadcastDataSets.get(value);
- }
-
- public <ViewT, ElemT> void setSideInputDataSet(
- PCollectionView<ViewT> value,
- DataSet<WindowedValue<ElemT>> set) {
- if (!broadcastDataSets.containsKey(value)) {
- broadcastDataSets.put(value, set);
- }
- }
-
- @SuppressWarnings("unchecked")
- public <T> TypeInformation<WindowedValue<T>> getTypeInfo(PCollection<T> collection) {
- return getTypeInfo(collection.getCoder(), collection.getWindowingStrategy());
- }
-
- @SuppressWarnings("unchecked")
- public <T> TypeInformation<WindowedValue<T>> getTypeInfo(
- Coder<T> coder,
- WindowingStrategy<?, ?> windowingStrategy) {
- WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
- WindowedValue.getFullCoder(
- coder,
- windowingStrategy.getWindowFn().windowCoder());
-
- return new CoderTypeInformation<>(windowedValueCoder);
- }
-
- Map<TupleTag<?>, PValue> getInputs(PTransform<?, ?> transform) {
- return currentTransform.getInputs();
- }
-
- @SuppressWarnings("unchecked")
- <T extends PValue> T getInput(PTransform<T, ?> transform) {
- return (T) Iterables.getOnlyElement(currentTransform.getInputs().values());
- }
-
- Map<TupleTag<?>, PValue> getOutputs(PTransform<?, ?> transform) {
- return currentTransform.getOutputs();
- }
-
- @SuppressWarnings("unchecked")
- <T extends PValue> T getOutput(PTransform<?, T> transform) {
- return (T) Iterables.getOnlyElement(currentTransform.getOutputs().values());
- }
-}
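
Taken together, these helpers give every transform translator a small, uniform protocol: resolve the input DataSet, derive TypeInformation from the output's coder and windowing strategy, and register the result. An illustrative identity translator following that protocol (mirroring ReshuffleTranslatorBatch; the class name is hypothetical):

    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.util.WindowedValue;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.flink.api.java.DataSet;

    class IdentityTranslatorBatch<T>
        implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
            PTransform<PCollection<T>, PCollection<T>>> {

      @Override
      public void translateNode(
          PTransform<PCollection<T>, PCollection<T>> transform,
          FlinkBatchTranslationContext context) {
        // Fetching the input clears its dangling mark; registering the output
        // marks it dangling until a downstream translator consumes it.
        DataSet<WindowedValue<T>> input =
            context.getInputDataSet(context.getInput(transform));
        context.setOutputDataSet(context.getOutput(transform), input);
      }
    }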
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
deleted file mode 100644
index bf4395f..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import java.io.IOException;
-
-import org.apache.beam.sdk.AggregatorRetrievalException;
-import org.apache.beam.sdk.AggregatorValues;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.joda.time.Duration;
-
-
-/**
- * Result of a detached execution of a {@link org.apache.beam.sdk.Pipeline} with Flink.
- * In detached execution, results and job execution status are currently unavailable.
- */
-public class FlinkDetachedRunnerResult implements PipelineResult {
-
- FlinkDetachedRunnerResult() {}
-
- @Override
- public State getState() {
- return State.UNKNOWN;
- }
-
- @Override
- public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator)
- throws AggregatorRetrievalException {
- throw new AggregatorRetrievalException(
- "Accumulators can't be retrieved for detached Job executions.",
- new UnsupportedOperationException());
- }
-
- @Override
- public MetricResults metrics() {
- throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics.");
- }
-
- @Override
- public State cancel() throws IOException {
- throw new UnsupportedOperationException("Cancelling is not yet supported.");
- }
-
- @Override
- public State waitUntilFinish() {
- return State.UNKNOWN;
- }
-
- @Override
- public State waitUntilFinish(Duration duration) {
- return State.UNKNOWN;
- }
-
- @Override
- public String toString() {
- return "FlinkDetachedRunnerResult{}";
- }
-}
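
Since every status method above is a stub, code that may submit a job in detached mode has to treat UNKNOWN as a final answer rather than polling for progress. A short sketch of handling that at the call site (class and method names are illustrative):

    import org.apache.beam.sdk.PipelineResult;

    class DetachedResultExample {
      // With a detached result, waitUntilFinish() returns immediately with
      // State.UNKNOWN instead of blocking until completion.
      static void reportFinalState(PipelineResult result) {
        PipelineResult.State state = result.waitUntilFinish();
        if (state == PipelineResult.State.UNKNOWN) {
          System.out.println("Detached execution: final state is not observable.");
        } else {
          System.out.println("Pipeline finished in state: " + state);
        }
      }
    }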
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
deleted file mode 100644
index ba00036..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.List;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.CollectionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The class that instantiates and manages the execution of a given job.
- * Depending on whether the job is a streaming or a batch one, it creates
- * the appropriate execution environment ({@link ExecutionEnvironment}
- * or {@link StreamExecutionEnvironment}), the necessary {@link FlinkPipelineTranslator}
- * ({@link FlinkBatchPipelineTranslator} or {@link FlinkStreamingPipelineTranslator}) to
- * transform the Beam job into a Flink one, and executes the (translated) job.
- */
-class FlinkPipelineExecutionEnvironment {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class);
-
- private final FlinkPipelineOptions options;
-
- /**
- * The Flink Batch execution environment. This is instantiated to either a
- * {@link org.apache.flink.api.java.CollectionEnvironment},
- * a {@link org.apache.flink.api.java.LocalEnvironment} or
- * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration
- * options.
- */
- private ExecutionEnvironment flinkBatchEnv;
-
- /**
- * The Flink Streaming execution environment. This is instantiated to either a
- * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or
- * a {@link org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending
- * on the configuration options, and more specifically, the URL of the master.
- */
- private StreamExecutionEnvironment flinkStreamEnv;
-
- /**
- * Creates a {@link FlinkPipelineExecutionEnvironment} with the user-specified parameters in the
- * provided {@link FlinkPipelineOptions}.
- *
- * @param options the user-defined pipeline options.
- * */
- FlinkPipelineExecutionEnvironment(FlinkPipelineOptions options) {
- this.options = checkNotNull(options);
- }
-
- /**
- * Depending on whether the job is a streaming or a batch one, this method creates
- * the necessary execution environment and pipeline translator, and translates
- * the {@link org.apache.beam.sdk.values.PCollection}-based program into
- * a {@link org.apache.flink.api.java.DataSet}
- * or a {@link org.apache.flink.streaming.api.datastream.DataStream} program.
- * */
- public void translate(FlinkRunner flinkRunner, Pipeline pipeline) {
- this.flinkBatchEnv = null;
- this.flinkStreamEnv = null;
-
- PipelineTranslationOptimizer optimizer =
- new PipelineTranslationOptimizer(TranslationMode.BATCH, options);
-
- optimizer.translate(pipeline);
- TranslationMode translationMode = optimizer.getTranslationMode();
-
- FlinkPipelineTranslator translator;
- if (translationMode == TranslationMode.STREAMING) {
- this.flinkStreamEnv = createStreamExecutionEnvironment();
- translator = new FlinkStreamingPipelineTranslator(flinkRunner, flinkStreamEnv, options);
- } else {
- this.flinkBatchEnv = createBatchExecutionEnvironment();
- translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
- }
-
- translator.translate(pipeline);
- }
-
- /**
- * Launches the program execution.
- * */
- public JobExecutionResult executePipeline() throws Exception {
- final String jobName = options.getJobName();
-
- if (flinkBatchEnv != null) {
- return flinkBatchEnv.execute(jobName);
- } else if (flinkStreamEnv != null) {
- return flinkStreamEnv.execute(jobName);
- } else {
- throw new IllegalStateException("The Pipeline has not yet been translated.");
- }
- }
-
- /**
- * If the submitted job is a batch processing job, this method creates the appropriate
- * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending
- * on the user-specified options.
- */
- private ExecutionEnvironment createBatchExecutionEnvironment() {
-
- LOG.info("Creating the required Batch Execution Environment.");
-
- String masterUrl = options.getFlinkMaster();
- ExecutionEnvironment flinkBatchEnv;
-
- // depending on the master, create the right environment.
- if (masterUrl.equals("[local]")) {
- flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment();
- } else if (masterUrl.equals("[collection]")) {
- flinkBatchEnv = new CollectionEnvironment();
- } else if (masterUrl.equals("[auto]")) {
- flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
- } else if (masterUrl.matches(".*:\\d*")) {
- String[] parts = masterUrl.split(":");
- List<String> stagingFiles = options.getFilesToStage();
- flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(parts[0],
- Integer.parseInt(parts[1]),
- stagingFiles.toArray(new String[stagingFiles.size()]));
- } else {
- LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
- flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
- }
-
- // set the correct parallelism.
- if (options.getParallelism() != -1 && !(flinkBatchEnv instanceof CollectionEnvironment)) {
- flinkBatchEnv.setParallelism(options.getParallelism());
- }
-
- // set parallelism in the options (required by some execution code)
- options.setParallelism(flinkBatchEnv.getParallelism());
-
- if (options.getObjectReuse()) {
- flinkBatchEnv.getConfig().enableObjectReuse();
- } else {
- flinkBatchEnv.getConfig().disableObjectReuse();
- }
-
- return flinkBatchEnv;
- }
-
- /**
- * If the submitted job is a stream processing job, this method creates the appropriate
- * Flink {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending
- * on the user-specified options.
- */
- private StreamExecutionEnvironment createStreamExecutionEnvironment() {
-
- LOG.info("Creating the required Streaming Environment.");
-
- String masterUrl = options.getFlinkMaster();
- StreamExecutionEnvironment flinkStreamEnv = null;
-
- // depending on the master, create the right environment.
- if (masterUrl.equals("[local]")) {
- flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment();
- } else if (masterUrl.equals("[auto]")) {
- flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
- } else if (masterUrl.matches(".*:\\d*")) {
- String[] parts = masterUrl.split(":");
- List<String> stagingFiles = options.getFilesToStage();
- flinkStreamEnv = StreamExecutionEnvironment.createRemoteEnvironment(parts[0],
- Integer.parseInt(parts[1]), stagingFiles.toArray(new String[stagingFiles.size()]));
- } else {
- LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
- flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
- }
-
- // set the correct parallelism.
- if (options.getParallelism() != -1) {
- flinkStreamEnv.setParallelism(options.getParallelism());
- }
-
- // set parallelism in the options (required by some execution code)
- options.setParallelism(flinkStreamEnv.getParallelism());
-
- if (options.getObjectReuse()) {
- flinkStreamEnv.getConfig().enableObjectReuse();
- } else {
- flinkStreamEnv.getConfig().disableObjectReuse();
- }
-
- // default to event time
- flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
- // for the following 2 parameters, a value of -1 means that Flink will use
- // the default values as specified in the configuration.
- int numRetries = options.getNumberOfExecutionRetries();
- if (numRetries != -1) {
- flinkStreamEnv.setNumberOfExecutionRetries(numRetries);
- }
- long retryDelay = options.getExecutionRetryDelay();
- if (retryDelay != -1) {
- flinkStreamEnv.getConfig().setExecutionRetryDelay(retryDelay);
- }
-
- // A value of -1 corresponds to disabled checkpointing (see CheckpointConfig in Flink).
- // If the value is not -1, then the validity checks are applied.
- // By default, checkpointing is disabled.
- long checkpointInterval = options.getCheckpointingInterval();
- if (checkpointInterval != -1) {
- if (checkpointInterval < 1) {
- throw new IllegalArgumentException("The checkpoint interval must be positive");
- }
- flinkStreamEnv.enableCheckpointing(checkpointInterval);
- }
-
- // State backend
- final AbstractStateBackend stateBackend = options.getStateBackend();
- if (stateBackend != null) {
- flinkStreamEnv.setStateBackend(stateBackend);
- }
-
- return flinkStreamEnv;
- }
-
-}
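
For reference, the streaming settings applied above correspond to direct Flink API
calls, as in the following minimal sketch assuming a local environment and
illustrative option values (hypothetical code, not part of this commit):

    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class StreamEnvSketch {
      public static void main(String[] args) {
        // Equivalent of flinkMaster "[local]"; option values are illustrative.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(4);                                         // --parallelism=4
        env.getConfig().enableObjectReuse();                           // --objectReuse=true
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // Beam's default
        env.setNumberOfExecutionRetries(3);                            // --numberOfExecutionRetries=3
        env.getConfig().setExecutionRetryDelay(1000L);                 // --executionRetryDelay=1000
        env.enableCheckpointing(60000L);                               // --checkpointingInterval=60000
      }
    }
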
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
deleted file mode 100644
index ef9afea..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import java.util.List;
-import org.apache.beam.sdk.options.ApplicationNameOptions;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.StreamingOptions;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-
-/**
- * Options which can be used to configure a Flink PipelineRunner.
- */
-public interface FlinkPipelineOptions
- extends PipelineOptions, ApplicationNameOptions, StreamingOptions {
-
- /**
- * List of local files to make available to workers.
- *
- * <p>Jars are placed on the worker's classpath.
- *
- * <p>The default value is the list of jars from the main program's classpath.
- */
- @Description("Jar-Files to send to all workers and put on the classpath. "
- + "The default value is all files from the classpath.")
- @JsonIgnore
- List<String> getFilesToStage();
- void setFilesToStage(List<String> value);
-
- /**
- * The URL of the Flink JobManager on which to execute pipelines. This can either be
- * the address of a cluster JobManager, in the form "host:port", or one of the special
- * Strings "[local]", "[collection]" or "[auto]". "[local]" will start a local Flink
- * cluster in the JVM, "[collection]" will execute the pipeline on Java Collections, while
- * "[auto]" will let the system decide where to execute the pipeline based on the environment.
- */
- @Description("Address of the Flink Master where the Pipeline should be executed. Can"
- + " either be of the form \"host:port\" or one of the special values [local], "
- + "[collection] or [auto].")
- String getFlinkMaster();
- void setFlinkMaster(String value);
-
- @Description("The degree of parallelism to be used when distributing operations onto workers.")
- @Default.InstanceFactory(DefaultParallelismFactory.class)
- Integer getParallelism();
- void setParallelism(Integer value);
-
- @Description("The interval between consecutive checkpoints (i.e. snapshots of the current"
- + "pipeline state used for fault tolerance).")
- @Default.Long(-1L)
- Long getCheckpointingInterval();
- void setCheckpointingInterval(Long interval);
-
- @Description("Sets the number of times that failed tasks are re-executed. "
- + "A value of zero effectively disables fault tolerance. A value of -1 indicates "
- + "that the system default value (as defined in the configuration) should be used.")
- @Default.Integer(-1)
- Integer getNumberOfExecutionRetries();
- void setNumberOfExecutionRetries(Integer retries);
-
- @Description("Sets the delay between executions. A value of {@code -1} "
- + "indicates that the default value should be used.")
- @Default.Long(-1L)
- Long getExecutionRetryDelay();
- void setExecutionRetryDelay(Long delay);
-
- @Description("Sets the behavior of reusing objects.")
- @Default.Boolean(false)
- Boolean getObjectReuse();
- void setObjectReuse(Boolean reuse);
-
- /**
- * State backend to store Beam's state during computation.
- * Note: Only applicable when executing in streaming mode.
- */
- @Description("Sets the state backend to use in streaming mode. "
- + "Otherwise the default is read from the Flink config.")
- @JsonIgnore
- AbstractStateBackend getStateBackend();
- void setStateBackend(AbstractStateBackend stateBackend);
-
-}
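
These options follow the standard Beam PipelineOptions mechanism, so they can be
populated from command-line flags. A minimal sketch with illustrative values
(hypothetical code, not part of this commit):

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class OptionsSketch {
      public static void main(String[] args) {
        // e.g. args = {"--runner=FlinkRunner", "--flinkMaster=[local]", "--parallelism=4"}
        FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(FlinkPipelineOptions.class);
        options.setCheckpointingInterval(60000L); // snapshot pipeline state every minute
        Pipeline pipeline = Pipeline.create(options);
        // ... apply transforms, then pipeline.run() ...
      }
    }
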
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineTranslator.java
deleted file mode 100644
index 65f416d..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineTranslator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.Pipeline;
-
-/**
- * The role of this class is to translate the Beam operators to
- * their Flink counterparts. For a streaming job, this is instantiated as a
- * {@link FlinkStreamingPipelineTranslator}; otherwise, i.e. for a batch job,
- * a {@link FlinkBatchPipelineTranslator} is created. Correspondingly, the
- * {@link org.apache.beam.sdk.values.PCollection}-based user-provided job is translated into
- * a {@link org.apache.flink.streaming.api.datastream.DataStream} (for streaming) or a
- * {@link org.apache.flink.api.java.DataSet} (for batch) program.
- */
-abstract class FlinkPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
-
- /**
- * Translates the pipeline by passing this class as a visitor.
- * @param pipeline The pipeline to be translated
- */
- public void translate(Pipeline pipeline) {
- pipeline.traverseTopologically(this);
- }
-
- /**
- * Utility formatting method used to indent log output.
- * @param n the nesting depth
- * @return a String containing n repetitions of "| "
- */
- protected static String genSpaces(int n) {
- StringBuilder builder = new StringBuilder();
- for (int i = 0; i < n; i++) {
- builder.append("| ");
- }
- return builder.toString();
- }
-}
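
genSpaces exists to indent the translators' log output by composite-transform
nesting depth. A sketch of the kind of use it gets in a concrete translator,
assuming a depth field maintained by the visitor (hypothetical code, not part of
this commit):

    // Hypothetical excerpt from a FlinkPipelineTranslator subclass.
    private int depth = 0;

    @Override
    public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
      LOG.info("{} enterCompositeTransform- {}", genSpaces(depth), node.getFullName());
      depth++;
      return CompositeBehavior.ENTER_TRANSFORM;
    }

    @Override
    public void leaveCompositeTransform(TransformHierarchy.Node node) {
      depth--;
      LOG.info("{} leaveCompositeTransform- {}", genSpaces(depth), node.getFullName());
    }
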
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
deleted file mode 100644
index 096f030..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import com.google.common.base.Joiner;
-import java.io.File;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.client.program.DetachedEnvironment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A {@link PipelineRunner} that executes the operations in the
- * pipeline by first translating them to a Flink Plan and then executing them either locally
- * or on a Flink cluster, depending on the configuration.
- */
-public class FlinkRunner extends PipelineRunner<PipelineResult> {
-
- private static final Logger LOG = LoggerFactory.getLogger(FlinkRunner.class);
-
- /**
- * Provided options.
- */
- private final FlinkPipelineOptions options;
-
- /**
- * Construct a runner from the provided options.
- *
- * @param options Properties which configure the runner.
- * @return The newly created runner.
- */
- public static FlinkRunner fromOptions(PipelineOptions options) {
- FlinkPipelineOptions flinkOptions =
- PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options);
- ArrayList<String> missing = new ArrayList<>();
-
- if (flinkOptions.getAppName() == null) {
- missing.add("appName");
- }
- if (missing.size() > 0) {
- throw new IllegalArgumentException(
- "Missing required values: " + Joiner.on(',').join(missing));
- }
-
- if (flinkOptions.getFilesToStage() == null) {
- flinkOptions.setFilesToStage(detectClassPathResourcesToStage(
- FlinkRunner.class.getClassLoader()));
- LOG.info("PipelineOptions.filesToStage was not specified. "
- + "Defaulting to files from the classpath: will stage {} files. "
- + "Enable logging at DEBUG level to see which files will be staged.",
- flinkOptions.getFilesToStage().size());
- LOG.debug("Classpath elements: {}", flinkOptions.getFilesToStage());
- }
-
- // Set Flink Master to [auto] if no option was specified.
- if (flinkOptions.getFlinkMaster() == null) {
- flinkOptions.setFlinkMaster("[auto]");
- }
-
- return new FlinkRunner(flinkOptions);
- }
-
- private FlinkRunner(FlinkPipelineOptions options) {
- this.options = options;
- this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet<>();
- }
-
- @Override
- public PipelineResult run(Pipeline pipeline) {
- logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
-
- LOG.info("Executing pipeline using FlinkRunner.");
-
- FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(options);
-
- LOG.info("Translating pipeline to Flink program.");
- env.translate(this, pipeline);
-
- JobExecutionResult result;
- try {
- LOG.info("Starting execution of Flink program.");
- result = env.executePipeline();
- } catch (Exception e) {
- LOG.error("Pipeline execution failed", e);
- throw new RuntimeException("Pipeline execution failed", e);
- }
-
- if (result instanceof DetachedEnvironment.DetachedJobExecutionResult) {
- LOG.info("Pipeline submitted in Detached mode");
- return new FlinkDetachedRunnerResult();
- } else {
- LOG.info("Execution finished in {} msecs", result.getNetRuntime());
- Map<String, Object> accumulators = result.getAllAccumulatorResults();
- if (accumulators != null && !accumulators.isEmpty()) {
- LOG.info("Final aggregator values:");
-
- for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet()) {
- LOG.info("{} : {}", entry.getKey(), entry.getValue());
- }
- }
-
- return new FlinkRunnerResult(accumulators, result.getNetRuntime());
- }
- }
-
- /**
- * For testing.
- */
- public FlinkPipelineOptions getPipelineOptions() {
- return options;
- }
-
- @Override
- public String toString() {
- return "FlinkRunner#" + hashCode();
- }
-
- /**
- * Attempts to detect all the resources the class loader has access to. This does not recurse
- * to class loader parents, which stops it from pulling in resources from the system class loader.
- *
- * @param classLoader The URLClassLoader to use to detect resources to stage.
- * @return A list of absolute paths to the resources the class loader uses.
- * @throws IllegalArgumentException If either the class loader is not a URLClassLoader or one
- * of the resources the class loader exposes is not a file resource.
- */
- protected static List<String> detectClassPathResourcesToStage(
- ClassLoader classLoader) {
- if (!(classLoader instanceof URLClassLoader)) {
- String message = String.format("Unable to use ClassLoader to detect classpath elements. "
- + "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader);
- LOG.error(message);
- throw new IllegalArgumentException(message);
- }
-
- List<String> files = new ArrayList<>();
- for (URL url : ((URLClassLoader) classLoader).getURLs()) {
- try {
- files.add(new File(url.toURI()).getAbsolutePath());
- } catch (IllegalArgumentException | URISyntaxException e) {
- String message = String.format("Unable to convert url (%s) to file.", url);
- LOG.error(message);
- throw new IllegalArgumentException(message, e);
- }
- }
- return files;
- }
-
- /** A set of {@link View}s with non-deterministic key coders. */
- Set<PTransform<?, ?>> ptransformViewsWithNonDeterministicKeyCoders;
-
- /**
- * Records that the {@link PTransform} requires a deterministic key coder.
- */
- void recordViewUsesNonDeterministicKeyCoder(PTransform<?, ?> ptransform) {
- ptransformViewsWithNonDeterministicKeyCoders.add(ptransform);
- }
-
- /** Outputs a warning about PCollection views without deterministic key coders. */
- private void logWarningIfPCollectionViewHasNonDeterministicKeyCoder(Pipeline pipeline) {
- // We need to wait until this point to determine the names of the transforms, since only
- // now do we know the full transform hierarchy; otherwise we could simply
- // have recorded the full names at apply time.
- if (!ptransformViewsWithNonDeterministicKeyCoders.isEmpty()) {
- final SortedSet<String> ptransformViewNamesWithNonDeterministicKeyCoders = new TreeSet<>();
- pipeline.traverseTopologically(new Pipeline.PipelineVisitor() {
- @Override
- public void visitValue(PValue value, TransformHierarchy.Node producer) {
- }
-
- @Override
- public void visitPrimitiveTransform(TransformHierarchy.Node node) {
- if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
- ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
- }
- }
-
- @Override
- public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
- if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
- ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
- }
- return CompositeBehavior.ENTER_TRANSFORM;
- }
-
- @Override
- public void leaveCompositeTransform(TransformHierarchy.Node node) {
- }
- });
-
- LOG.warn("Unable to use indexed implementation for View.AsMap and View.AsMultimap for {} "
- + "because the key coder is not deterministic. Falling back to singleton implementation "
- + "which may cause memory and/or performance problems. Future major versions of "
- + "the Flink runner will require deterministic key coders.",
- ptransformViewNamesWithNonDeterministicKeyCoders);
- }
- }
-}
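
Putting the runner, options, and execution environment together, a minimal
end-to-end sketch of launching a pipeline on this runner (hypothetical code with
illustrative values, not part of this commit):

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;

    public class FlinkRunnerSketch {
      public static void main(String[] args) {
        FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);
        options.setFlinkMaster("[local]"); // filesToStage defaults to the classpath
        Pipeline pipeline = Pipeline.create(options);
        pipeline.apply(Create.of("hello", "flink"));
        PipelineResult result = pipeline.run(); // translate + execute, as in run() above
      }
    }
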