Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:11:26 UTC
[31/50] [abbrv] incubator-beam git commit: [flink] adjust directories according to package name
[flink] adjust directories according to package name
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/51bec310
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/51bec310
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/51bec310
Branch: refs/heads/master
Commit: 51bec310d3a68cd87071aff1b59d8353fc2c40ac
Parents: 028a55f
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Mar 2 23:15:22 2016 +0100
Committer: Davor Bonaci <da...@users.noreply.github.com>
Committed: Fri Mar 4 10:04:23 2016 -0800
----------------------------------------------------------------------
.../FlinkPipelineExecutionEnvironment.java | 267 -------
.../flink/dataflow/FlinkPipelineOptions.java | 91 ---
.../flink/dataflow/FlinkPipelineRunner.java | 204 ------
.../flink/dataflow/FlinkRunnerResult.java | 66 --
.../flink/dataflow/examples/TFIDF.java | 452 ------------
.../flink/dataflow/examples/WordCount.java | 111 ---
.../examples/streaming/AutoComplete.java | 387 ----------
.../examples/streaming/JoinExamples.java | 158 ----
.../KafkaWindowedWordCountExample.java | 141 ----
.../examples/streaming/WindowedWordCount.java | 128 ----
.../flink/dataflow/io/ConsoleIO.java | 80 ---
.../FlinkBatchPipelineTranslator.java | 149 ----
.../FlinkBatchTransformTranslators.java | 594 ---------------
.../FlinkBatchTranslationContext.java | 129 ----
.../translation/FlinkPipelineTranslator.java | 34 -
.../FlinkStreamingPipelineTranslator.java | 143 ----
.../FlinkStreamingTransformTranslators.java | 404 -----------
.../FlinkStreamingTranslationContext.java | 87 ---
.../FlinkCoGroupKeyedListAggregator.java | 58 --
.../functions/FlinkCreateFunction.java | 60 --
.../functions/FlinkDoFnFunction.java | 202 ------
.../FlinkKeyedListAggregationFunction.java | 75 --
.../functions/FlinkMultiOutputDoFnFunction.java | 175 -----
.../FlinkMultiOutputPruningFunction.java | 41 --
.../functions/FlinkPartialReduceFunction.java | 60 --
.../functions/FlinkReduceFunction.java | 57 --
.../translation/functions/UnionCoder.java | 150 ----
.../translation/types/CoderComparator.java | 216 ------
.../translation/types/CoderTypeInformation.java | 116 ---
.../translation/types/CoderTypeSerializer.java | 152 ----
.../types/InspectableByteArrayOutputStream.java | 34 -
.../translation/types/KvCoderComperator.java | 264 -------
.../types/KvCoderTypeInformation.java | 186 -----
.../types/VoidCoderTypeSerializer.java | 112 ---
.../wrappers/CombineFnAggregatorWrapper.java | 92 ---
.../wrappers/DataInputViewWrapper.java | 59 --
.../wrappers/DataOutputViewWrapper.java | 52 --
.../SerializableFnAggregatorWrapper.java | 91 ---
.../translation/wrappers/SinkOutputFormat.java | 121 ----
.../translation/wrappers/SourceInputFormat.java | 164 -----
.../translation/wrappers/SourceInputSplit.java | 52 --
.../streaming/FlinkAbstractParDoWrapper.java | 264 -------
.../FlinkGroupAlsoByWindowWrapper.java | 629 ----------------
.../streaming/FlinkGroupByKeyWrapper.java | 64 --
.../streaming/FlinkParDoBoundMultiWrapper.java | 75 --
.../streaming/FlinkParDoBoundWrapper.java | 98 ---
.../io/FlinkStreamingCreateFunction.java | 63 --
.../streaming/io/UnboundedFlinkSource.java | 80 ---
.../streaming/io/UnboundedSocketSource.java | 231 ------
.../streaming/io/UnboundedSourceWrapper.java | 132 ----
.../state/AbstractFlinkTimerInternals.java | 126 ----
.../streaming/state/FlinkStateInternals.java | 713 -------------------
.../streaming/state/StateCheckpointReader.java | 89 ---
.../streaming/state/StateCheckpointUtils.java | 153 ----
.../streaming/state/StateCheckpointWriter.java | 127 ----
.../wrappers/streaming/state/StateType.java | 71 --
.../FlinkPipelineExecutionEnvironment.java | 267 +++++++
.../runners/flink/FlinkPipelineOptions.java | 91 +++
.../beam/runners/flink/FlinkPipelineRunner.java | 204 ++++++
.../beam/runners/flink/FlinkRunnerResult.java | 66 ++
.../beam/runners/flink/examples/TFIDF.java | 452 ++++++++++++
.../beam/runners/flink/examples/WordCount.java | 111 +++
.../flink/examples/streaming/AutoComplete.java | 387 ++++++++++
.../flink/examples/streaming/JoinExamples.java | 158 ++++
.../KafkaWindowedWordCountExample.java | 141 ++++
.../examples/streaming/WindowedWordCount.java | 128 ++++
.../apache/beam/runners/flink/io/ConsoleIO.java | 80 +++
.../FlinkBatchPipelineTranslator.java | 149 ++++
.../FlinkBatchTransformTranslators.java | 594 +++++++++++++++
.../FlinkBatchTranslationContext.java | 129 ++++
.../translation/FlinkPipelineTranslator.java | 34 +
.../FlinkStreamingPipelineTranslator.java | 143 ++++
.../FlinkStreamingTransformTranslators.java | 404 +++++++++++
.../FlinkStreamingTranslationContext.java | 87 +++
.../FlinkCoGroupKeyedListAggregator.java | 58 ++
.../functions/FlinkCreateFunction.java | 60 ++
.../functions/FlinkDoFnFunction.java | 202 ++++++
.../FlinkKeyedListAggregationFunction.java | 75 ++
.../functions/FlinkMultiOutputDoFnFunction.java | 175 +++++
.../FlinkMultiOutputPruningFunction.java | 41 ++
.../functions/FlinkPartialReduceFunction.java | 60 ++
.../functions/FlinkReduceFunction.java | 57 ++
.../flink/translation/functions/UnionCoder.java | 150 ++++
.../translation/types/CoderComparator.java | 216 ++++++
.../translation/types/CoderTypeInformation.java | 116 +++
.../translation/types/CoderTypeSerializer.java | 152 ++++
.../types/InspectableByteArrayOutputStream.java | 34 +
.../translation/types/KvCoderComperator.java | 264 +++++++
.../types/KvCoderTypeInformation.java | 186 +++++
.../types/VoidCoderTypeSerializer.java | 112 +++
.../wrappers/CombineFnAggregatorWrapper.java | 92 +++
.../wrappers/DataInputViewWrapper.java | 59 ++
.../wrappers/DataOutputViewWrapper.java | 52 ++
.../SerializableFnAggregatorWrapper.java | 91 +++
.../translation/wrappers/SinkOutputFormat.java | 121 ++++
.../translation/wrappers/SourceInputFormat.java | 164 +++++
.../translation/wrappers/SourceInputSplit.java | 52 ++
.../streaming/FlinkAbstractParDoWrapper.java | 264 +++++++
.../FlinkGroupAlsoByWindowWrapper.java | 629 ++++++++++++++++
.../streaming/FlinkGroupByKeyWrapper.java | 64 ++
.../streaming/FlinkParDoBoundMultiWrapper.java | 75 ++
.../streaming/FlinkParDoBoundWrapper.java | 98 +++
.../io/FlinkStreamingCreateFunction.java | 63 ++
.../streaming/io/UnboundedFlinkSource.java | 80 +++
.../streaming/io/UnboundedSocketSource.java | 231 ++++++
.../streaming/io/UnboundedSourceWrapper.java | 132 ++++
.../state/AbstractFlinkTimerInternals.java | 126 ++++
.../streaming/state/FlinkStateInternals.java | 713 +++++++++++++++++++
.../streaming/state/StateCheckpointReader.java | 89 +++
.../streaming/state/StateCheckpointUtils.java | 153 ++++
.../streaming/state/StateCheckpointWriter.java | 127 ++++
.../wrappers/streaming/state/StateType.java | 71 ++
.../dataartisans/flink/dataflow/AvroITCase.java | 99 ---
.../flink/dataflow/FlattenizeITCase.java | 72 --
.../flink/dataflow/FlinkTestPipeline.java | 70 --
.../flink/dataflow/JoinExamplesITCase.java | 99 ---
.../flink/dataflow/MaybeEmptyTestITCase.java | 63 --
.../flink/dataflow/ParDoMultiOutputITCase.java | 98 ---
.../flink/dataflow/ReadSourceITCase.java | 163 -----
.../dataflow/RemoveDuplicatesEmptyITCase.java | 68 --
.../flink/dataflow/RemoveDuplicatesITCase.java | 69 --
.../flink/dataflow/SideInputITCase.java | 67 --
.../flink/dataflow/TfIdfITCase.java | 76 --
.../flink/dataflow/WordCountITCase.java | 74 --
.../flink/dataflow/WordCountJoin2ITCase.java | 136 ----
.../flink/dataflow/WordCountJoin3ITCase.java | 154 ----
.../flink/dataflow/WriteSinkITCase.java | 156 ----
.../streaming/GroupAlsoByWindowTest.java | 506 -------------
.../dataflow/streaming/GroupByNullKeyTest.java | 121 ----
.../streaming/StateSerializationTest.java | 303 --------
.../streaming/TopWikipediaSessionsITCase.java | 132 ----
.../flink/dataflow/util/JoinExamples.java | 158 ----
.../apache/beam/runners/flink/AvroITCase.java | 99 +++
.../beam/runners/flink/FlattenizeITCase.java | 72 ++
.../beam/runners/flink/FlinkTestPipeline.java | 70 ++
.../beam/runners/flink/JoinExamplesITCase.java | 99 +++
.../runners/flink/MaybeEmptyTestITCase.java | 63 ++
.../runners/flink/ParDoMultiOutputITCase.java | 98 +++
.../beam/runners/flink/ReadSourceITCase.java | 163 +++++
.../flink/RemoveDuplicatesEmptyITCase.java | 68 ++
.../runners/flink/RemoveDuplicatesITCase.java | 69 ++
.../beam/runners/flink/SideInputITCase.java | 67 ++
.../apache/beam/runners/flink/TfIdfITCase.java | 76 ++
.../beam/runners/flink/WordCountITCase.java | 74 ++
.../runners/flink/WordCountJoin2ITCase.java | 136 ++++
.../runners/flink/WordCountJoin3ITCase.java | 154 ++++
.../beam/runners/flink/WriteSinkITCase.java | 156 ++++
.../flink/streaming/GroupAlsoByWindowTest.java | 506 +++++++++++++
.../flink/streaming/GroupByNullKeyTest.java | 121 ++++
.../flink/streaming/StateSerializationTest.java | 303 ++++++++
.../streaming/TopWikipediaSessionsITCase.java | 132 ++++
.../beam/runners/flink/util/JoinExamples.java | 158 ++++
152 files changed, 11813 insertions(+), 11813 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java
deleted file mode 100644
index 02a49b9..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.runners.flink.translation.FlinkPipelineTranslator;
-import org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator;
-import org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.CollectionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- * The class that instantiates and manages the execution of a given job.
- * Depending on whether the job is a streaming or a batch job, it creates
- * the appropriate execution environment ({@link ExecutionEnvironment} or {@link StreamExecutionEnvironment}),
- * the necessary {@link FlinkPipelineTranslator} ({@link FlinkBatchPipelineTranslator} or
- * {@link FlinkStreamingPipelineTranslator}) to transform the Beam job into a Flink one, and
- * executes the (translated) job.
- */
-public class FlinkPipelineExecutionEnvironment {
-
- private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class);
-
- private final FlinkPipelineOptions options;
-
- /**
- * The Flink Batch execution environment. This is instantiated to either a
- * {@link org.apache.flink.api.java.CollectionEnvironment},
- * a {@link org.apache.flink.api.java.LocalEnvironment} or
- * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration
- * options.
- */
- private ExecutionEnvironment flinkBatchEnv;
-
-
- /**
- * The Flink Streaming execution environment. This is instantiated to either a
- * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or
- * a {@link org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending
- * on the configuration options and, more specifically, the URL of the master.
- */
- private StreamExecutionEnvironment flinkStreamEnv;
-
- /**
- * Translator for this FlinkPipelineRunner. Its role is to translate the Beam operators to
- * their Flink counterparts. Based on the user-provided options, this is instantiated as a
- * {@link FlinkStreamingPipelineTranslator} for a streaming job or a
- * {@link FlinkBatchPipelineTranslator} for a batch job.
- */
- private FlinkPipelineTranslator flinkPipelineTranslator;
-
- /**
- * Creates a {@link FlinkPipelineExecutionEnvironment} with the user-specified parameters in the
- * provided {@link FlinkPipelineOptions}.
- *
- * @param options the user-defined pipeline options.
- * */
- public FlinkPipelineExecutionEnvironment(FlinkPipelineOptions options) {
- this.options = Preconditions.checkNotNull(options);
- this.createPipelineExecutionEnvironment();
- this.createPipelineTranslator();
- }
-
- /**
- * Depending on the type of job (Streaming or Batch) and the user-specified options,
- * this method creates the appropriate ExecutionEnvironment.
- */
- private void createPipelineExecutionEnvironment() {
- if (options.isStreaming()) {
- createStreamExecutionEnvironment();
- } else {
- createBatchExecutionEnvironment();
- }
- }
-
- /**
- * Depending on the type of job (Streaming or Batch), this method creates the appropriate job graph
- * translator. In the case of batch, it will work with {@link org.apache.flink.api.java.DataSet},
- * while for streaming, it will work with {@link org.apache.flink.streaming.api.datastream.DataStream}.
- */
- private void createPipelineTranslator() {
- checkInitializationState();
- if (this.flinkPipelineTranslator != null) {
- throw new IllegalStateException("FlinkPipelineTranslator already initialized.");
- }
-
- this.flinkPipelineTranslator = options.isStreaming() ?
- new FlinkStreamingPipelineTranslator(flinkStreamEnv, options) :
- new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
- }
-
- /**
- * Depending on whether the job is a Streaming or a Batch one, this method creates
- * the necessary execution environment and pipeline translator, and translates
- * the {@link com.google.cloud.dataflow.sdk.values.PCollection} program into
- * a {@link org.apache.flink.api.java.DataSet} or {@link org.apache.flink.streaming.api.datastream.DataStream}
- * one.
- * */
- public void translate(Pipeline pipeline) {
- checkInitializationState();
- if (this.flinkBatchEnv == null && this.flinkStreamEnv == null) {
- createPipelineExecutionEnvironment();
- }
- if (this.flinkPipelineTranslator == null) {
- createPipelineTranslator();
- }
- this.flinkPipelineTranslator.translate(pipeline);
- }
-
- /**
- * Launches the program execution.
- * */
- public JobExecutionResult executePipeline() throws Exception {
- if (options.isStreaming()) {
- if (this.flinkStreamEnv == null) {
- throw new RuntimeException("FlinkPipelineExecutionEnvironment not initialized.");
- }
- if (this.flinkPipelineTranslator == null) {
- throw new RuntimeException("FlinkPipelineTranslator not initialized.");
- }
- return this.flinkStreamEnv.execute();
- } else {
- if (this.flinkBatchEnv == null) {
- throw new RuntimeException("FlinkPipelineExecutionEnvironment not initialized.");
- }
- if (this.flinkPipelineTranslator == null) {
- throw new RuntimeException("FlinkPipelineTranslator not initialized.");
- }
- return this.flinkBatchEnv.execute();
- }
- }
-
- /**
- * If the submitted job is a batch processing job, this method creates the appropriate
- * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending
- * on the user-specified options.
- */
- private void createBatchExecutionEnvironment() {
- if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
- throw new RuntimeException("FlinkPipelineExecutionEnvironment already initialized.");
- }
-
- LOG.info("Creating the required Batch Execution Environment.");
-
- String masterUrl = options.getFlinkMaster();
- this.flinkStreamEnv = null;
-
- // depending on the master, create the right environment.
- if (masterUrl.equals("[local]")) {
- this.flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment();
- } else if (masterUrl.equals("[collection]")) {
- this.flinkBatchEnv = new CollectionEnvironment();
- } else if (masterUrl.equals("[auto]")) {
- this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
- } else if (masterUrl.matches(".*:\\d*")) {
- String[] parts = masterUrl.split(":");
- List<String> stagingFiles = options.getFilesToStage();
- this.flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(parts[0],
- Integer.parseInt(parts[1]),
- stagingFiles.toArray(new String[stagingFiles.size()]));
- } else {
- LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
- this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
- }
-
- // set the correct parallelism.
- if (options.getParallelism() != -1 && !(this.flinkBatchEnv instanceof CollectionEnvironment)) {
- this.flinkBatchEnv.setParallelism(options.getParallelism());
- }
-
- // set parallelism in the options (required by some execution code)
- options.setParallelism(flinkBatchEnv.getParallelism());
- }
-
- /**
- * If the submitted job is a stream processing job, this method creates the appropriate
- * Flink {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending
- * on the user-specified options.
- */
- private void createStreamExecutionEnvironment() {
- if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
- throw new RuntimeException("FlinkPipelineExecutionEnvironment already initialized.");
- }
-
- LOG.info("Creating the required Streaming Environment.");
-
- String masterUrl = options.getFlinkMaster();
- this.flinkBatchEnv = null;
-
- // depending on the master, create the right environment.
- if (masterUrl.equals("[local]")) {
- this.flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment();
- } else if (masterUrl.equals("[auto]")) {
- this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
- } else if (masterUrl.matches(".*:\\d*")) {
- String[] parts = masterUrl.split(":");
- List<String> stagingFiles = options.getFilesToStage();
- this.flinkStreamEnv = StreamExecutionEnvironment.createRemoteEnvironment(parts[0],
- Integer.parseInt(parts[1]), stagingFiles.toArray(new String[stagingFiles.size()]));
- } else {
- LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
- this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
- }
-
- // set the correct parallelism.
- if (options.getParallelism() != -1) {
- this.flinkStreamEnv.setParallelism(options.getParallelism());
- }
-
- // set parallelism in the options (required by some execution code)
- options.setParallelism(flinkStreamEnv.getParallelism());
-
- // default to event time
- this.flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
- // for the following 2 parameters, a value of -1 means that Flink will use
- // the default values as specified in the configuration.
- int numRetries = options.getNumberOfExecutionRetries();
- if (numRetries != -1) {
- this.flinkStreamEnv.setNumberOfExecutionRetries(numRetries);
- }
- long retryDelay = options.getExecutionRetryDelay();
- if (retryDelay != -1) {
- this.flinkStreamEnv.getConfig().setExecutionRetryDelay(retryDelay);
- }
-
- // A value of -1 corresponds to disabled checkpointing (see CheckpointConfig in Flink).
- // If the value is not -1, then the validity checks are applied.
- // By default, checkpointing is disabled.
- long checkpointInterval = options.getCheckpointingInterval();
- if (checkpointInterval != -1) {
- if (checkpointInterval < 1) {
- throw new IllegalArgumentException("The checkpoint interval must be positive");
- }
- this.flinkStreamEnv.enableCheckpointing(checkpointInterval);
- }
- }
-
- private void checkInitializationState() {
- if (options.isStreaming() && this.flinkBatchEnv != null) {
- throw new IllegalStateException("Attempted to run a Streaming Job with a Batch Execution Environment.");
- } else if (!options.isStreaming() && this.flinkStreamEnv != null) {
- throw new IllegalStateException("Attempted to run a Batch Job with a Streaming Execution Environment.");
- }
- }
-}
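For illustration, a minimal sketch of how the master-URL selection above is driven. It assumes only the classes shown in this diff (under the renamed org.apache.beam.runners.flink package); the class name is hypothetical.

    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment;
    import org.apache.beam.runners.flink.FlinkPipelineOptions;

    public class EnvironmentSelectionSketch {
      public static void main(String[] args) {
        FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        options.setStreaming(false);        // batch job -> ExecutionEnvironment
        options.setFlinkMaster("[local]");  // "[collection]", "[auto]", or "host:port" also work
        options.setParallelism(2);          // -1 keeps the environment's own default

        // The constructor eagerly creates both the execution environment and
        // the matching pipeline translator, as shown above.
        FlinkPipelineExecutionEnvironment env =
            new FlinkPipelineExecutionEnvironment(options);
      }
    }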
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java
deleted file mode 100644
index bf83353..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.google.cloud.dataflow.sdk.options.ApplicationNameOptions;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.StreamingOptions;
-
-import java.util.List;
-
-/**
- * Options which can be used to configure a Flink PipelineRunner.
- */
-public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOptions, StreamingOptions {
-
- /**
- * List of local files to make available to workers.
- * <p>
- * Jars are placed on the worker's classpath.
- * <p>
- * The default value is the list of jars from the main program's classpath.
- */
- @Description("Jar-Files to send to all workers and put on the classpath. " +
- "The default value is all files from the classpath.")
- @JsonIgnore
- List<String> getFilesToStage();
- void setFilesToStage(List<String> value);
-
- /**
- * The job name is used to identify jobs running on a Flink cluster.
- */
- @Description("Dataflow job name, to uniquely identify active jobs. "
- + "Defaults to using the ApplicationName-UserName-Date.")
- @Default.InstanceFactory(DataflowPipelineOptions.JobNameFactory.class)
- String getJobName();
- void setJobName(String value);
-
- /**
- * The URL of the Flink JobManager on which to execute pipelines. This can either be
- * the address of a cluster JobManager, in the form "host:port", or one of the special
- * Strings "[local]", "[collection]" or "[auto]". "[local]" will start a local Flink
- * Cluster in the JVM, "[collection]" will execute the pipeline on Java Collections while
- * "[auto]" will let the system decide where to execute the pipeline based on the environment.
- */
- @Description("Address of the Flink Master where the Pipeline should be executed. Can" +
- " either be of the form \"host:port\" or one of the special values [local], " +
- "[collection] or [auto].")
- String getFlinkMaster();
- void setFlinkMaster(String value);
-
- @Description("The degree of parallelism to be used when distributing operations onto workers.")
- @Default.Integer(-1)
- Integer getParallelism();
- void setParallelism(Integer value);
-
- @Description("The interval between consecutive checkpoints (i.e. snapshots of the current pipeline state used for " +
- "fault tolerance).")
- @Default.Long(-1L)
- Long getCheckpointingInterval();
- void setCheckpointingInterval(Long interval);
-
- @Description("Sets the number of times that failed tasks are re-executed. " +
- "A value of zero effectively disables fault tolerance. A value of -1 indicates " +
- "that the system default value (as defined in the configuration) should be used.")
- @Default.Integer(-1)
- Integer getNumberOfExecutionRetries();
- void setNumberOfExecutionRetries(Integer retries);
-
- @Description("Sets the delay between executions. A value of {@code -1} indicates that the default value should be used.")
- @Default.Long(-1L)
- Long getExecutionRetryDelay();
- void setExecutionRetryDelay(Long delay);
-}
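These options are consumed through the Dataflow SDK's PipelineOptionsFactory, so each getter above maps to a command-line flag of the same name. A sketch (flag values are hypothetical):

    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.runners.flink.FlinkPipelineOptions;

    public class ParseFlagsSketch {
      public static void main(String[] args) {
        // Hypothetical invocation:
        //   --flinkMaster=[auto] --parallelism=8 --checkpointingInterval=60000 \
        //   --numberOfExecutionRetries=3 --executionRetryDelay=1000
        FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(FlinkPipelineOptions.class);
        System.out.println("Flink master: " + options.getFlinkMaster()
            + ", parallelism: " + options.getParallelism());
      }
    }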
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
deleted file mode 100644
index 3c33d20..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A {@link PipelineRunner} that executes the operations in the
- * pipeline by first translating them to a Flink Plan and then executing them either locally
- * or on a Flink cluster, depending on the configuration.
- * <p>
- * This is based on {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner}.
- */
-public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
-
- private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineRunner.class);
-
- /**
- * Provided options.
- */
- private final FlinkPipelineOptions options;
-
- private final FlinkPipelineExecutionEnvironment flinkJobEnv;
-
- /**
- * Construct a runner from the provided options.
- *
- * @param options Properties which configure the runner.
- * @return The newly created runner.
- */
- public static FlinkPipelineRunner fromOptions(PipelineOptions options) {
- FlinkPipelineOptions flinkOptions =
- PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options);
- ArrayList<String> missing = new ArrayList<>();
-
- if (flinkOptions.getAppName() == null) {
- missing.add("appName");
- }
- if (missing.size() > 0) {
- throw new IllegalArgumentException(
- "Missing required values: " + Joiner.on(',').join(missing));
- }
-
- if (flinkOptions.getFilesToStage() == null) {
- flinkOptions.setFilesToStage(detectClassPathResourcesToStage(
- DataflowPipelineRunner.class.getClassLoader()));
- LOG.info("PipelineOptions.filesToStage was not specified. "
- + "Defaulting to files from the classpath: will stage {} files. "
- + "Enable logging at DEBUG level to see which files will be staged.",
- flinkOptions.getFilesToStage().size());
- LOG.debug("Classpath elements: {}", flinkOptions.getFilesToStage());
- }
-
- // Verify jobName according to service requirements.
- String jobName = flinkOptions.getJobName().toLowerCase();
- Preconditions.checkArgument(jobName.matches("[a-z]([-a-z0-9]*[a-z0-9])?"), "JobName invalid; " +
- "the name must consist of only the characters " + "[-a-z0-9], starting with a letter " +
- "and ending with a letter " + "or number");
- Preconditions.checkArgument(jobName.length() <= 40,
- "JobName too long; must be no more than 40 characters in length");
-
- // Set Flink Master to [auto] if no option was specified.
- if (flinkOptions.getFlinkMaster() == null) {
- flinkOptions.setFlinkMaster("[auto]");
- }
-
- return new FlinkPipelineRunner(flinkOptions);
- }
-
- private FlinkPipelineRunner(FlinkPipelineOptions options) {
- this.options = options;
- this.flinkJobEnv = new FlinkPipelineExecutionEnvironment(options);
- }
-
- @Override
- public FlinkRunnerResult run(Pipeline pipeline) {
- LOG.info("Executing pipeline using FlinkPipelineRunner.");
-
- LOG.info("Translating pipeline to Flink program.");
-
- this.flinkJobEnv.translate(pipeline);
-
- LOG.info("Starting execution of Flink program.");
-
- JobExecutionResult result;
- try {
- result = this.flinkJobEnv.executePipeline();
- } catch (Exception e) {
- LOG.error("Pipeline execution failed", e);
- throw new RuntimeException("Pipeline execution failed", e);
- }
-
- LOG.info("Execution finished in {} msecs", result.getNetRuntime());
-
- Map<String, Object> accumulators = result.getAllAccumulatorResults();
- if (accumulators != null && !accumulators.isEmpty()) {
- LOG.info("Final aggregator values:");
-
- for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet()) {
- LOG.info("{} : {}", entry.getKey(), entry.getValue());
- }
- }
-
- return new FlinkRunnerResult(accumulators, result.getNetRuntime());
- }
-
- /**
- * For testing.
- */
- public FlinkPipelineOptions getPipelineOptions() {
- return options;
- }
-
- /**
- * Constructs a runner with default properties for testing.
- *
- * @return The newly created runner.
- */
- public static FlinkPipelineRunner createForTest(boolean streaming) {
- FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
- // we use [auto] for testing since this will make it pick up the Testing
- // ExecutionEnvironment
- options.setFlinkMaster("[auto]");
- options.setStreaming(streaming);
- return new FlinkPipelineRunner(options);
- }
-
- @Override
- public <Output extends POutput, Input extends PInput> Output apply(
- PTransform<Input, Output> transform, Input input) {
- return super.apply(transform, input);
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- @Override
- public String toString() {
- return "DataflowPipelineRunner#" + hashCode();
- }
-
- /**
- * Attempts to detect all the resources the class loader has access to. This does not recurse
- * to class loader parents, stopping it from pulling in resources from the system class loader.
- *
- * @param classLoader The URLClassLoader to use to detect resources to stage.
- * @return A list of absolute paths to the resources the class loader uses.
- * @throws IllegalArgumentException If either the class loader is not a URLClassLoader or one
- * of the resources the class loader exposes is not a file resource.
- */
- protected static List<String> detectClassPathResourcesToStage(ClassLoader classLoader) {
- if (!(classLoader instanceof URLClassLoader)) {
- String message = String.format("Unable to use ClassLoader to detect classpath elements. "
- + "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader);
- LOG.error(message);
- throw new IllegalArgumentException(message);
- }
-
- List<String> files = new ArrayList<>();
- for (URL url : ((URLClassLoader) classLoader).getURLs()) {
- try {
- files.add(new File(url.toURI()).getAbsolutePath());
- } catch (IllegalArgumentException | URISyntaxException e) {
- String message = String.format("Unable to convert url (%s) to file.", url);
- LOG.error(message);
- throw new IllegalArgumentException(message, e);
- }
- }
- return files;
- }
-}
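A sketch of the typical entry point, mirroring the examples later in this commit: the runner is selected through the options rather than constructed directly (the constructor is private), so that fromOptions() can apply the jobName and filesToStage checks above. The trivial pipeline and class name are illustrative only.

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.transforms.Create;
    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkPipelineRunner;

    public class RunnerSelectionSketch {
      public static void main(String[] args) {
        FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(FlinkPipelineOptions.class);
        options.setRunner(FlinkPipelineRunner.class);  // run() dispatches through fromOptions()

        Pipeline p = Pipeline.create(options);
        p.apply(Create.of("a", "b", "c"));  // trivial pipeline, illustration only
        p.run();
      }
    }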
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkRunnerResult.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkRunnerResult.java
deleted file mode 100644
index c2329a6..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkRunnerResult.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.runners.AggregatorRetrievalException;
-import com.google.cloud.dataflow.sdk.runners.AggregatorValues;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * Result of executing a {@link com.google.cloud.dataflow.sdk.Pipeline} with Flink. This
- * has methods to query the job runtime and the final values of
- * {@link com.google.cloud.dataflow.sdk.transforms.Aggregator}s.
- */
-public class FlinkRunnerResult implements PipelineResult {
-
- private final Map<String, Object> aggregators;
-
- private final long runtime;
-
- public FlinkRunnerResult(Map<String, Object> aggregators, long runtime) {
- this.aggregators = (aggregators == null || aggregators.isEmpty()) ?
- Collections.<String, Object>emptyMap() :
- Collections.unmodifiableMap(aggregators);
-
- this.runtime = runtime;
- }
-
- @Override
- public State getState() {
- return null;
- }
-
- @Override
- public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator) throws AggregatorRetrievalException {
- // TODO provide a list of all accumulator step values
- Object value = aggregators.get(aggregator.getName());
- if (value != null) {
- return new AggregatorValues<T>() {
- @Override
- public Map<String, T> getValuesAtSteps() {
- return (Map<String, T>) aggregators;
- }
- };
- } else {
- throw new AggregatorRetrievalException("Accumulator results not found.",
- new RuntimeException("Accumulator does not exist."));
- }
- }
-}
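A sketch of querying the result, using only the API shown above plus the aggregator pattern from the WordCount example later in this commit. Names are hypothetical; note the TODO above — getValuesAtSteps() currently exposes the accumulated values keyed by aggregator name, not true per-step values.

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.runners.AggregatorValues;
    import com.google.cloud.dataflow.sdk.transforms.Aggregator;
    import com.google.cloud.dataflow.sdk.transforms.Create;
    import com.google.cloud.dataflow.sdk.transforms.DoFn;
    import com.google.cloud.dataflow.sdk.transforms.ParDo;
    import com.google.cloud.dataflow.sdk.transforms.Sum;
    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkPipelineRunner;
    import org.apache.beam.runners.flink.FlinkRunnerResult;

    public class AggregatorSketch {
      static class TrackFn extends DoFn<String, String> {
        final Aggregator<Long, Long> seen = createAggregator("seen", new Sum.SumLongFn());
        @Override
        public void processElement(ProcessContext c) {
          seen.addValue(1L);
          c.output(c.element());
        }
      }

      public static void main(String[] args) throws Exception {
        FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        options.setAppName("AggregatorSketch");
        options.setFlinkMaster("[local]");
        options.setRunner(FlinkPipelineRunner.class);

        Pipeline p = Pipeline.create(options);
        TrackFn fn = new TrackFn();
        p.apply(Create.of("a", "b", "c")).apply(ParDo.of(fn));

        FlinkRunnerResult result = (FlinkRunnerResult) p.run();
        AggregatorValues<Long> values = result.getAggregatorValues(fn.seen);
        System.out.println(values.getValuesAtSteps());  // e.g. {seen=3}
      }
    }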
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/TFIDF.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/TFIDF.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/TFIDF.java
deleted file mode 100644
index ab23b92..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/TFIDF.java
+++ /dev/null
@@ -1,452 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.beam.runners.flink.examples;
-
-import org.apache.beam.runners.flink.FlinkPipelineOptions;
-import org.apache.beam.runners.flink.FlinkPipelineRunner;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.GcsOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.options.Validation;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.transforms.Keys;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
-import com.google.cloud.dataflow.sdk.transforms.Values;
-import com.google.cloud.dataflow.sdk.transforms.View;
-import com.google.cloud.dataflow.sdk.transforms.WithKeys;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
-import com.google.cloud.dataflow.sdk.util.GcsUtil;
-import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.PDone;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * An example that computes a basic TF-IDF search table for a directory or GCS prefix.
- *
- * <p> Concepts: joining data; side inputs; logging
- *
- * <p> To execute this pipeline locally, specify general pipeline configuration:
- * <pre>{@code
- * --project=YOUR_PROJECT_ID
- * }</pre>
- * and a local output file or output prefix on GCS:
- * <pre>{@code
- * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
- * }</pre>
- *
- * <p> To execute this pipeline using the Dataflow service, specify pipeline configuration:
- * <pre>{@code
- * --project=YOUR_PROJECT_ID
- * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- * --runner=BlockingDataflowPipelineRunner
- * }</pre>
- * and an output prefix on GCS:
- * {@code --output=gs://YOUR_OUTPUT_PREFIX}
- *
- * <p> The default input is {@code gs://dataflow-samples/shakespeare/} and can be overridden with
- * {@code --input}.
- */
-public class TFIDF {
- /**
- * Options supported by {@link TFIDF}.
- * <p>
- * Inherits standard configuration options.
- */
- private interface Options extends PipelineOptions, FlinkPipelineOptions {
- @Description("Path to the directory or GCS prefix containing files to read from")
- @Default.String("gs://dataflow-samples/shakespeare/")
- String getInput();
- void setInput(String value);
-
- @Description("Prefix of output URI to write to")
- @Validation.Required
- String getOutput();
- void setOutput(String value);
- }
-
- /**
- * Lists documents contained beneath the {@code options.input} prefix/directory.
- */
- public static Set<URI> listInputDocuments(Options options)
- throws URISyntaxException, IOException {
- URI baseUri = new URI(options.getInput());
-
- // List all documents in the directory or GCS prefix.
- URI absoluteUri;
- if (baseUri.getScheme() != null) {
- absoluteUri = baseUri;
- } else {
- absoluteUri = new URI(
- "file",
- baseUri.getAuthority(),
- baseUri.getPath(),
- baseUri.getQuery(),
- baseUri.getFragment());
- }
-
- Set<URI> uris = new HashSet<>();
- if (absoluteUri.getScheme().equals("file")) {
- File directory = new File(absoluteUri);
- for (String entry : directory.list()) {
- File path = new File(directory, entry);
- uris.add(path.toURI());
- }
- } else if (absoluteUri.getScheme().equals("gs")) {
- GcsUtil gcsUtil = options.as(GcsOptions.class).getGcsUtil();
- URI gcsUriGlob = new URI(
- absoluteUri.getScheme(),
- absoluteUri.getAuthority(),
- absoluteUri.getPath() + "*",
- absoluteUri.getQuery(),
- absoluteUri.getFragment());
- for (GcsPath entry : gcsUtil.expand(GcsPath.fromUri(gcsUriGlob))) {
- uris.add(entry.toUri());
- }
- }
-
- return uris;
- }
-
- /**
- * Reads the documents at the provided uris and returns all lines
- * from the documents tagged with which document they are from.
- */
- public static class ReadDocuments
- extends PTransform<PInput, PCollection<KV<URI, String>>> {
- private static final long serialVersionUID = 0;
-
- private Iterable<URI> uris;
-
- public ReadDocuments(Iterable<URI> uris) {
- this.uris = uris;
- }
-
- @Override
- public Coder<?> getDefaultOutputCoder() {
- return KvCoder.of(StringDelegateCoder.of(URI.class), StringUtf8Coder.of());
- }
-
- @Override
- public PCollection<KV<URI, String>> apply(PInput input) {
- Pipeline pipeline = input.getPipeline();
-
- // Create one TextIO.Read transform for each document
- // and add its output to a PCollectionList
- PCollectionList<KV<URI, String>> urisToLines =
- PCollectionList.empty(pipeline);
-
- // TextIO.Read supports:
- // - file: URIs and paths locally
- // - gs: URIs on the service
- for (final URI uri : uris) {
- String uriString;
- if (uri.getScheme().equals("file")) {
- uriString = new File(uri).getPath();
- } else {
- uriString = uri.toString();
- }
-
- PCollection<KV<URI, String>> oneUriToLines = pipeline
- .apply(TextIO.Read.from(uriString)
- .named("TextIO.Read(" + uriString + ")"))
- .apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri));
-
- urisToLines = urisToLines.and(oneUriToLines);
- }
-
- return urisToLines.apply(Flatten.<KV<URI, String>>pCollections());
- }
- }
-
- /**
- * A transform containing a basic TF-IDF pipeline. The input consists of KV objects
- * where the key is the document's URI and the value is a piece
- * of the document's content. The output is a mapping from terms to
- * scores for each document URI.
- */
- public static class ComputeTfIdf
- extends PTransform<PCollection<KV<URI, String>>, PCollection<KV<String, KV<URI, Double>>>> {
- private static final long serialVersionUID = 0;
-
- public ComputeTfIdf() { }
-
- @Override
- public PCollection<KV<String, KV<URI, Double>>> apply(
- PCollection<KV<URI, String>> uriToContent) {
-
- // Compute the total number of documents, and
- // prepare this singleton PCollectionView for
- // use as a side input.
- final PCollectionView<Long> totalDocuments =
- uriToContent
- .apply("GetURIs", Keys.<URI>create())
- .apply("RemoveDuplicateDocs", RemoveDuplicates.<URI>create())
- .apply(Count.<URI>globally())
- .apply(View.<Long>asSingleton());
-
- // Create a collection of pairs mapping a URI to each
- // of the words in the document associated with that URI.
- PCollection<KV<URI, String>> uriToWords = uriToContent
- .apply(ParDo.named("SplitWords").of(
- new DoFn<KV<URI, String>, KV<URI, String>>() {
- private static final long serialVersionUID = 0;
-
- @Override
- public void processElement(ProcessContext c) {
- URI uri = c.element().getKey();
- String line = c.element().getValue();
- for (String word : line.split("\\W+")) {
- // Log INFO messages when the word “love” is found.
- if (word.toLowerCase().equals("love")) {
- LOG.info("Found {}", word.toLowerCase());
- }
-
- if (!word.isEmpty()) {
- c.output(KV.of(uri, word.toLowerCase()));
- }
- }
- }
- }));
-
- // Compute a mapping from each word to the total
- // number of documents in which it appears.
- PCollection<KV<String, Long>> wordToDocCount = uriToWords
- .apply("RemoveDuplicateWords", RemoveDuplicates.<KV<URI, String>>create())
- .apply(Values.<String>create())
- .apply("CountDocs", Count.<String>perElement());
-
- // Compute a mapping from each URI to the total
- // number of words in the document associated with that URI.
- PCollection<KV<URI, Long>> uriToWordTotal = uriToWords
- .apply("GetURIs2", Keys.<URI>create())
- .apply("CountWords", Count.<URI>perElement());
-
- // Count, for each (URI, word) pair, the number of
- // occurrences of that word in the document associated
- // with the URI.
- PCollection<KV<KV<URI, String>, Long>> uriAndWordToCount = uriToWords
- .apply("CountWordDocPairs", Count.<KV<URI, String>>perElement());
-
- // Adjust the above mapping from (URI, word)
- // pairs to counts into an isomorphic mapping
- // from URI to (word, count) pairs, to prepare for a join
- // by the URI key.
- PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount
- .apply(ParDo.named("ShiftKeys").of(
- new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
- private static final long serialVersionUID = 0;
-
- @Override
- public void processElement(ProcessContext c) {
- URI uri = c.element().getKey().getKey();
- String word = c.element().getKey().getValue();
- Long occurrences = c.element().getValue();
- c.output(KV.of(uri, KV.of(word, occurrences)));
- }
- }));
-
- // Prepare to join the mapping of URI to (word, count) pairs with
- // the mapping of URI to total word counts, by associating
- // each of the input PCollection<KV<URI, ...>> with
- // a tuple tag. Each input must have the same key type, URI
- // in this case. The type parameter of the tuple tag matches
- // the types of the values for each collection.
- final TupleTag<Long> wordTotalsTag = new TupleTag<>();
- final TupleTag<KV<String, Long>> wordCountsTag = new TupleTag<>();
- KeyedPCollectionTuple<URI> coGbkInput = KeyedPCollectionTuple
- .of(wordTotalsTag, uriToWordTotal)
- .and(wordCountsTag, uriToWordAndCount);
-
- // Perform a CoGroupByKey (a sort of pre-join) on the prepared
- // inputs. This yields a mapping from URI to a CoGbkResult
- // (CoGroupByKey Result). The CoGbkResult is a mapping
- // from the above tuple tags to the values in each input
- // associated with a particular URI. In this case, each
- // KV<URI, CoGbkResult> groups a URI with the total number of
- // words in that document as well as all the (word, count)
- // pairs for particular words.
- PCollection<KV<URI, CoGbkResult>> uriToWordAndCountAndTotal = coGbkInput
- .apply("CoGroupByUri", CoGroupByKey.<URI>create());
-
- // Compute a mapping from each word to a (URI, term frequency)
- // pair for each URI. A word's term frequency for a document
- // is simply the number of times that word occurs in the document
- // divided by the total number of words in the document.
- PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal
- .apply(ParDo.named("ComputeTermFrequencies").of(
- new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
- private static final long serialVersionUID = 0;
-
- @Override
- public void processElement(ProcessContext c) {
- URI uri = c.element().getKey();
- Long wordTotal = c.element().getValue().getOnly(wordTotalsTag);
-
- for (KV<String, Long> wordAndCount
- : c.element().getValue().getAll(wordCountsTag)) {
- String word = wordAndCount.getKey();
- Long wordCount = wordAndCount.getValue();
- Double termFrequency = wordCount.doubleValue() / wordTotal.doubleValue();
- c.output(KV.of(word, KV.of(uri, termFrequency)));
- }
- }
- }));
-
- // Compute a mapping from each word to its document frequency.
- // A word's document frequency in a corpus is the number of
- // documents in which the word appears divided by the total
- // number of documents in the corpus. Note how the total number of
- // documents is passed as a side input; the same value is
- // presented to each invocation of the DoFn.
- PCollection<KV<String, Double>> wordToDf = wordToDocCount
- .apply(ParDo
- .named("ComputeDocFrequencies")
- .withSideInputs(totalDocuments)
- .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
- private static final long serialVersionUID = 0;
-
- @Override
- public void processElement(ProcessContext c) {
- String word = c.element().getKey();
- Long documentCount = c.element().getValue();
- Long documentTotal = c.sideInput(totalDocuments);
- Double documentFrequency = documentCount.doubleValue()
- / documentTotal.doubleValue();
-
- c.output(KV.of(word, documentFrequency));
- }
- }));
-
- // Join the term frequency and document frequency
- // collections, each keyed on the word.
- final TupleTag<KV<URI, Double>> tfTag = new TupleTag<>();
- final TupleTag<Double> dfTag = new TupleTag<>();
- PCollection<KV<String, CoGbkResult>> wordToUriAndTfAndDf = KeyedPCollectionTuple
- .of(tfTag, wordToUriAndTf)
- .and(dfTag, wordToDf)
- .apply(CoGroupByKey.<String>create());
-
- // Compute a mapping from each word to a (URI, TF-IDF) score
- // for each URI. There are a variety of definitions of TF-IDF
- // ("term frequency - inverse document frequency") score;
- // here we use a basic version: the term frequency
- // multiplied by the log of the inverse document frequency.
-
- return wordToUriAndTfAndDf
- .apply(ParDo.named("ComputeTfIdf").of(
- new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
- private static final long serialVersionUID = 0;
-
- @Override
- public void processElement(ProcessContext c) {
- String word = c.element().getKey();
- Double df = c.element().getValue().getOnly(dfTag);
-
- for (KV<URI, Double> uriAndTf : c.element().getValue().getAll(tfTag)) {
- URI uri = uriAndTf.getKey();
- Double tf = uriAndTf.getValue();
- Double tfIdf = tf * Math.log(1 / df);
- c.output(KV.of(word, KV.of(uri, tfIdf)));
- }
- }
- }));
- }
-
- // Instantiate Logger.
- // It is suggested that the user specify the class name of the containing class
- // (in this case ComputeTfIdf).
- private static final Logger LOG = LoggerFactory.getLogger(ComputeTfIdf.class);
- }
-
- /**
- * A {@link PTransform} to write, in CSV format, a mapping from term and URI
- * to score.
- */
- public static class WriteTfIdf
- extends PTransform<PCollection<KV<String, KV<URI, Double>>>, PDone> {
- private static final long serialVersionUID = 0;
-
- private String output;
-
- public WriteTfIdf(String output) {
- this.output = output;
- }
-
- @Override
- public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
- return wordToUriAndTfIdf
- .apply(ParDo.named("Format").of(new DoFn<KV<String, KV<URI, Double>>, String>() {
- private static final long serialVersionUID = 0;
-
- @Override
- public void processElement(ProcessContext c) {
- c.output(String.format("%s,\t%s,\t%f",
- c.element().getKey(),
- c.element().getValue().getKey(),
- c.element().getValue().getValue()));
- }
- }))
- .apply(TextIO.Write
- .to(output)
- .withSuffix(".csv"));
- }
- }
-
- public static void main(String[] args) throws Exception {
- Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-
- options.setRunner(FlinkPipelineRunner.class);
-
- Pipeline pipeline = Pipeline.create(options);
- pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
-
- pipeline
- .apply(new ReadDocuments(listInputDocuments(options)))
- .apply(new ComputeTfIdf())
- .apply(new WriteTfIdf(options.getOutput()));
-
- pipeline.run();
- }
-}
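A worked instance of the scoring used in ComputeTfIdf above (editor's arithmetic, with made-up counts):

    public class TfIdfArithmetic {
      public static void main(String[] args) {
        double tf = 3.0 / 100.0;  // term occurs 3 times in a 100-word document
        double df = 2.0 / 20.0;   // term appears in 2 of the 20 documents
        // Matches the DoFn above: tfIdf = tf * Math.log(1 / df)
        double tfIdf = tf * Math.log(1 / df);  // 0.03 * ln(10) ≈ 0.0691
        System.out.println(tfIdf);
      }
    }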
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java
deleted file mode 100644
index ba46301..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.examples;
-
-import org.apache.beam.runners.flink.FlinkPipelineOptions;
-import org.apache.beam.runners.flink.FlinkPipelineRunner;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.*;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-public class WordCount {
-
- public static class ExtractWordsFn extends DoFn<String, String> {
- private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", new Sum.SumLongFn());
-
- @Override
- public void processElement(ProcessContext c) {
- if (c.element().trim().isEmpty()) {
- emptyLines.addValue(1L);
- }
-
- // Split the line into words.
- String[] words = c.element().split("[^a-zA-Z']+");
-
- // Output each word encountered into the output PCollection.
- for (String word : words) {
- if (!word.isEmpty()) {
- c.output(word);
- }
- }
- }
- }
-
- public static class CountWords extends PTransform<PCollection<String>,
- PCollection<KV<String, Long>>> {
- @Override
- public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
-
- // Convert lines of text into individual words.
- PCollection<String> words = lines.apply(
- ParDo.of(new ExtractWordsFn()));
-
- // Count the number of times each word occurs.
- PCollection<KV<String, Long>> wordCounts =
- words.apply(Count.<String>perElement());
-
- return wordCounts;
- }
- }
-
- /** A SimpleFunction that converts a Word and Count into a printable string. */
- public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
- @Override
- public String apply(KV<String, Long> input) {
- return input.getKey() + ": " + input.getValue();
- }
- }
-
- /**
- * Options supported by {@link WordCount}.
- * <p>
- * Inherits standard configuration options.
- */
- public interface Options extends PipelineOptions, FlinkPipelineOptions {
- @Description("Path of the file to read from")
- @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
- String getInput();
- void setInput(String value);
-
- @Description("Path of the file to write to")
- String getOutput();
- void setOutput(String value);
- }
-
- public static void main(String[] args) {
-
- Options options = PipelineOptionsFactory.fromArgs(args).withValidation()
- .as(Options.class);
- options.setRunner(FlinkPipelineRunner.class);
-
- Pipeline p = Pipeline.create(options);
-
- p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
- .apply(new CountWords())
- .apply(MapElements.via(new FormatAsTextFn()))
- .apply(TextIO.Write.named("WriteCounts").to(options.getOutput()));
-
- p.run();
- }
-
-}
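
For readers scanning the removed WordCount: its CountWords transform leans entirely on Count.perElement(), which behaves like a pair-with-one followed by a per-key sum. A minimal sketch of that equivalence (editor's illustration against the same Dataflow SDK; the CountSketch class name is invented, and Sum.longsPerKey() is assumed from the SDK's Sum transform):

    import com.google.cloud.dataflow.sdk.transforms.DoFn;
    import com.google.cloud.dataflow.sdk.transforms.ParDo;
    import com.google.cloud.dataflow.sdk.transforms.Sum;
    import com.google.cloud.dataflow.sdk.values.KV;
    import com.google.cloud.dataflow.sdk.values.PCollection;

    /** Sketch: a hand-rolled equivalent of Count.<String>perElement(). */
    class CountSketch {
      static PCollection<KV<String, Long>> countPerElement(PCollection<String> words) {
        return words
            .apply(ParDo.of(new DoFn<String, KV<String, Long>>() {
              @Override
              public void processElement(ProcessContext c) {
                c.output(KV.of(c.element(), 1L)); // pair each word with a count of 1
              }
            }))
            .apply(Sum.<String>longsPerKey());    // sum the 1s per distinct word
      }
    }

Either spelling yields the same KV<String, Long> collection that FormatAsTextFn consumes.
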
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java
deleted file mode 100644
index 8168122..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java
+++ /dev/null
@@ -1,387 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.beam.runners.flink.examples.streaming;
-
-import org.apache.beam.runners.flink.FlinkPipelineRunner;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.AvroCoder;
-import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
-import com.google.cloud.dataflow.sdk.io.*;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.*;
-import com.google.cloud.dataflow.sdk.transforms.Partition.PartitionFn;
-import com.google.cloud.dataflow.sdk.transforms.windowing.*;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PBegin;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
-import org.joda.time.Duration;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * To run the example, first open a socket on a terminal by executing the command:
- * <ul>
- * <li>
- * <code>nc -lk 9999</code>
- * </li>
- * </ul>
- * and then launch the example. Whatever you then type in the terminal becomes
- * the input to the program.
- */
-public class AutoComplete {
-
- /**
- * A PTransform that takes as input a list of tokens and returns
- * the most common tokens per prefix.
- */
- public static class ComputeTopCompletions
- extends PTransform<PCollection<String>, PCollection<KV<String, List<CompletionCandidate>>>> {
- private static final long serialVersionUID = 0;
-
- private final int candidatesPerPrefix;
- private final boolean recursive;
-
- protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive) {
- this.candidatesPerPrefix = candidatesPerPrefix;
- this.recursive = recursive;
- }
-
- public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive) {
- return new ComputeTopCompletions(candidatesPerPrefix, recursive);
- }
-
- @Override
- public PCollection<KV<String, List<CompletionCandidate>>> apply(PCollection<String> input) {
- PCollection<CompletionCandidate> candidates = input
- // First count how often each token appears.
- .apply(new Count.PerElement<String>())
-
-          // Map the KV outputs of Count into our own CompletionCandidate class.
- .apply(ParDo.named("CreateCompletionCandidates").of(
- new DoFn<KV<String, Long>, CompletionCandidate>() {
- private static final long serialVersionUID = 0;
-
- @Override
- public void processElement(ProcessContext c) {
- CompletionCandidate cand = new CompletionCandidate(c.element().getKey(), c.element().getValue());
- c.output(cand);
- }
- }));
-
- // Compute the top via either a flat or recursive algorithm.
- if (recursive) {
- return candidates
- .apply(new ComputeTopRecursive(candidatesPerPrefix, 1))
- .apply(Flatten.<KV<String, List<CompletionCandidate>>>pCollections());
- } else {
- return candidates
- .apply(new ComputeTopFlat(candidatesPerPrefix, 1));
- }
- }
- }
-
- /**
- * Lower latency, but more expensive.
- */
- private static class ComputeTopFlat
- extends PTransform<PCollection<CompletionCandidate>,
- PCollection<KV<String, List<CompletionCandidate>>>> {
- private static final long serialVersionUID = 0;
-
- private final int candidatesPerPrefix;
- private final int minPrefix;
-
- public ComputeTopFlat(int candidatesPerPrefix, int minPrefix) {
- this.candidatesPerPrefix = candidatesPerPrefix;
- this.minPrefix = minPrefix;
- }
-
- @Override
- public PCollection<KV<String, List<CompletionCandidate>>> apply(
- PCollection<CompletionCandidate> input) {
- return input
- // For each completion candidate, map it to all prefixes.
- .apply(ParDo.of(new AllPrefixes(minPrefix)))
-
-        // Find and return the top candidates for each prefix.
- .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix)
- .withHotKeyFanout(new HotKeyFanout()));
- }
-
- private static class HotKeyFanout implements SerializableFunction<String, Integer> {
- private static final long serialVersionUID = 0;
-
- @Override
- public Integer apply(String input) {
- return (int) Math.pow(4, 5 - input.length());
- }
- }
- }
-
- /**
- * Cheaper but higher latency.
- *
-   * <p> Returns two PCollections: the first contains the top prefixes of size
-   * greater than minPrefix, and the second the top prefixes of size exactly
-   * minPrefix.
- */
- private static class ComputeTopRecursive
- extends PTransform<PCollection<CompletionCandidate>,
- PCollectionList<KV<String, List<CompletionCandidate>>>> {
- private static final long serialVersionUID = 0;
-
- private final int candidatesPerPrefix;
- private final int minPrefix;
-
- public ComputeTopRecursive(int candidatesPerPrefix, int minPrefix) {
- this.candidatesPerPrefix = candidatesPerPrefix;
- this.minPrefix = minPrefix;
- }
-
- private class KeySizePartitionFn implements PartitionFn<KV<String, List<CompletionCandidate>>> {
- private static final long serialVersionUID = 0;
-
- @Override
- public int partitionFor(KV<String, List<CompletionCandidate>> elem, int numPartitions) {
- return elem.getKey().length() > minPrefix ? 0 : 1;
- }
- }
-
- private static class FlattenTops
- extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
- private static final long serialVersionUID = 0;
-
- @Override
- public void processElement(ProcessContext c) {
- for (CompletionCandidate cc : c.element().getValue()) {
- c.output(cc);
- }
- }
- }
-
- @Override
- public PCollectionList<KV<String, List<CompletionCandidate>>> apply(
- PCollection<CompletionCandidate> input) {
- if (minPrefix > 10) {
- // Base case, partitioning to return the output in the expected format.
- return input
- .apply(new ComputeTopFlat(candidatesPerPrefix, minPrefix))
- .apply(Partition.of(2, new KeySizePartitionFn()));
- } else {
-        // If a candidate is in the top N for prefix a...b, it must also be in the top
-        // N for a...bX for every X, which is typically a much smaller set to consider.
-        // First, compute the top candidates for prefixes of size at least minPrefix + 1.
- PCollectionList<KV<String, List<CompletionCandidate>>> larger = input
- .apply(new ComputeTopRecursive(candidatesPerPrefix, minPrefix + 1));
- // Consider the top candidates for each prefix of length minPrefix + 1...
- PCollection<KV<String, List<CompletionCandidate>>> small =
- PCollectionList
- .of(larger.get(1).apply(ParDo.of(new FlattenTops())))
- // ...together with those (previously excluded) candidates of length
- // exactly minPrefix...
- .and(input.apply(Filter.by(new SerializableFunction<CompletionCandidate, Boolean>() {
- private static final long serialVersionUID = 0;
-
- @Override
- public Boolean apply(CompletionCandidate c) {
- return c.getValue().length() == minPrefix;
- }
- })))
- .apply("FlattenSmall", Flatten.<CompletionCandidate>pCollections())
- // ...set the key to be the minPrefix-length prefix...
- .apply(ParDo.of(new AllPrefixes(minPrefix, minPrefix)))
- // ...and (re)apply the Top operator to all of them together.
- .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix));
-
- PCollection<KV<String, List<CompletionCandidate>>> flattenLarger = larger
- .apply("FlattenLarge", Flatten.<KV<String, List<CompletionCandidate>>>pCollections());
-
- return PCollectionList.of(flattenLarger).and(small);
- }
- }
- }
-
- /**
- * A DoFn that keys each candidate by all its prefixes.
- */
- private static class AllPrefixes
- extends DoFn<CompletionCandidate, KV<String, CompletionCandidate>> {
- private static final long serialVersionUID = 0;
-
- private final int minPrefix;
- private final int maxPrefix;
- public AllPrefixes(int minPrefix) {
- this(minPrefix, Integer.MAX_VALUE);
- }
- public AllPrefixes(int minPrefix, int maxPrefix) {
- this.minPrefix = minPrefix;
- this.maxPrefix = maxPrefix;
- }
- @Override
- public void processElement(ProcessContext c) {
- String word = c.element().value;
- for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
- KV<String, CompletionCandidate> kv = KV.of(word.substring(0, i), c.element());
- c.output(kv);
- }
- }
- }
-
- /**
- * Class used to store tag-count pairs.
- */
- @DefaultCoder(AvroCoder.class)
- static class CompletionCandidate implements Comparable<CompletionCandidate> {
- private long count;
- private String value;
-
- public CompletionCandidate(String value, long count) {
- this.value = value;
- this.count = count;
- }
-
- public String getValue() {
- return value;
- }
-
- // Empty constructor required for Avro decoding.
- @SuppressWarnings("unused")
- public CompletionCandidate() {}
-
- @Override
- public int compareTo(CompletionCandidate o) {
- if (this.count < o.count) {
- return -1;
- } else if (this.count == o.count) {
- return this.value.compareTo(o.value);
- } else {
- return 1;
- }
- }
-
- @Override
- public boolean equals(Object other) {
- if (other instanceof CompletionCandidate) {
- CompletionCandidate that = (CompletionCandidate) other;
- return this.count == that.count && this.value.equals(that.value);
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return Long.valueOf(count).hashCode() ^ value.hashCode();
- }
-
- @Override
- public String toString() {
- return "CompletionCandidate[" + value + ", " + count + "]";
- }
- }
-
- static class ExtractWordsFn extends DoFn<String, String> {
- private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", new Sum.SumLongFn());
-
- @Override
- public void processElement(ProcessContext c) {
- if (c.element().trim().isEmpty()) {
- emptyLines.addValue(1L);
- }
-
- // Split the line into words.
- String[] words = c.element().split("[^a-zA-Z']+");
-
- // Output each word encountered into the output PCollection.
- for (String word : words) {
- if (!word.isEmpty()) {
- c.output(word);
- }
- }
- }
- }
-
- /**
-   * Takes as input the top candidates per prefix and emits, per window, a
-   * formatted string suitable for writing to a per-task local file.
- */
- static class FormatForPerTaskLocalFile extends DoFn<KV<String, List<CompletionCandidate>>, String>
-      implements DoFn.RequiresWindowAccess {
-
- private static final long serialVersionUID = 0;
-
- @Override
- public void processElement(ProcessContext c) {
- StringBuilder str = new StringBuilder();
- KV<String, List<CompletionCandidate>> elem = c.element();
-
-      str.append(elem.getKey() + " @ " + c.window() + " -> ");
-      for (CompletionCandidate cand : elem.getValue()) {
-        str.append(cand.toString() + " ");
- }
- System.out.println(str.toString());
- c.output(str.toString());
- }
- }
-
- /**
- * Options supported by this class.
- *
- * <p> Inherits standard Dataflow configuration options.
- */
- private interface Options extends WindowedWordCount.StreamingWordCountOptions {
- @Description("Whether to use the recursive algorithm")
- @Default.Boolean(true)
- Boolean getRecursive();
- void setRecursive(Boolean value);
- }
-
- public static void main(String[] args) throws IOException {
- Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
- options.setStreaming(true);
- options.setCheckpointingInterval(1000L);
- options.setNumberOfExecutionRetries(5);
- options.setExecutionRetryDelay(3000L);
- options.setRunner(FlinkPipelineRunner.class);
-
- PTransform<? super PBegin, PCollection<String>> readSource =
- Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("WordStream");
- WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));
-
- // Create the pipeline.
- Pipeline p = Pipeline.create(options);
- PCollection<KV<String, List<CompletionCandidate>>> toWrite = p
- .apply(readSource)
- .apply(ParDo.of(new ExtractWordsFn()))
- .apply(Window.<String>into(windowFn)
- .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
- .discardingFiredPanes())
- .apply(ComputeTopCompletions.top(10, options.getRecursive()));
-
- toWrite
- .apply(ParDo.named("FormatForPerTaskFile").of(new FormatForPerTaskLocalFile()))
- .apply(TextIO.Write.to("./outputAutoComplete.txt"));
-
- p.run();
- }
-}
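
Both the flat and the recursive paths above funnel through AllPrefixes, which keys each candidate by every prefix whose length lies between minPrefix and maxPrefix. A standalone sketch of just that expansion (editor's illustration in plain Java; PrefixSketch and prefixes are invented names):

    import java.util.ArrayList;
    import java.util.List;

    /** Sketch: the prefix expansion AllPrefixes.processElement performs per word. */
    class PrefixSketch {
      static List<String> prefixes(String word, int minPrefix, int maxPrefix) {
        List<String> out = new ArrayList<>();
        // One key per prefix length in [minPrefix, min(word.length(), maxPrefix)].
        for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
          out.add(word.substring(0, i));
        }
        return out;
      }

      public static void main(String[] args) {
        // Prints [f, fl, fli, flin, flink]
        System.out.println(prefixes("flink", 1, Integer.MAX_VALUE));
      }
    }

With minPrefix = 1, a single candidate such as "flink" therefore lands under five keys, which is what lets Top.largestPerKey rank completions independently per prefix.
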
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java
deleted file mode 100644
index 3a8bdb0..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.beam.runners.flink.examples.streaming;
-
-import org.apache.beam.runners.flink.FlinkPipelineRunner;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.Read;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
-import com.google.cloud.dataflow.sdk.transforms.windowing.*;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PBegin;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import org.joda.time.Duration;
-
-/**
- * To run the example, first open two sockets on two terminals by executing the commands:
- * <ul>
- * <li>
- * <code>nc -lk 9999</code>, and
- * </li>
- * <li>
- * <code>nc -lk 9998</code>
- * </li>
- * </ul>
- * and then launch the example. Whatever you then type in either terminal becomes
- * the input to the program.
- */
-public class JoinExamples {
-
- static PCollection<String> joinEvents(PCollection<String> streamA,
- PCollection<String> streamB) throws Exception {
-
- final TupleTag<String> firstInfoTag = new TupleTag<>();
- final TupleTag<String> secondInfoTag = new TupleTag<>();
-
-    // Transform both input collections to keyed collections; in both cases the
-    // key is the first whitespace-delimited token of the line.
- PCollection<KV<String, String>> firstInfo = streamA.apply(
- ParDo.of(new ExtractEventDataFn()));
- PCollection<KV<String, String>> secondInfo = streamB.apply(
- ParDo.of(new ExtractEventDataFn()));
-
-    // key (first token) -> CoGbkResult(<stream A lines>, <stream B lines>)
- PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
- .of(firstInfoTag, firstInfo)
- .and(secondInfoTag, secondInfo)
- .apply(CoGroupByKey.<String>create());
-
-    // Process the CoGbkResult elements generated by the CoGroupByKey transform.
-    // key (first token) -> string combining <stream A line> and <stream B line>
- PCollection<KV<String, String>> finalResultCollection =
- kvpCollection.apply(ParDo.named("Process").of(
- new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
- private static final long serialVersionUID = 0;
-
- @Override
- public void processElement(ProcessContext c) {
- KV<String, CoGbkResult> e = c.element();
- String key = e.getKey();
-
- String defaultA = "NO_VALUE";
-
-          // getOnly() is a bit tricky: it expects AT MOST ONE value for the given
-          // key in this stream, and falls back to defaultA when none is present.
-
- String lineA = e.getValue().getOnly(firstInfoTag, defaultA);
- for (String lineB : c.element().getValue().getAll(secondInfoTag)) {
- // Generate a string that combines information from both collection values
- c.output(KV.of(key, "Value A: " + lineA + " - Value B: " + lineB));
- }
- }
- }));
-
- return finalResultCollection
- .apply(ParDo.named("Format").of(new DoFn<KV<String, String>, String>() {
- private static final long serialVersionUID = 0;
-
- @Override
- public void processElement(ProcessContext c) {
- String result = c.element().getKey() + " -> " + c.element().getValue();
- System.out.println(result);
- c.output(result);
- }
- }));
- }
-
- static class ExtractEventDataFn extends DoFn<String, KV<String, String>> {
- private static final long serialVersionUID = 0;
-
- @Override
- public void processElement(ProcessContext c) {
- String line = c.element().toLowerCase();
- String key = line.split("\\s")[0];
- c.output(KV.of(key, line));
- }
- }
-
- private interface Options extends WindowedWordCount.StreamingWordCountOptions {
-
- }
-
- public static void main(String[] args) throws Exception {
- Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
- options.setStreaming(true);
- options.setCheckpointingInterval(1000L);
- options.setNumberOfExecutionRetries(5);
- options.setExecutionRetryDelay(3000L);
- options.setRunner(FlinkPipelineRunner.class);
-
- PTransform<? super PBegin, PCollection<String>> readSourceA =
- Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("FirstStream");
- PTransform<? super PBegin, PCollection<String>> readSourceB =
- Read.from(new UnboundedSocketSource<>("localhost", 9998, '\n', 3)).named("SecondStream");
-
- WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));
-
- Pipeline p = Pipeline.create(options);
-
-    // The following two apply() calls create the two inputs to our pipeline,
-    // one for each of our two socket sources.
- PCollection<String> streamA = p.apply(readSourceA)
- .apply(Window.<String>into(windowFn)
- .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
- .discardingFiredPanes());
- PCollection<String> streamB = p.apply(readSourceB)
- .apply(Window.<String>into(windowFn)
- .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
- .discardingFiredPanes());
-
- PCollection<String> formattedResults = joinEvents(streamA, streamB);
- formattedResults.apply(TextIO.Write.to("./outputJoin.txt"));
- p.run();
- }
-
-}
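
A closing note on the join above: getOnly follows the CoGbkResult.getOnly(tag, default) contract, returning the single value when exactly one is present and the supplied default when none is. A minimal model of that contract on a plain List (editor's sketch; GetOnlySketch and its helper are invented names, and the exact exception the real SDK raises for multiple values may differ):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;

    /** Sketch: the at-most-one-value contract behind CoGbkResult.getOnly(tag, default). */
    class GetOnlySketch {
      static <T> T getOnly(List<T> values, T defaultValue) {
        Iterator<T> it = values.iterator();
        if (!it.hasNext()) {
          return defaultValue;  // no value for this key in this stream: use the default
        }
        T only = it.next();
        if (it.hasNext()) {
          // Several values for one key: the real getOnly rejects this case too.
          throw new IllegalArgumentException("expected at most one value");
        }
        return only;
      }

      public static void main(String[] args) {
        System.out.println(getOnly(Arrays.asList("us event1"), "NO_VALUE"));      // us event1
        System.out.println(getOnly(Collections.<String>emptyList(), "NO_VALUE")); // NO_VALUE
      }
    }
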