You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/04/19 13:09:17 UTC
[08/18] beam git commit: [BEAM-1994] Remove Flink examples package
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
new file mode 100644
index 0000000..ba00036
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.CollectionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The class that instantiates and manages the execution of a given job.
+ * Depending on whether the job is a streaming or a batch processing one, it creates
+ * the adequate execution environment ({@link ExecutionEnvironment}
+ * or {@link StreamExecutionEnvironment}), the necessary {@link FlinkPipelineTranslator}
+ * ({@link FlinkBatchPipelineTranslator} or {@link FlinkStreamingPipelineTranslator}) to
+ * transform the Beam job into a Flink one, and executes the (translated) job.
+ */
+class FlinkPipelineExecutionEnvironment {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class);
+
+ private final FlinkPipelineOptions options;
+
+ /**
+ * The Flink Batch execution environment. This is instantiated to either a
+ * {@link org.apache.flink.api.java.CollectionEnvironment},
+ * a {@link org.apache.flink.api.java.LocalEnvironment} or
+ * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration
+ * options.
+ */
+ private ExecutionEnvironment flinkBatchEnv;
+
+ /**
+ * The Flink Streaming execution environment. This is instantiated to either a
+ * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or
+ * a {@link org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending
+ * on the configuration options, and more specifically, the url of the master.
+ */
+ private StreamExecutionEnvironment flinkStreamEnv;
+
+ /**
+  * Wraps the given {@link FlinkPipelineOptions}; no Flink environment is created until
+  * {@link #translate} is called.
+  * @param options the user-defined pipeline options; must not be null.
+  */
+ FlinkPipelineExecutionEnvironment(FlinkPipelineOptions options) {
+   checkNotNull(options);
+   this.options = options;
+ }
+
+ /**
+  * Translates the given Beam pipeline into a Flink program. A
+  * {@link PipelineTranslationOptimizer} is run over the pipeline first and the translation mode
+  * it reports selects either the streaming or the batch environment and translator. The
+  * resulting program is a {@link org.apache.flink.api.java.DataSet} or
+  * {@link org.apache.flink.streaming.api.datastream.DataStream} one.
+  */
+ public void translate(FlinkRunner flinkRunner, Pipeline pipeline) {
+   // Forget any environment created by an earlier call.
+   this.flinkStreamEnv = null;
+   this.flinkBatchEnv = null;
+
+   PipelineTranslationOptimizer modeDetector =
+       new PipelineTranslationOptimizer(TranslationMode.BATCH, options);
+   modeDetector.translate(pipeline);
+
+   final boolean streaming = modeDetector.getTranslationMode() == TranslationMode.STREAMING;
+   final FlinkPipelineTranslator translator;
+   if (!streaming) {
+     this.flinkBatchEnv = createBatchExecutionEnvironment();
+     translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
+   } else {
+     this.flinkStreamEnv = createStreamExecutionEnvironment();
+     translator = new FlinkStreamingPipelineTranslator(flinkRunner, flinkStreamEnv, options);
+   }
+
+   translator.translate(pipeline);
+ }
+
+ /**
+  * Launches the program on whichever environment {@link #translate} created.
+  */
+ public JobExecutionResult executePipeline() throws Exception {
+   final String jobName = options.getJobName();
+   // translate() leaves at most one environment set; the batch one is consulted first.
+   if (flinkBatchEnv != null) {
+     return flinkBatchEnv.execute(jobName);
+   }
+   if (flinkStreamEnv != null) {
+     return flinkStreamEnv.execute(jobName);
+   }
+   throw new IllegalStateException("The Pipeline has not yet been translated.");
+ }
+
+ /**
+  * If the submitted job is a batch processing job, this method creates the adequate
+  * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending
+  * on the user-specified options. A "host:port" master must carry a non-empty numeric port.
+  */
+ private ExecutionEnvironment createBatchExecutionEnvironment() {
+   LOG.info("Creating the required Batch Execution Environment.");
+
+   String masterUrl = options.getFlinkMaster();
+   ExecutionEnvironment env;
+
+   // depending on the master, create the right environment.
+   if (masterUrl.equals("[local]")) {
+     env = ExecutionEnvironment.createLocalEnvironment();
+   } else if (masterUrl.equals("[collection]")) {
+     env = new CollectionEnvironment();
+   } else if (masterUrl.equals("[auto]")) {
+     env = ExecutionEnvironment.getExecutionEnvironment();
+   } else if (masterUrl.matches(".+:\\d+")) {
+     // "\\d+" rather than "\\d*": a bare "host:" has no port element to parse.
+     int colon = masterUrl.lastIndexOf(':');
+     List<String> stagingFiles = options.getFilesToStage();
+     env = ExecutionEnvironment.createRemoteEnvironment(masterUrl.substring(0, colon),
+         Integer.parseInt(masterUrl.substring(colon + 1)),
+         stagingFiles.toArray(new String[stagingFiles.size()]));
+   } else {
+     LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
+     env = ExecutionEnvironment.getExecutionEnvironment();
+   }
+
+   // set the correct parallelism.
+   if (options.getParallelism() != -1 && !(env instanceof CollectionEnvironment)) {
+     env.setParallelism(options.getParallelism());
+   }
+
+   // set parallelism in the options (required by some execution code)
+   options.setParallelism(env.getParallelism());
+
+   if (options.getObjectReuse()) {
+     env.getConfig().enableObjectReuse();
+   } else {
+     env.getConfig().disableObjectReuse();
+   }
+
+   return env;
+ }
+
+ /**
+  * If the submitted job is a stream processing job, this method creates the adequate
+  * Flink {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending
+  * on the user-specified options. A "host:port" master must carry a non-empty numeric port.
+  */
+ private StreamExecutionEnvironment createStreamExecutionEnvironment() {
+   LOG.info("Creating the required Streaming Environment.");
+
+   String masterUrl = options.getFlinkMaster();
+   StreamExecutionEnvironment env;
+
+   // depending on the master, create the right environment.
+   if (masterUrl.equals("[local]")) {
+     env = StreamExecutionEnvironment.createLocalEnvironment();
+   } else if (masterUrl.equals("[auto]")) {
+     env = StreamExecutionEnvironment.getExecutionEnvironment();
+   } else if (masterUrl.matches(".+:\\d+")) {
+     // "\\d+" rather than "\\d*": a bare "host:" has no port element to parse.
+     int colon = masterUrl.lastIndexOf(':');
+     List<String> stagingFiles = options.getFilesToStage();
+     env = StreamExecutionEnvironment.createRemoteEnvironment(masterUrl.substring(0, colon),
+         Integer.parseInt(masterUrl.substring(colon + 1)),
+         stagingFiles.toArray(new String[stagingFiles.size()]));
+   } else {
+     LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
+     env = StreamExecutionEnvironment.getExecutionEnvironment();
+   }
+
+   // set the correct parallelism.
+   if (options.getParallelism() != -1) {
+     env.setParallelism(options.getParallelism());
+   }
+
+   // set parallelism in the options (required by some execution code)
+   options.setParallelism(env.getParallelism());
+
+   if (options.getObjectReuse()) {
+     env.getConfig().enableObjectReuse();
+   } else {
+     env.getConfig().disableObjectReuse();
+   }
+
+   // default to event time
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   // -1 for either of the next two settings keeps the default from the Flink configuration.
+   int numRetries = options.getNumberOfExecutionRetries();
+   if (numRetries != -1) {
+     env.setNumberOfExecutionRetries(numRetries);
+   }
+   long retryDelay = options.getExecutionRetryDelay();
+   if (retryDelay != -1) {
+     env.getConfig().setExecutionRetryDelay(retryDelay);
+   }
+
+   // A value of -1 corresponds to disabled checkpointing (see CheckpointConfig in Flink).
+   // If the value is not -1, then the validity checks are applied.
+   // By default, checkpointing is disabled.
+   long checkpointInterval = options.getCheckpointingInterval();
+   if (checkpointInterval != -1) {
+     if (checkpointInterval < 1) {
+       throw new IllegalArgumentException("The checkpoint interval must be positive");
+     }
+     env.enableCheckpointing(checkpointInterval);
+   }
+
+   // State backend
+   final AbstractStateBackend stateBackend = options.getStateBackend();
+   if (stateBackend != null) {
+     env.setStateBackend(stateBackend);
+   }
+
+   return env;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
new file mode 100644
index 0000000..ef9afea
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import java.util.List;
+import org.apache.beam.sdk.options.ApplicationNameOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+
+/**
+ * Options which can be used to configure a Flink PipelineRunner.
+ */
+public interface FlinkPipelineOptions
+    extends PipelineOptions, ApplicationNameOptions, StreamingOptions {
+
+  /**
+   * List of local files to make available to workers.
+   *
+   * <p>Jars are placed on the worker's classpath.
+   *
+   * <p>The default value is the list of jars from the main program's classpath.
+   */
+  @Description("Jar-Files to send to all workers and put on the classpath. "
+      + "The default value is all files from the classpath.")
+  @JsonIgnore
+  List<String> getFilesToStage();
+  void setFilesToStage(List<String> value);
+
+  /**
+   * The url of the Flink JobManager on which to execute pipelines. This can either be
+   * the address of a cluster JobManager, in the form "host:port" or one of the special
+   * Strings "[local]", "[collection]" or "[auto]". "[local]" will start a local Flink
+   * Cluster in the JVM, "[collection]" will execute the pipeline on Java Collections while
+   * "[auto]" will let the system decide where to execute the pipeline based on the environment.
+   */
+  @Description("Address of the Flink Master where the Pipeline should be executed. Can"
+      + " either be of the form \"host:port\" or one of the special values [local], "
+      + "[collection] or [auto].")
+  String getFlinkMaster();
+  void setFlinkMaster(String value);
+
+  @Description("The degree of parallelism to be used when distributing operations onto workers.")
+  @Default.InstanceFactory(DefaultParallelismFactory.class)
+  Integer getParallelism();
+  void setParallelism(Integer value);
+
+  @Description("The interval between consecutive checkpoints (i.e. snapshots of the current "
+      + "pipeline state used for fault tolerance).")
+  @Default.Long(-1L)
+  Long getCheckpointingInterval();
+  void setCheckpointingInterval(Long interval);
+
+  @Description("Sets the number of times that failed tasks are re-executed. "
+      + "A value of zero effectively disables fault tolerance. A value of -1 indicates "
+      + "that the system default value (as defined in the configuration) should be used.")
+  @Default.Integer(-1)
+  Integer getNumberOfExecutionRetries();
+  void setNumberOfExecutionRetries(Integer retries);
+
+  @Description("Sets the delay between executions. A value of -1 "
+      + "indicates that the default value should be used.")
+  @Default.Long(-1L)
+  Long getExecutionRetryDelay();
+  void setExecutionRetryDelay(Long delay);
+
+  @Description("Sets the behavior of reusing objects.")
+  @Default.Boolean(false)
+  Boolean getObjectReuse();
+  void setObjectReuse(Boolean reuse);
+
+  /**
+   * State backend to store Beam's state during computation.
+   * Note: Only applicable when executing in streaming mode.
+   */
+  @Description("Sets the state backend to use in streaming mode. "
+      + "Otherwise the default is read from the Flink config.")
+  @JsonIgnore
+  AbstractStateBackend getStateBackend();
+  void setStateBackend(AbstractStateBackend stateBackend);
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineTranslator.java
new file mode 100644
index 0000000..65f416d
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineTranslator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.sdk.Pipeline;
+
+/**
+ * The role of this class is to translate the Beam operators to
+ * their Flink counterparts. If we have a streaming job, this is instantiated as a
+ * {@link FlinkStreamingPipelineTranslator}. Otherwise, i.e. for a batch job,
+ * a {@link FlinkBatchPipelineTranslator} is created. Correspondingly, the
+ * {@link org.apache.beam.sdk.values.PCollection}-based user-provided job is translated into
+ * a {@link org.apache.flink.streaming.api.datastream.DataStream} (for streaming) or a
+ * {@link org.apache.flink.api.java.DataSet} (for batch) one.
+ */
+abstract class FlinkPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
+
+  /**
+   * Walks the pipeline in topological order with this translator acting as the visitor.
+   * @param pipeline The pipeline to be translated
+   */
+  public void translate(Pipeline pipeline) {
+    pipeline.traverseTopologically(this);
+  }
+
+  /**
+   * Utility formatting method that builds a prefix out of repeated indent units.
+   * @param n how many indent units to emit; a value below 1 yields an empty string
+   * @return the indent unit appended in the loop, concatenated n times
+   */
+  protected static String genSpaces(int n) {
+    StringBuilder prefix = new StringBuilder();
+    for (int remaining = n; remaining > 0; remaining--) {
+      prefix.append("| ");
+    }
+    return prefix.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
new file mode 100644
index 0000000..096f030
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.base.Joiner;
+import java.io.File;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.client.program.DetachedEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PipelineRunner} that executes the operations in the
+ * pipeline by first translating them to a Flink Plan and then executing them either locally
+ * or on a Flink cluster, depending on the configuration.
+ */
+public class FlinkRunner extends PipelineRunner<PipelineResult> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkRunner.class);
+
+ /**
+ * Provided options.
+ */
+ private final FlinkPipelineOptions options;
+
+ /**
+  * Construct a runner from the provided options, filling in unset filesToStage and flinkMaster.
+  *
+  * @param options Properties which configure the runner.
+  * @return The newly created runner.
+  */
+ public static FlinkRunner fromOptions(PipelineOptions options) {
+   FlinkPipelineOptions flinkOptions =
+       PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options);
+
+   List<String> missing = new ArrayList<>();
+   if (flinkOptions.getAppName() == null) {
+     missing.add("appName");
+   }
+   if (!missing.isEmpty()) {
+     String missingNames = Joiner.on(',').join(missing);
+     throw new IllegalArgumentException("Missing required values: " + missingNames);
+   }
+
+   if (flinkOptions.getFilesToStage() == null) {
+     ClassLoader runnerClassLoader = FlinkRunner.class.getClassLoader();
+     flinkOptions.setFilesToStage(detectClassPathResourcesToStage(runnerClassLoader));
+     LOG.info("PipelineOptions.filesToStage was not specified. "
+         + "Defaulting to files from the classpath: will stage {} files. "
+         + "Enable logging at DEBUG level to see which files will be staged.",
+         flinkOptions.getFilesToStage().size());
+     LOG.debug("Classpath elements: {}", flinkOptions.getFilesToStage());
+   }
+
+   // Set Flink Master to [auto] if no option was specified.
+   if (flinkOptions.getFlinkMaster() == null) {
+     flinkOptions.setFlinkMaster("[auto]");
+   }
+
+   return new FlinkRunner(flinkOptions);
+ }
+
+ private FlinkRunner(FlinkPipelineOptions options) {
+ this.options = options;
+ this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet<>();
+ }
+
+ @Override
+ public PipelineResult run(Pipeline pipeline) {
+   logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
+
+   LOG.info("Executing pipeline using FlinkRunner.");
+   FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(options);
+
+   LOG.info("Translating pipeline to Flink program.");
+   env.translate(this, pipeline);
+
+   final JobExecutionResult result;
+   try {
+     LOG.info("Starting execution of Flink program.");
+     result = env.executePipeline();
+   } catch (Exception e) {
+     // Logged and rethrown unchecked, keeping the original exception as the cause.
+     LOG.error("Pipeline execution failed", e);
+     throw new RuntimeException("Pipeline execution failed", e);
+   }
+
+   // Detached mode: hand back a result object that is built without any job data.
+   if (result instanceof DetachedEnvironment.DetachedJobExecutionResult) {
+     LOG.info("Pipeline submitted in Detached mode");
+     return new FlinkDetachedRunnerResult();
+   }
+
+   final long netRuntime = result.getNetRuntime();
+   LOG.info("Execution finished in {} msecs", netRuntime);
+   final Map<String, Object> accumulators = result.getAllAccumulatorResults();
+   if (accumulators != null && !accumulators.isEmpty()) {
+     LOG.info("Final aggregator values:");
+     for (Map.Entry<String, Object> accumulator : accumulators.entrySet()) {
+       LOG.info("{} : {}", accumulator.getKey(), accumulator.getValue());
+     }
+   }
+   return new FlinkRunnerResult(accumulators, netRuntime);
+ }
+
+ /**
+ * Returns the {@link FlinkPipelineOptions} this runner was constructed with. For testing.
+ */
+ public FlinkPipelineOptions getPipelineOptions() {
+ return options;
+ }
+
+ @Override
+ public String toString() {
+ return "FlinkRunner#" + hashCode();
+ }
+
+ /**
+ * Attempts to detect all the resources the class loader has access to. This does not recurse
+ * to class loader parents stopping it from pulling in resources from the system class loader.
+ *
+ * @param classLoader The URLClassLoader to use to detect resources to stage.
+ * @return A list of absolute paths to the resources the class loader uses.
+ * @throws IllegalArgumentException If either the class loader is not a URLClassLoader or one
+ * of the resources the class loader exposes is not a file resource.
+ */
+ protected static List<String> detectClassPathResourcesToStage(
+ ClassLoader classLoader) {
+ if (!(classLoader instanceof URLClassLoader)) {
+ String message = String.format("Unable to use ClassLoader to detect classpath elements. "
+ + "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader);
+ LOG.error(message);
+ throw new IllegalArgumentException(message);
+ }
+
+ List<String> files = new ArrayList<>();
+ for (URL url : ((URLClassLoader) classLoader).getURLs()) {
+ try {
+ files.add(new File(url.toURI()).getAbsolutePath());
+ } catch (IllegalArgumentException | URISyntaxException e) {
+ String message = String.format("Unable to convert url (%s) to file.", url);
+ LOG.error(message);
+ throw new IllegalArgumentException(message, e);
+ }
+ }
+ return files;
+ }
+
+ /** A set of {@link View}s with non-deterministic key coders. */
+ Set<PTransform<?, ?>> ptransformViewsWithNonDeterministicKeyCoders;
+
+ /**
+ * Adds the {@link PTransform} to the set of views with non-deterministic key coders.
+ */
+ void recordViewUsesNonDeterministicKeyCoder(PTransform<?, ?> ptransform) {
+ ptransformViewsWithNonDeterministicKeyCoders.add(ptransform);
+ }
+
+ /** Outputs a warning about PCollection views without deterministic key coders. */
+ private void logWarningIfPCollectionViewHasNonDeterministicKeyCoder(Pipeline pipeline) {
+   if (ptransformViewsWithNonDeterministicKeyCoders.isEmpty()) {
+     return;
+   }
+   // Full names need the finished transform hierarchy, so they are gathered by traversal here.
+   final SortedSet<String> affectedViewNames = new TreeSet<>();
+   pipeline.traverseTopologically(new Pipeline.PipelineVisitor() {
+     @Override
+     public void visitValue(PValue value, TransformHierarchy.Node producer) {
+     }
+
+     @Override
+     public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+       recordIfAffected(node);
+     }
+
+     @Override
+     public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+       recordIfAffected(node);
+       return CompositeBehavior.ENTER_TRANSFORM;
+     }
+
+     @Override
+     public void leaveCompositeTransform(TransformHierarchy.Node node) {
+     }
+
+     private void recordIfAffected(TransformHierarchy.Node node) {
+       if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
+         affectedViewNames.add(node.getFullName());
+       }
+     }
+   });
+   LOG.warn("Unable to use indexed implementation for View.AsMap and View.AsMultimap for {} "
+       + "because the key coder is not deterministic. Falling back to singleton implementation "
+       + "which may cause memory and/or performance problems. Future major versions of "
+       + "the Flink runner will require deterministic key coders.",
+       affectedViewNames);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
new file mode 100644
index 0000000..681459a
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+
+/**
+ * AutoService registrar - registers {@link FlinkRunner} and {@link TestFlinkRunner} as pipeline
+ * runners and {@link FlinkPipelineOptions} as pipeline options.
+ *
+ * <p>It ends up in META-INF/services and gets picked up by Beam.
+ *
+ */
+public class FlinkRunnerRegistrar {
+ private FlinkRunnerRegistrar() { }
+
+ /**
+ * Pipeline runner registrar exposing {@link FlinkRunner} and {@link TestFlinkRunner}.
+ */
+ @AutoService(PipelineRunnerRegistrar.class)
+ public static class Runner implements PipelineRunnerRegistrar {
+ @Override
+ public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+ return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
+ FlinkRunner.class,
+ TestFlinkRunner.class);
+ }
+ }
+
+ /**
+ * Pipeline options registrar exposing {@link FlinkPipelineOptions}.
+ */
+ @AutoService(PipelineOptionsRegistrar.class)
+ public static class Options implements PipelineOptionsRegistrar {
+ @Override
+ public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+ return ImmutableList.<Class<? extends PipelineOptions>>of(FlinkPipelineOptions.class);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
new file mode 100644
index 0000000..0682b56
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.sdk.AggregatorRetrievalException;
+import org.apache.beam.sdk.AggregatorValues;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.joda.time.Duration;
+
+/**
+ * Result of executing a {@link org.apache.beam.sdk.Pipeline} with Flink. This
+ * has methods to query the final values of
+ * {@link org.apache.beam.sdk.transforms.Aggregator}s; the job runtime is only
+ * reported through {@link #toString()}.
+ */
+public class FlinkRunnerResult implements PipelineResult {
+
+  /** Read-only view of the accumulator results, looked up by aggregator name. */
+  private final Map<String, Object> aggregators;
+
+  /** Runtime value passed to the constructor; only surfaced through {@link #toString()}. */
+  private final long runtime;
+
+  /**
+   * Creates a result for a finished job.
+   *
+   * @param aggregators accumulator results keyed by aggregator name; {@code null} or an empty
+   *     map is stored as an immutable empty map, anything else is wrapped unmodifiable
+   * @param runtime runtime of the job as reported by the caller
+   */
+  FlinkRunnerResult(Map<String, Object> aggregators, long runtime) {
+    this.aggregators = (aggregators == null || aggregators.isEmpty())
+        ? Collections.<String, Object>emptyMap()
+        : Collections.unmodifiableMap(aggregators);
+    this.runtime = runtime;
+  }
+
+  /** Always returns {@link State#DONE}. */
+  @Override
+  public State getState() {
+    return State.DONE;
+  }
+
+  /**
+   * Returns the value recorded for the given aggregator.
+   *
+   * <p>The returned {@link AggregatorValues} contains exactly one entry, keyed by the
+   * aggregator's name, because per-step values are not tracked here. Values of other
+   * aggregators are never exposed through it.
+   *
+   * @param aggregator the aggregator whose value is requested
+   * @return the values for {@code aggregator}
+   * @throws AggregatorRetrievalException if no value is stored under the aggregator's name
+   */
+  @Override
+  public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator)
+      throws AggregatorRetrievalException {
+    // TODO provide a list of all accumulator step values
+    final String name = aggregator.getName();
+    Object value = aggregators.get(name);
+    if (value == null) {
+      throw new AggregatorRetrievalException("Accumulator results not found.",
+          new RuntimeException("Accumulator does not exist."));
+    }
+
+    // The map stores plain Objects; the entry registered under this aggregator's name is
+    // expected to carry the aggregator's output type T.
+    @SuppressWarnings("unchecked")
+    final T typedValue = (T) value;
+
+    // Expose only this aggregator's value. Handing out the whole aggregators map would mix
+    // in the values of unrelated aggregators, possibly of different types.
+    final Map<String, T> valuesAtSteps = Collections.singletonMap(name, typedValue);
+    return new AggregatorValues<T>() {
+      @Override
+      public Map<String, T> getValuesAtSteps() {
+        return valuesAtSteps;
+      }
+    };
+  }
+
+  @Override
+  public String toString() {
+    return "FlinkRunnerResult{"
+        + "aggregators=" + aggregators
+        + ", runtime=" + runtime
+        + '}';
+  }
+
+  /**
+   * Not supported.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public State cancel() throws IOException {
+    throw new UnsupportedOperationException("FlinkRunnerResult does not support cancel.");
+  }
+
+  /** Returns {@link State#DONE} immediately without waiting. */
+  @Override
+  public State waitUntilFinish() {
+    return State.DONE;
+  }
+
+  /** Returns {@link State#DONE} immediately; {@code duration} is ignored. */
+  @Override
+  public State waitUntilFinish(Duration duration) {
+    return State.DONE;
+  }
+
+  /**
+   * Not supported.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public MetricResults metrics() {
+    throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics.");
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
new file mode 100644
index 0000000..0459ef7
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.SplittableParDo;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.PTransformReplacements;
+import org.apache.beam.runners.core.construction.ReplacementOutputs;
+import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a {@link FlinkPipelineTranslator} for streaming jobs. Its role is to translate
+ * the user-provided {@link org.apache.beam.sdk.values.PCollection}-based job into a
+ * {@link org.apache.flink.streaming.api.datastream.DataStream} one.
+ *
+ */
+class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkStreamingPipelineTranslator.class);
+
+ /** The necessary context in the case of a straming job. */
+ private final FlinkStreamingTranslationContext streamingContext;
+
+ private int depth = 0;
+
+ private FlinkRunner flinkRunner;
+
+ public FlinkStreamingPipelineTranslator(
+ FlinkRunner flinkRunner,
+ StreamExecutionEnvironment env,
+ PipelineOptions options) {
+ this.streamingContext = new FlinkStreamingTranslationContext(env, options);
+ this.flinkRunner = flinkRunner;
+ }
+
+ @Override
+ public void translate(Pipeline pipeline) {
+ List<PTransformOverride> transformOverrides =
+ ImmutableList.<PTransformOverride>builder()
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.splittableParDoMulti(),
+ new SplittableParDoOverrideFactory()))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(View.AsIterable.class),
+ new ReflectiveOneToOneOverrideFactory(
+ FlinkStreamingViewOverrides.StreamingViewAsIterable.class, flinkRunner)))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(View.AsList.class),
+ new ReflectiveOneToOneOverrideFactory(
+ FlinkStreamingViewOverrides.StreamingViewAsList.class, flinkRunner)))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(View.AsMap.class),
+ new ReflectiveOneToOneOverrideFactory(
+ FlinkStreamingViewOverrides.StreamingViewAsMap.class, flinkRunner)))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(View.AsMultimap.class),
+ new ReflectiveOneToOneOverrideFactory(
+ FlinkStreamingViewOverrides.StreamingViewAsMultimap.class, flinkRunner)))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(View.AsSingleton.class),
+ new ReflectiveOneToOneOverrideFactory(
+ FlinkStreamingViewOverrides.StreamingViewAsSingleton.class, flinkRunner)))
+ // this has to be last since the ViewAsSingleton override
+ // can expand to a Combine.GloballyAsSingletonView
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+ new ReflectiveOneToOneOverrideFactory(
+ FlinkStreamingViewOverrides.StreamingCombineGloballyAsSingletonView.class,
+ flinkRunner)))
+ .build();
+
+ pipeline.replaceAll(transformOverrides);
+ super.translate(pipeline);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Pipeline Visitor Methods
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+ LOG.info("{} enterCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
+ this.depth++;
+
+ PTransform<?, ?> transform = node.getTransform();
+ if (transform != null) {
+ StreamTransformTranslator<?> translator =
+ FlinkStreamingTransformTranslators.getTranslator(transform);
+
+ if (translator != null && applyCanTranslate(transform, node, translator)) {
+ applyStreamingTransform(transform, node, translator);
+ LOG.info("{} translated- {}", genSpaces(this.depth), node.getFullName());
+ return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
+ }
+ }
+ return CompositeBehavior.ENTER_TRANSFORM;
+ }
+
+ @Override
+ public void leaveCompositeTransform(TransformHierarchy.Node node) {
+ this.depth--;
+ LOG.info("{} leaveCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
+ }
+
+ @Override
+ public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+ LOG.info("{} visitPrimitiveTransform- {}", genSpaces(this.depth), node.getFullName());
+ // get the transformation corresponding to hte node we are
+ // currently visiting and translate it into its Flink alternative.
+
+ PTransform<?, ?> transform = node.getTransform();
+ StreamTransformTranslator<?> translator =
+ FlinkStreamingTransformTranslators.getTranslator(transform);
+
+ if (translator == null || !applyCanTranslate(transform, node, translator)) {
+ LOG.info(node.getTransform().getClass().toString());
+ throw new UnsupportedOperationException(
+ "The transform " + transform + " is currently not supported.");
+ }
+ applyStreamingTransform(transform, node, translator);
+ }
+
+ @Override
+ public void visitValue(PValue value, TransformHierarchy.Node producer) {
+ // do nothing here
+ }
+
+ private <T extends PTransform<?, ?>> void applyStreamingTransform(
+ PTransform<?, ?> transform,
+ TransformHierarchy.Node node,
+ StreamTransformTranslator<?> translator) {
+
+ @SuppressWarnings("unchecked")
+ T typedTransform = (T) transform;
+
+ @SuppressWarnings("unchecked")
+ StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>) translator;
+
+ // create the applied PTransform on the streamingContext
+ streamingContext.setCurrentTransform(node.toAppliedPTransform());
+ typedTranslator.translateNode(typedTransform, streamingContext);
+ }
+
+ private <T extends PTransform<?, ?>> boolean applyCanTranslate(
+ PTransform<?, ?> transform,
+ TransformHierarchy.Node node,
+ StreamTransformTranslator<?> translator) {
+
+ @SuppressWarnings("unchecked")
+ T typedTransform = (T) transform;
+
+ @SuppressWarnings("unchecked")
+ StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>) translator;
+
+ streamingContext.setCurrentTransform(node.toAppliedPTransform());
+
+ return typedTranslator.canTranslate(typedTransform, streamingContext);
+ }
+
+ /**
+ * The interface that every Flink translator of a Beam operator should implement.
+ * This interface is for <b>streaming</b> jobs. For examples of such translators see
+ * {@link FlinkStreamingTransformTranslators}.
+ */
+ abstract static class StreamTransformTranslator<T extends PTransform> {
+
+ /**
+ * Translate the given transform.
+ */
+ abstract void translateNode(T transform, FlinkStreamingTranslationContext context);
+
+ /**
+ * Returns true iff this translator can translate the given transform.
+ */
+ boolean canTranslate(T transform, FlinkStreamingTranslationContext context) {
+ return true;
+ }
+ }
+
+ private static class ReflectiveOneToOneOverrideFactory<
+ InputT, OutputT, TransformT extends PTransform<PCollection<InputT>, PCollection<OutputT>>>
+ extends SingleInputOutputOverrideFactory<
+ PCollection<InputT>, PCollection<OutputT>, TransformT> {
+ private final Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement;
+ private final FlinkRunner runner;
+
+ private ReflectiveOneToOneOverrideFactory(
+ Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement,
+ FlinkRunner runner) {
+ this.replacement = replacement;
+ this.runner = runner;
+ }
+
+ @Override
+ public PTransformReplacement<PCollection<InputT>, PCollection<OutputT>> getReplacementTransform(
+ AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, TransformT> transform) {
+ return PTransformReplacement.of(
+ PTransformReplacements.getSingletonMainInput(transform),
+ InstanceBuilder.ofType(replacement)
+ .withArg(FlinkRunner.class, runner)
+ .withArg(
+ (Class<PTransform<PCollection<InputT>, PCollection<OutputT>>>)
+ transform.getTransform().getClass(),
+ transform.getTransform())
+ .build());
+ }
+ }
+
+ /**
+ * A {@link PTransformOverrideFactory} that overrides a <a
+ * href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a> with {@link SplittableParDo}.
+ */
+ static class SplittableParDoOverrideFactory<InputT, OutputT>
+ implements PTransformOverrideFactory<
+ PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>> {
+ @Override
+ public PTransformReplacement<PCollection<InputT>, PCollectionTuple>
+ getReplacementTransform(
+ AppliedPTransform<
+ PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>>
+ transform) {
+ return PTransformReplacement.of(
+ PTransformReplacements.getSingletonMainInput(transform),
+ new SplittableParDo<>(transform.getTransform()));
+ }
+
+ @Override
+ public Map<PValue, ReplacementOutput> mapOutputs(
+ Map<TupleTag<?>, PValue> outputs, PCollectionTuple newOutput) {
+ return ReplacementOutputs.tagged(outputs, newOutput);
+ }
+ }
+}