Posted to commits@beam.apache.org by ie...@apache.org on 2017/04/19 13:09:26 UTC
[17/18] beam git commit: [BEAM-1994] Remove Flink examples package
[BEAM-1994] Remove Flink examples package
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cdd2544b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cdd2544b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cdd2544b
Branch: refs/heads/master
Commit: cdd2544ba6dd6ac4aa80c65ecd8e01ab3cf664aa
Parents: 8a00f22
Author: Ismaël Mejía <ie...@apache.org>
Authored: Tue Apr 18 17:31:07 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Wed Apr 19 13:37:06 2017 +0200
----------------------------------------------------------------------
...PostCommit_Java_ValidatesRunner_Flink.groovy | 2 +-
runners/flink/examples/pom.xml | 130 ---
.../beam/runners/flink/examples/TFIDF.java | 455 --------
.../beam/runners/flink/examples/WordCount.java | 129 ---
.../runners/flink/examples/package-info.java | 22 -
.../flink/examples/streaming/AutoComplete.java | 400 -------
.../flink/examples/streaming/JoinExamples.java | 154 ---
.../examples/streaming/WindowedWordCount.java | 141 ---
.../flink/examples/streaming/package-info.java | 22 -
runners/flink/pom.xml | 275 ++++-
runners/flink/runner/pom.xml | 330 ------
.../flink/DefaultParallelismFactory.java | 39 -
.../flink/FlinkBatchPipelineTranslator.java | 139 ---
.../flink/FlinkBatchTransformTranslators.java | 723 ------------
.../flink/FlinkBatchTranslationContext.java | 153 ---
.../flink/FlinkDetachedRunnerResult.java | 75 --
.../FlinkPipelineExecutionEnvironment.java | 241 ----
.../runners/flink/FlinkPipelineOptions.java | 101 --
.../runners/flink/FlinkPipelineTranslator.java | 53 -
.../apache/beam/runners/flink/FlinkRunner.java | 232 ----
.../runners/flink/FlinkRunnerRegistrar.java | 62 --
.../beam/runners/flink/FlinkRunnerResult.java | 98 --
.../flink/FlinkStreamingPipelineTranslator.java | 276 -----
.../FlinkStreamingTransformTranslators.java | 1044 -----------------
.../flink/FlinkStreamingTranslationContext.java | 130 ---
.../flink/FlinkStreamingViewOverrides.java | 372 -------
.../flink/PipelineTranslationOptimizer.java | 72 --
.../beam/runners/flink/TestFlinkRunner.java | 84 --
.../beam/runners/flink/TranslationMode.java | 31 -
.../apache/beam/runners/flink/package-info.java | 22 -
.../functions/FlinkAggregatorFactory.java | 53 -
.../functions/FlinkAssignContext.java | 63 --
.../functions/FlinkAssignWindows.java | 49 -
.../functions/FlinkDoFnFunction.java | 161 ---
.../FlinkMergingNonShuffleReduceFunction.java | 228 ----
.../FlinkMergingPartialReduceFunction.java | 201 ----
.../functions/FlinkMergingReduceFunction.java | 199 ----
.../FlinkMultiOutputPruningFunction.java | 50 -
.../functions/FlinkNoOpStepContext.java | 73 --
.../functions/FlinkPartialReduceFunction.java | 172 ---
.../functions/FlinkReduceFunction.java | 173 ---
.../functions/FlinkSideInputReader.java | 80 --
.../functions/FlinkStatefulDoFnFunction.java | 198 ----
.../functions/SideInputInitializer.java | 73 --
.../translation/functions/package-info.java | 22 -
.../runners/flink/translation/package-info.java | 22 -
.../translation/types/CoderTypeInformation.java | 120 --
.../translation/types/CoderTypeSerializer.java | 132 ---
.../types/EncodedValueComparator.java | 195 ----
.../types/EncodedValueSerializer.java | 113 --
.../types/EncodedValueTypeInformation.java | 98 --
.../types/InspectableByteArrayOutputStream.java | 34 -
.../flink/translation/types/KvKeySelector.java | 50 -
.../flink/translation/types/package-info.java | 22 -
.../utils/SerializedPipelineOptions.java | 67 --
.../flink/translation/utils/package-info.java | 22 -
.../wrappers/DataInputViewWrapper.java | 58 -
.../wrappers/DataOutputViewWrapper.java | 51 -
.../SerializableFnAggregatorWrapper.java | 98 --
.../translation/wrappers/SourceInputFormat.java | 150 ---
.../translation/wrappers/SourceInputSplit.java | 52 -
.../translation/wrappers/package-info.java | 22 -
.../wrappers/streaming/DoFnOperator.java | 774 -------------
.../streaming/KvToByteBufferKeySelector.java | 56 -
.../streaming/SingletonKeyedWorkItem.java | 56 -
.../streaming/SingletonKeyedWorkItemCoder.java | 126 ---
.../streaming/SplittableDoFnOperator.java | 150 ---
.../wrappers/streaming/WindowDoFnOperator.java | 117 --
.../wrappers/streaming/WorkItemKeySelector.java | 56 -
.../streaming/io/BoundedSourceWrapper.java | 218 ----
.../streaming/io/UnboundedSocketSource.java | 249 -----
.../streaming/io/UnboundedSourceWrapper.java | 476 --------
.../wrappers/streaming/io/package-info.java | 22 -
.../wrappers/streaming/package-info.java | 22 -
.../state/FlinkBroadcastStateInternals.java | 865 --------------
.../state/FlinkKeyGroupStateInternals.java | 487 --------
.../state/FlinkSplitStateInternals.java | 260 -----
.../streaming/state/FlinkStateInternals.java | 1053 ------------------
.../state/KeyGroupCheckpointedOperator.java | 35 -
.../state/KeyGroupRestoringOperator.java | 32 -
.../wrappers/streaming/state/package-info.java | 22 -
.../runner/src/main/resources/log4j.properties | 23 -
.../flink/EncodedValueComparatorTest.java | 70 --
.../runners/flink/FlinkRunnerRegistrarTest.java | 48 -
.../beam/runners/flink/FlinkTestPipeline.java | 72 --
.../beam/runners/flink/PipelineOptionsTest.java | 184 ---
.../beam/runners/flink/ReadSourceITCase.java | 85 --
.../flink/ReadSourceStreamingITCase.java | 74 --
.../beam/runners/flink/WriteSinkITCase.java | 192 ----
.../flink/streaming/DoFnOperatorTest.java | 600 ----------
.../FlinkBroadcastStateInternalsTest.java | 245 ----
.../FlinkKeyGroupStateInternalsTest.java | 262 -----
.../streaming/FlinkSplitStateInternalsTest.java | 101 --
.../streaming/FlinkStateInternalsTest.java | 395 -------
.../flink/streaming/GroupByNullKeyTest.java | 124 ---
.../flink/streaming/TestCountingSource.java | 254 -----
.../streaming/TopWikipediaSessionsITCase.java | 133 ---
.../streaming/UnboundedSourceWrapperTest.java | 464 --------
.../runners/flink/streaming/package-info.java | 22 -
.../src/test/resources/log4j-test.properties | 27 -
.../flink/DefaultParallelismFactory.java | 39 +
.../flink/FlinkBatchPipelineTranslator.java | 139 +++
.../flink/FlinkBatchTransformTranslators.java | 723 ++++++++++++
.../flink/FlinkBatchTranslationContext.java | 153 +++
.../flink/FlinkDetachedRunnerResult.java | 75 ++
.../FlinkPipelineExecutionEnvironment.java | 241 ++++
.../runners/flink/FlinkPipelineOptions.java | 101 ++
.../runners/flink/FlinkPipelineTranslator.java | 53 +
.../apache/beam/runners/flink/FlinkRunner.java | 232 ++++
.../runners/flink/FlinkRunnerRegistrar.java | 62 ++
.../beam/runners/flink/FlinkRunnerResult.java | 98 ++
.../flink/FlinkStreamingPipelineTranslator.java | 276 +++++
.../FlinkStreamingTransformTranslators.java | 1044 +++++++++++++++++
.../flink/FlinkStreamingTranslationContext.java | 130 +++
.../flink/FlinkStreamingViewOverrides.java | 372 +++++++
.../flink/PipelineTranslationOptimizer.java | 72 ++
.../beam/runners/flink/TestFlinkRunner.java | 84 ++
.../beam/runners/flink/TranslationMode.java | 31 +
.../apache/beam/runners/flink/package-info.java | 22 +
.../functions/FlinkAggregatorFactory.java | 53 +
.../functions/FlinkAssignContext.java | 63 ++
.../functions/FlinkAssignWindows.java | 49 +
.../functions/FlinkDoFnFunction.java | 161 +++
.../FlinkMergingNonShuffleReduceFunction.java | 228 ++++
.../FlinkMergingPartialReduceFunction.java | 201 ++++
.../functions/FlinkMergingReduceFunction.java | 199 ++++
.../FlinkMultiOutputPruningFunction.java | 50 +
.../functions/FlinkNoOpStepContext.java | 73 ++
.../functions/FlinkPartialReduceFunction.java | 172 +++
.../functions/FlinkReduceFunction.java | 173 +++
.../functions/FlinkSideInputReader.java | 80 ++
.../functions/FlinkStatefulDoFnFunction.java | 198 ++++
.../functions/SideInputInitializer.java | 73 ++
.../translation/functions/package-info.java | 22 +
.../runners/flink/translation/package-info.java | 22 +
.../translation/types/CoderTypeInformation.java | 120 ++
.../translation/types/CoderTypeSerializer.java | 132 +++
.../types/EncodedValueComparator.java | 195 ++++
.../types/EncodedValueSerializer.java | 113 ++
.../types/EncodedValueTypeInformation.java | 98 ++
.../types/InspectableByteArrayOutputStream.java | 34 +
.../flink/translation/types/KvKeySelector.java | 50 +
.../flink/translation/types/package-info.java | 22 +
.../utils/SerializedPipelineOptions.java | 67 ++
.../flink/translation/utils/package-info.java | 22 +
.../wrappers/DataInputViewWrapper.java | 58 +
.../wrappers/DataOutputViewWrapper.java | 51 +
.../SerializableFnAggregatorWrapper.java | 98 ++
.../translation/wrappers/SourceInputFormat.java | 150 +++
.../translation/wrappers/SourceInputSplit.java | 52 +
.../translation/wrappers/package-info.java | 22 +
.../wrappers/streaming/DoFnOperator.java | 774 +++++++++++++
.../streaming/KvToByteBufferKeySelector.java | 56 +
.../streaming/SingletonKeyedWorkItem.java | 56 +
.../streaming/SingletonKeyedWorkItemCoder.java | 126 +++
.../streaming/SplittableDoFnOperator.java | 150 +++
.../wrappers/streaming/WindowDoFnOperator.java | 117 ++
.../wrappers/streaming/WorkItemKeySelector.java | 56 +
.../streaming/io/BoundedSourceWrapper.java | 218 ++++
.../streaming/io/UnboundedSocketSource.java | 249 +++++
.../streaming/io/UnboundedSourceWrapper.java | 476 ++++++++
.../wrappers/streaming/io/package-info.java | 22 +
.../wrappers/streaming/package-info.java | 22 +
.../state/FlinkBroadcastStateInternals.java | 865 ++++++++++++++
.../state/FlinkKeyGroupStateInternals.java | 487 ++++++++
.../state/FlinkSplitStateInternals.java | 260 +++++
.../streaming/state/FlinkStateInternals.java | 1053 ++++++++++++++++++
.../state/KeyGroupCheckpointedOperator.java | 35 +
.../state/KeyGroupRestoringOperator.java | 32 +
.../wrappers/streaming/state/package-info.java | 22 +
.../flink/src/main/resources/log4j.properties | 23 +
.../flink/EncodedValueComparatorTest.java | 70 ++
.../runners/flink/FlinkRunnerRegistrarTest.java | 48 +
.../beam/runners/flink/FlinkTestPipeline.java | 72 ++
.../beam/runners/flink/PipelineOptionsTest.java | 184 +++
.../beam/runners/flink/ReadSourceITCase.java | 85 ++
.../flink/ReadSourceStreamingITCase.java | 74 ++
.../beam/runners/flink/WriteSinkITCase.java | 192 ++++
.../flink/streaming/DoFnOperatorTest.java | 600 ++++++++++
.../FlinkBroadcastStateInternalsTest.java | 245 ++++
.../FlinkKeyGroupStateInternalsTest.java | 262 +++++
.../streaming/FlinkSplitStateInternalsTest.java | 101 ++
.../streaming/FlinkStateInternalsTest.java | 395 +++++++
.../flink/streaming/GroupByNullKeyTest.java | 124 +++
.../flink/streaming/TestCountingSource.java | 254 +++++
.../streaming/TopWikipediaSessionsITCase.java | 133 +++
.../streaming/UnboundedSourceWrapperTest.java | 464 ++++++++
.../runners/flink/streaming/package-info.java | 22 +
.../src/test/resources/log4j-test.properties | 27 +
189 files changed, 15765 insertions(+), 17293 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Flink.groovy
----------------------------------------------------------------------
diff --git a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Flink.groovy b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Flink.groovy
index 411106d..5b228bc 100644
--- a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Flink.groovy
+++ b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Flink.groovy
@@ -39,5 +39,5 @@ mavenJob('beam_PostCommit_Java_ValidatesRunner_Flink') {
'Run Flink ValidatesRunner')
// Maven goals for this job.
- goals('-B -e clean verify -am -pl runners/flink/runner -Plocal-validates-runner-tests -Pvalidates-runner-tests')
+ goals('-B -e clean verify -am -pl runners/flink -Plocal-validates-runner-tests -Pvalidates-runner-tests')
}
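
A note for running this locally: with the runner module folded directly into runners/flink, the updated goals above translate to the following invocation from the repository root (profile names copied verbatim from the job definition):

    mvn -B -e clean verify -am -pl runners/flink \
        -Plocal-validates-runner-tests -Pvalidates-runner-tests
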
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/examples/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/examples/pom.xml b/runners/flink/examples/pom.xml
deleted file mode 100644
index aaf76d9..0000000
--- a/runners/flink/examples/pom.xml
+++ /dev/null
@@ -1,130 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-runners-flink-parent</artifactId>
- <version>0.7.0-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
-
- <artifactId>beam-runners-flink_2.10-examples</artifactId>
-
- <name>Apache Beam :: Runners :: Flink :: Examples</name>
-
- <packaging>jar</packaging>
-
- <properties>
- <!-- Default parameters for mvn exec:java -->
- <flink.examples.input>kinglear.txt</flink.examples.input>
- <flink.examples.output>wordcounts.txt</flink.examples.output>
- <flink.examples.parallelism>-1</flink.examples.parallelism>
- </properties>
-
- <profiles>
- <profile>
- <id>disable-validates-runner-tests</id>
- <activation>
- <activeByDefault>true</activeByDefault>
- </activation>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <executions>
- <execution>
- <id>validates-runner-tests</id>
- <configuration>
- <skip>true</skip>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </profile>
- </profiles>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-java-extensions-gcp-core</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-runners-flink_2.10</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <executions>
- <execution>
- <goals><goal>analyze-only</goal></goals>
- <configuration>
- <!-- disable for now until dependencies are cleaned up -->
- <failOnWarning>false</failOnWarning>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>exec-maven-plugin</artifactId>
- <configuration>
- <executable>java</executable>
- <arguments>
- <argument>--runner=org.apache.beam.runners.flink.FlinkRunner</argument>
- <argument>--parallelism=${flink.examples.parallelism}</argument>
- <argument>--input=${flink.examples.input}</argument>
- <argument>--output=${flink.examples.output}</argument>
- </arguments>
- </configuration>
- </plugin>
-
- </plugins>
-
- </build>
-
-</project>
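
For context, the removed exec-maven-plugin block forwarded the properties above to the examples as pipeline options. A rough equivalent direct invocation (the classpath and main class are not wired up in this pom snippet, so both are assumptions here):

    java -cp <examples-jar-and-dependencies> \
        org.apache.beam.runners.flink.examples.WordCount \
        --runner=org.apache.beam.runners.flink.FlinkRunner \
        --parallelism=-1 \
        --input=kinglear.txt \
        --output=wordcounts.txt
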
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
deleted file mode 100644
index 8e1df08..0000000
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
+++ /dev/null
@@ -1,455 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.examples;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashSet;
-import java.util.Set;
-import org.apache.beam.runners.flink.FlinkPipelineOptions;
-import org.apache.beam.runners.flink.FlinkRunner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringDelegateCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.GcsOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.Validation;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Distinct;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.Keys;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Values;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGroupByKey;
-import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.TupleTag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An example that computes a basic TF-IDF search table for a directory or GCS prefix.
- *
- * <p>Concepts: joining data; side inputs; logging
- *
- * <p>To execute this pipeline locally, specify general pipeline configuration:
- * <pre>{@code
- * --project=YOUR_PROJECT_ID
- * }</pre>
- * and a local output file or output prefix on GCS:
- * <pre>{@code
- * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
- * }</pre>
- *
- * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
- * <pre>{@code
- * --project=YOUR_PROJECT_ID
- * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- * --runner=BlockingDataflowRunner
- * and an output prefix on GCS:
- * --output=gs://YOUR_OUTPUT_PREFIX
- * }</pre>
- *
- * <p>The default input is {@code gs://dataflow-samples/shakespeare/} and can be overridden with
- * {@code --input}.
- */
-public class TFIDF {
- /**
- * Options supported by {@link TFIDF}.
- *
- * <p>Inherits standard configuration options.
- */
- private interface Options extends PipelineOptions, FlinkPipelineOptions {
- @Description("Path to the directory or GCS prefix containing files to read from")
- @Default.String("gs://dataflow-samples/shakespeare/")
- String getInput();
- void setInput(String value);
-
- @Description("Prefix of output URI to write to")
- @Validation.Required
- String getOutput();
- void setOutput(String value);
- }
-
- /**
- * Lists documents contained beneath the {@code options.input} prefix/directory.
- */
- public static Set<URI> listInputDocuments(Options options)
- throws URISyntaxException, IOException {
- URI baseUri = new URI(options.getInput());
-
- // List all documents in the directory or GCS prefix.
- URI absoluteUri;
- if (baseUri.getScheme() != null) {
- absoluteUri = baseUri;
- } else {
- absoluteUri = new URI(
- "file",
- baseUri.getAuthority(),
- baseUri.getPath(),
- baseUri.getQuery(),
- baseUri.getFragment());
- }
-
- Set<URI> uris = new HashSet<>();
- if (absoluteUri.getScheme().equals("file")) {
- File directory = new File(absoluteUri);
- String[] directoryListing = directory.list();
- if (directoryListing == null) {
- throw new IOException(
- "Directory " + absoluteUri + " is not a valid path or IO Error occurred.");
- }
- for (String entry : directoryListing) {
- File path = new File(directory, entry);
- uris.add(path.toURI());
- }
- } else if (absoluteUri.getScheme().equals("gs")) {
- GcsUtil gcsUtil = options.as(GcsOptions.class).getGcsUtil();
- URI gcsUriGlob = new URI(
- absoluteUri.getScheme(),
- absoluteUri.getAuthority(),
- absoluteUri.getPath() + "*",
- absoluteUri.getQuery(),
- absoluteUri.getFragment());
- for (GcsPath entry : gcsUtil.expand(GcsPath.fromUri(gcsUriGlob))) {
- uris.add(entry.toUri());
- }
- }
-
- return uris;
- }
-
- /**
- * Reads the documents at the provided uris and returns all lines
- * from the documents tagged with which document they are from.
- */
- public static class ReadDocuments
- extends PTransform<PBegin, PCollection<KV<URI, String>>> {
- private static final long serialVersionUID = 0;
-
- // transient because PTransform is not really meant to be serialized.
- // see note on PTransform
- private final transient Iterable<URI> uris;
-
- public ReadDocuments(Iterable<URI> uris) {
- this.uris = uris;
- }
-
- @Override
- public Coder<?> getDefaultOutputCoder() {
- return KvCoder.of(StringDelegateCoder.of(URI.class), StringUtf8Coder.of());
- }
-
- @Override
- public PCollection<KV<URI, String>> expand(PBegin input) {
- Pipeline pipeline = input.getPipeline();
-
- // Create one TextIO.Read transform for each document
- // and add its output to a PCollectionList
- PCollectionList<KV<URI, String>> urisToLines =
- PCollectionList.empty(pipeline);
-
- // TextIO.Read supports:
- // - file: URIs and paths locally
- // - gs: URIs on the service
- for (final URI uri : uris) {
- String uriString;
- if (uri.getScheme().equals("file")) {
- uriString = new File(uri).getPath();
- } else {
- uriString = uri.toString();
- }
-
- PCollection<KV<URI, String>> oneUriToLines = pipeline
- .apply("TextIO.Read(" + uriString + ")", TextIO.Read.from(uriString))
- .apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri));
-
- urisToLines = urisToLines.and(oneUriToLines);
- }
-
- return urisToLines.apply(Flatten.<KV<URI, String>>pCollections());
- }
- }
-
- /**
- * A transform containing a basic TF-IDF pipeline. The input consists of KV objects
- * where the key is the document's URI and the value is a piece
- * of the document's content. The output is mapping from terms to
- * scores for each document URI.
- */
- public static class ComputeTfIdf
- extends PTransform<PCollection<KV<URI, String>>, PCollection<KV<String, KV<URI, Double>>>> {
- private static final long serialVersionUID = 0;
-
- public ComputeTfIdf() { }
-
- @Override
- public PCollection<KV<String, KV<URI, Double>>> expand(
- PCollection<KV<URI, String>> uriToContent) {
-
- // Compute the total number of documents, and
- // prepare this singleton PCollectionView for
- // use as a side input.
- final PCollectionView<Long> totalDocuments =
- uriToContent
- .apply("GetURIs", Keys.<URI>create())
- .apply("DistinctDocs", Distinct.<URI>create())
- .apply(Count.<URI>globally())
- .apply(View.<Long>asSingleton());
-
- // Create a collection of pairs mapping a URI to each
- // of the words in the document associated with that URI.
- PCollection<KV<URI, String>> uriToWords = uriToContent
- .apply("SplitWords", ParDo.of(new DoFn<KV<URI, String>, KV<URI, String>>() {
- private static final long serialVersionUID = 0;
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- URI uri = c.element().getKey();
- String line = c.element().getValue();
- for (String word : line.split("\\W+")) {
- // Log INFO messages when the word “love” is found.
- if (word.toLowerCase().equals("love")) {
- LOG.info("Found {}", word.toLowerCase());
- }
-
- if (!word.isEmpty()) {
- c.output(KV.of(uri, word.toLowerCase()));
- }
- }
- }
- }));
-
- // Compute a mapping from each word to the total
- // number of documents in which it appears.
- PCollection<KV<String, Long>> wordToDocCount = uriToWords
- .apply("DistinctWords", Distinct.<KV<URI, String>>create())
- .apply(Values.<String>create())
- .apply("CountDocs", Count.<String>perElement());
-
- // Compute a mapping from each URI to the total
- // number of words in the document associated with that URI.
- PCollection<KV<URI, Long>> uriToWordTotal = uriToWords
- .apply("GetURIs2", Keys.<URI>create())
- .apply("CountWords", Count.<URI>perElement());
-
- // Count, for each (URI, word) pair, the number of
- // occurrences of that word in the document associated
- // with the URI.
- PCollection<KV<KV<URI, String>, Long>> uriAndWordToCount = uriToWords
- .apply("CountWordDocPairs", Count.<KV<URI, String>>perElement());
-
- // Adjust the above collection to a mapping from
- // (URI, word) pairs to counts into an isomorphic mapping
- // from URI to (word, count) pairs, to prepare for a join
- // by the URI key.
- PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount
- .apply("ShiftKeys", ParDo.of(
- new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
- private static final long serialVersionUID = 0;
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- URI uri = c.element().getKey().getKey();
- String word = c.element().getKey().getValue();
- Long occurrences = c.element().getValue();
- c.output(KV.of(uri, KV.of(word, occurrences)));
- }
- }));
-
- // Prepare to join the mapping of URI to (word, count) pairs with
- // the mapping of URI to total word counts, by associating
- // each of the input PCollection<KV<URI, ...>> with
- // a tuple tag. Each input must have the same key type, URI
- // in this case. The type parameter of the tuple tag matches
- // the types of the values for each collection.
- final TupleTag<Long> wordTotalsTag = new TupleTag<>();
- final TupleTag<KV<String, Long>> wordCountsTag = new TupleTag<>();
- KeyedPCollectionTuple<URI> coGbkInput = KeyedPCollectionTuple
- .of(wordTotalsTag, uriToWordTotal)
- .and(wordCountsTag, uriToWordAndCount);
-
- // Perform a CoGroupByKey (a sort of pre-join) on the prepared
- // inputs. This yields a mapping from URI to a CoGbkResult
- // (CoGroupByKey Result). The CoGbkResult is a mapping
- // from the above tuple tags to the values in each input
- // associated with a particular URI. In this case, each
- // KV<URI, CoGbkResult> groups a URI with the total number of
- // words in that document as well as all the (word, count)
- // pairs for particular words.
- PCollection<KV<URI, CoGbkResult>> uriToWordAndCountAndTotal = coGbkInput
- .apply("CoGroupByUri", CoGroupByKey.<URI>create());
-
- // Compute a mapping from each word to a (URI, term frequency)
- // pair for each URI. A word's term frequency for a document
- // is simply the number of times that word occurs in the document
- // divided by the total number of words in the document.
- PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal
- .apply("ComputeTermFrequencies", ParDo.of(
- new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
- private static final long serialVersionUID = 0;
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- URI uri = c.element().getKey();
- Long wordTotal = c.element().getValue().getOnly(wordTotalsTag);
-
- for (KV<String, Long> wordAndCount
- : c.element().getValue().getAll(wordCountsTag)) {
- String word = wordAndCount.getKey();
- Long wordCount = wordAndCount.getValue();
- Double termFrequency = wordCount.doubleValue() / wordTotal.doubleValue();
- c.output(KV.of(word, KV.of(uri, termFrequency)));
- }
- }
- }));
-
- // Compute a mapping from each word to its document frequency.
- // A word's document frequency in a corpus is the number of
- // documents in which the word appears divided by the total
- // number of documents in the corpus. Note how the total number of
- // documents is passed as a side input; the same value is
- // presented to each invocation of the DoFn.
- PCollection<KV<String, Double>> wordToDf = wordToDocCount
- .apply("ComputeDocFrequencies", ParDo
- .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
- private static final long serialVersionUID = 0;
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- String word = c.element().getKey();
- Long documentCount = c.element().getValue();
- Long documentTotal = c.sideInput(totalDocuments);
- Double documentFrequency = documentCount.doubleValue()
- / documentTotal.doubleValue();
-
- c.output(KV.of(word, documentFrequency));
- }
- }).withSideInputs(totalDocuments));
-
- // Join the term frequency and document frequency
- // collections, each keyed on the word.
- final TupleTag<KV<URI, Double>> tfTag = new TupleTag<>();
- final TupleTag<Double> dfTag = new TupleTag<>();
- PCollection<KV<String, CoGbkResult>> wordToUriAndTfAndDf = KeyedPCollectionTuple
- .of(tfTag, wordToUriAndTf)
- .and(dfTag, wordToDf)
- .apply(CoGroupByKey.<String>create());
-
- // Compute a mapping from each word to a (URI, TF-IDF) score
- // for each URI. There are a variety of definitions of TF-IDF
- // ("term frequency - inverse document frequency") score;
- // here we use a basic version: the term frequency
- // multiplied by the log of the inverse document frequency.
-
- return wordToUriAndTfAndDf
- .apply("ComputeTfIdf", ParDo.of(
- new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
- private static final long serialVersionUID = 0;
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- String word = c.element().getKey();
- Double df = c.element().getValue().getOnly(dfTag);
-
- for (KV<URI, Double> uriAndTf : c.element().getValue().getAll(tfTag)) {
- URI uri = uriAndTf.getKey();
- Double tf = uriAndTf.getValue();
- Double tfIdf = tf * Math.log(1 / df);
- c.output(KV.of(word, KV.of(uri, tfIdf)));
- }
- }
- }));
- }
-
- // Instantiate Logger.
- // It is suggested that the user specify the class name of the containing class
- // (in this case ComputeTfIdf).
- private static final Logger LOG = LoggerFactory.getLogger(ComputeTfIdf.class);
- }
-
- /**
- * A {@link PTransform} to write, in CSV format, a mapping from term and URI
- * to score.
- */
- public static class WriteTfIdf
- extends PTransform<PCollection<KV<String, KV<URI, Double>>>, PDone> {
- private static final long serialVersionUID = 0;
-
- private String output;
-
- public WriteTfIdf(String output) {
- this.output = output;
- }
-
- @Override
- public PDone expand(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
- return wordToUriAndTfIdf
- .apply("Format", ParDo.of(new DoFn<KV<String, KV<URI, Double>>, String>() {
- private static final long serialVersionUID = 0;
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(String.format("%s,\t%s,\t%f",
- c.element().getKey(),
- c.element().getValue().getKey(),
- c.element().getValue().getValue()));
- }
- }))
- .apply(TextIO.Write
- .to(output)
- .withSuffix(".csv"));
- }
- }
-
- public static void main(String[] args) throws Exception {
- Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-
- options.setRunner(FlinkRunner.class);
-
- Pipeline pipeline = Pipeline.create(options);
- pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
-
- pipeline
- .apply(new ReadDocuments(listInputDocuments(options)))
- .apply(new ComputeTfIdf())
- .apply(new WriteTfIdf(options.getOutput()));
-
- pipeline.run();
- }
-}
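
To make the scoring at the end of ComputeTfIdf concrete, the final step computes tf * Math.log(1 / df). With made-up counts (not from any real data set):

    // Hypothetical numbers for illustration only:
    double tf = 2.0 / 100.0;              // the word occurs twice in a 100-word document
    double df = 3.0 / 10.0;               // the word appears in 3 of 10 documents
    double tfIdf = tf * Math.log(1 / df); // 0.02 * ln(10/3) ~= 0.02 * 1.204 ~= 0.024
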
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
deleted file mode 100644
index 6ae4cf8..0000000
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.examples;
-
-import org.apache.beam.runners.flink.FlinkPipelineOptions;
-import org.apache.beam.runners.flink.FlinkRunner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.Validation;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * Wordcount pipeline.
- */
-public class WordCount {
-
- /**
- * Function to extract words.
- */
- public static class ExtractWordsFn extends DoFn<String, String> {
- private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", Sum.ofLongs());
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- if (c.element().trim().isEmpty()) {
- emptyLines.addValue(1L);
- }
-
- // Split the line into words.
- String[] words = c.element().split("[^a-zA-Z']+");
-
- // Output each word encountered into the output PCollection.
- for (String word : words) {
- if (!word.isEmpty()) {
- c.output(word);
- }
- }
- }
- }
-
- /**
- * PTransform counting words.
- */
- public static class CountWords extends PTransform<PCollection<String>,
- PCollection<KV<String, Long>>> {
- @Override
- public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
-
- // Convert lines of text into individual words.
- PCollection<String> words = lines.apply(
- ParDo.of(new ExtractWordsFn()));
-
- // Count the number of times each word occurs.
- PCollection<KV<String, Long>> wordCounts =
- words.apply(Count.<String>perElement());
-
- return wordCounts;
- }
- }
-
- /** A SimpleFunction that converts a Word and Count into a printable string. */
- public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
- @Override
- public String apply(KV<String, Long> input) {
- return input.getKey() + ": " + input.getValue();
- }
- }
-
- /**
- * Options supported by {@link WordCount}.
- *
- * <p>Inherits standard configuration options.
- */
- public interface Options extends PipelineOptions, FlinkPipelineOptions {
- @Description("Path of the file to read from")
- String getInput();
- void setInput(String value);
-
- @Description("Path of the file to write to")
- @Validation.Required
- String getOutput();
- void setOutput(String value);
- }
-
- public static void main(String[] args) {
-
- Options options = PipelineOptionsFactory.fromArgs(args).withValidation()
- .as(Options.class);
- options.setRunner(FlinkRunner.class);
-
- Pipeline p = Pipeline.create(options);
-
- p.apply("ReadLines", TextIO.Read.from(options.getInput()))
- .apply(new CountWords())
- .apply(MapElements.via(new FormatAsTextFn()))
- .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
-
- p.run();
- }
-
-}
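
For reference, FormatAsTextFn above renders each word/count pair as "word: count", e.g. (hypothetical input):

    new FormatAsTextFn().apply(KV.of("love", 12L))  // returns "love: 12"
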
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/package-info.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/package-info.java
deleted file mode 100644
index b0ecb56..0000000
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Flink Beam runner examples.
- */
-package org.apache.beam.runners.flink.examples;
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
deleted file mode 100644
index d07df29..0000000
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
+++ /dev/null
@@ -1,400 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.examples.streaming;
-
-import java.io.IOException;
-import java.util.List;
-import org.apache.beam.runners.flink.FlinkRunner;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.DefaultCoder;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Filter;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Partition;
-import org.apache.beam.sdk.transforms.Partition.PartitionFn;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.Top;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.joda.time.Duration;
-
-/**
- * To run the example, first open a socket on a terminal by executing the command:
- * <ul>
- * <li><code>nc -lk 9999</code>
- * </ul>
- * and then launch the example. Now whatever you type in the terminal is going to be
- * the input to the program.
- * */
-public class AutoComplete {
-
- /**
- * A PTransform that takes as input a list of tokens and returns
- * the most common tokens per prefix.
- */
- public static class ComputeTopCompletions
- extends PTransform<PCollection<String>, PCollection<KV<String, List<CompletionCandidate>>>> {
- private static final long serialVersionUID = 0;
-
- private final int candidatesPerPrefix;
- private final boolean recursive;
-
- protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive) {
- this.candidatesPerPrefix = candidatesPerPrefix;
- this.recursive = recursive;
- }
-
- public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive) {
- return new ComputeTopCompletions(candidatesPerPrefix, recursive);
- }
-
- @Override
- public PCollection<KV<String, List<CompletionCandidate>>> expand(PCollection<String> input) {
- PCollection<CompletionCandidate> candidates = input
- // First count how often each token appears.
- .apply(Count.<String>perElement())
-
- // Map the KV outputs of Count into our own CompletionCandidate class.
- .apply("CreateCompletionCandidates", ParDo.of(
- new DoFn<KV<String, Long>, CompletionCandidate>() {
- private static final long serialVersionUID = 0;
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- CompletionCandidate cand = new CompletionCandidate(c.element().getKey(),
- c.element().getValue());
- c.output(cand);
- }
- }));
-
- // Compute the top via either a flat or recursive algorithm.
- if (recursive) {
- return candidates
- .apply(new ComputeTopRecursive(candidatesPerPrefix, 1))
- .apply(Flatten.<KV<String, List<CompletionCandidate>>>pCollections());
- } else {
- return candidates
- .apply(new ComputeTopFlat(candidatesPerPrefix, 1));
- }
- }
- }
-
- /**
- * Lower latency, but more expensive.
- */
- private static class ComputeTopFlat
- extends PTransform<PCollection<CompletionCandidate>,
- PCollection<KV<String, List<CompletionCandidate>>>> {
- private static final long serialVersionUID = 0;
-
- private final int candidatesPerPrefix;
- private final int minPrefix;
-
- public ComputeTopFlat(int candidatesPerPrefix, int minPrefix) {
- this.candidatesPerPrefix = candidatesPerPrefix;
- this.minPrefix = minPrefix;
- }
-
- @Override
- public PCollection<KV<String, List<CompletionCandidate>>> expand(
- PCollection<CompletionCandidate> input) {
- return input
- // For each completion candidate, map it to all prefixes.
- .apply(ParDo.of(new AllPrefixes(minPrefix)))
-
- // Find and return the top candidates for each prefix.
- .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix)
- .withHotKeyFanout(new HotKeyFanout()));
- }
-
- private static class HotKeyFanout implements SerializableFunction<String, Integer> {
- private static final long serialVersionUID = 0;
-
- @Override
- public Integer apply(String input) {
- return (int) Math.pow(4, 5 - input.length());
- }
- }
- }
-
- /**
- * Cheaper but higher latency.
- *
- * <p>Returns two PCollections, the first is top prefixes of size greater
- * than minPrefix, and the second is top prefixes of size exactly
- * minPrefix.
- */
- private static class ComputeTopRecursive
- extends PTransform<PCollection<CompletionCandidate>,
- PCollectionList<KV<String, List<CompletionCandidate>>>> {
- private static final long serialVersionUID = 0;
-
- private final int candidatesPerPrefix;
- private final int minPrefix;
-
- public ComputeTopRecursive(int candidatesPerPrefix, int minPrefix) {
- this.candidatesPerPrefix = candidatesPerPrefix;
- this.minPrefix = minPrefix;
- }
-
- private class KeySizePartitionFn implements PartitionFn<KV<String, List<CompletionCandidate>>> {
- private static final long serialVersionUID = 0;
-
- @Override
- public int partitionFor(KV<String, List<CompletionCandidate>> elem, int numPartitions) {
- return elem.getKey().length() > minPrefix ? 0 : 1;
- }
- }
-
- private static class FlattenTops
- extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
- private static final long serialVersionUID = 0;
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- for (CompletionCandidate cc : c.element().getValue()) {
- c.output(cc);
- }
- }
- }
-
- @Override
- public PCollectionList<KV<String, List<CompletionCandidate>>> expand(
- PCollection<CompletionCandidate> input) {
- if (minPrefix > 10) {
- // Base case, partitioning to return the output in the expected format.
- return input
- .apply(new ComputeTopFlat(candidatesPerPrefix, minPrefix))
- .apply(Partition.of(2, new KeySizePartitionFn()));
- } else {
- // If a candidate is in the top N for prefix a...b, it must also be in the top
- // N for a...bX for every X, which is typically a much smaller set to consider.
- // First, compute the top candidate for prefixes of size at least minPrefix + 1.
- PCollectionList<KV<String, List<CompletionCandidate>>> larger = input
- .apply(new ComputeTopRecursive(candidatesPerPrefix, minPrefix + 1));
- // Consider the top candidates for each prefix of length minPrefix + 1...
- PCollection<KV<String, List<CompletionCandidate>>> small =
- PCollectionList
- .of(larger.get(1).apply(ParDo.of(new FlattenTops())))
- // ...together with those (previously excluded) candidates of length
- // exactly minPrefix...
- .and(input.apply(Filter.by(new SerializableFunction<CompletionCandidate, Boolean>() {
- private static final long serialVersionUID = 0;
-
- @Override
- public Boolean apply(CompletionCandidate c) {
- return c.getValue().length() == minPrefix;
- }
- })))
- .apply("FlattenSmall", Flatten.<CompletionCandidate>pCollections())
- // ...set the key to be the minPrefix-length prefix...
- .apply(ParDo.of(new AllPrefixes(minPrefix, minPrefix)))
- // ...and (re)apply the Top operator to all of them together.
- .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix));
-
- PCollection<KV<String, List<CompletionCandidate>>> flattenLarger = larger
- .apply("FlattenLarge", Flatten.<KV<String, List<CompletionCandidate>>>pCollections());
-
- return PCollectionList.of(flattenLarger).and(small);
- }
- }
- }
-
- /**
- * A DoFn that keys each candidate by all its prefixes.
- */
- private static class AllPrefixes
- extends DoFn<CompletionCandidate, KV<String, CompletionCandidate>> {
- private static final long serialVersionUID = 0;
-
- private final int minPrefix;
- private final int maxPrefix;
- public AllPrefixes(int minPrefix) {
- this(minPrefix, Integer.MAX_VALUE);
- }
- public AllPrefixes(int minPrefix, int maxPrefix) {
- this.minPrefix = minPrefix;
- this.maxPrefix = maxPrefix;
- }
- @ProcessElement
- public void processElement(ProcessContext c) {
- String word = c.element().value;
- for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
- KV<String, CompletionCandidate> kv = KV.of(word.substring(0, i), c.element());
- c.output(kv);
- }
- }
- }
-
- /**
- * Class used to store tag-count pairs.
- */
- @DefaultCoder(AvroCoder.class)
- static class CompletionCandidate implements Comparable<CompletionCandidate> {
- private long count;
- private String value;
-
- public CompletionCandidate(String value, long count) {
- this.value = value;
- this.count = count;
- }
-
- public String getValue() {
- return value;
- }
-
- // Empty constructor required for Avro decoding.
- @SuppressWarnings("unused")
- public CompletionCandidate() {}
-
- @Override
- public int compareTo(CompletionCandidate o) {
- if (this.count < o.count) {
- return -1;
- } else if (this.count == o.count) {
- return this.value.compareTo(o.value);
- } else {
- return 1;
- }
- }
-
- @Override
- public boolean equals(Object other) {
- if (other instanceof CompletionCandidate) {
- CompletionCandidate that = (CompletionCandidate) other;
- return this.count == that.count && this.value.equals(that.value);
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return Long.valueOf(count).hashCode() ^ value.hashCode();
- }
-
- @Override
- public String toString() {
- return "CompletionCandidate[" + value + ", " + count + "]";
- }
- }
-
- static class ExtractWordsFn extends DoFn<String, String> {
- private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", Sum.ofLongs());
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- if (c.element().trim().isEmpty()) {
- emptyLines.addValue(1L);
- }
-
- // Split the line into words.
- String[] words = c.element().split("[^a-zA-Z']+");
-
- // Output each word encountered into the output PCollection.
- for (String word : words) {
- if (!word.isEmpty()) {
- c.output(word);
- }
- }
- }
- }
-
- /**
- * Takes as input the top candidates per prefix, and formats them as strings suitable for
- * writing to a per-task local file.
- */
- static class FormatForPerTaskLocalFile
- extends DoFn<KV<String, List<CompletionCandidate>>, String> {
-
- private static final long serialVersionUID = 0;
-
- @ProcessElement
- public void processElement(ProcessContext c, BoundedWindow window) {
- StringBuilder str = new StringBuilder();
- KV<String, List<CompletionCandidate>> elem = c.element();
-
- str.append(elem.getKey() + " @ " + window + " -> ");
- for (CompletionCandidate cand: elem.getValue()) {
- str.append(cand.toString() + " ");
- }
- System.out.println(str.toString());
- c.output(str.toString());
- }
- }
-
- /**
- * Options supported by this class.
- *
- * <p>Inherits standard Dataflow configuration options.
- */
- private interface Options extends WindowedWordCount.StreamingWordCountOptions {
- @Description("Whether to use the recursive algorithm")
- @Default.Boolean(true)
- Boolean getRecursive();
- void setRecursive(Boolean value);
- }
-
- public static void main(String[] args) throws IOException {
- Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
- options.setStreaming(true);
- options.setCheckpointingInterval(1000L);
- options.setNumberOfExecutionRetries(5);
- options.setExecutionRetryDelay(3000L);
- options.setRunner(FlinkRunner.class);
-
-
- WindowFn<Object, ?> windowFn =
- FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));
-
- // Create the pipeline.
- Pipeline p = Pipeline.create(options);
- PCollection<KV<String, List<CompletionCandidate>>> toWrite = p
- .apply("WordStream", Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)))
- .apply(ParDo.of(new ExtractWordsFn()))
- .apply(Window.<String>into(windowFn)
- .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
- .discardingFiredPanes())
- .apply(ComputeTopCompletions.top(10, options.getRecursive()));
-
- toWrite
- .apply("FormatForPerTaskFile", ParDo.of(new FormatForPerTaskLocalFile()))
- .apply(TextIO.Write.to("./outputAutoComplete.txt"));
-
- p.run();
- }
-}
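
A quick sanity check of the HotKeyFanout function in ComputeTopFlat above, which spreads hot (short) prefixes across more workers. Evaluating (int) Math.pow(4, 5 - input.length()) by hand:

    // prefix length 1: (int) Math.pow(4, 4) = 256-way fanout
    // prefix length 3: (int) Math.pow(4, 2) = 16
    // prefix length 5: (int) Math.pow(4, 0) = 1
    // prefix length 6: Math.pow(4, -1) = 0.25, truncated to 0 by the int cast
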
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
deleted file mode 100644
index 8fefc9f..0000000
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.examples.streaming;
-
-import org.apache.beam.runners.flink.FlinkRunner;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGroupByKey;
-import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Duration;
-
-/**
- * To run the example, first open two sockets on two terminals by executing the commands:
- * <ul>
- * <li><code>nc -lk 9999</code>, and
- * <li><code>nc -lk 9998</code>
- * </ul>
- * and then launch the example. Now whatever you type in the terminal is going to be
- * the input to the program.
- * */
-public class JoinExamples {
-
- static PCollection<String> joinEvents(PCollection<String> streamA,
- PCollection<String> streamB) throws Exception {
-
- final TupleTag<String> firstInfoTag = new TupleTag<>();
- final TupleTag<String> secondInfoTag = new TupleTag<>();
-
- // transform both input collections to tuple collections, where the keys are country
- // codes in both cases.
- PCollection<KV<String, String>> firstInfo = streamA.apply(
- ParDo.of(new ExtractEventDataFn()));
- PCollection<KV<String, String>> secondInfo = streamB.apply(
- ParDo.of(new ExtractEventDataFn()));
-
- // key -> CoGbkResult(<lines from stream A>, <lines from stream B>)
- PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
- .of(firstInfoTag, firstInfo)
- .and(secondInfoTag, secondInfo)
- .apply(CoGroupByKey.<String>create());
-
- // Process the CoGbkResult elements generated by the CoGroupByKey transform:
- // key -> "Value A: <line from A> - Value B: <line from B>"
- PCollection<KV<String, String>> finalResultCollection =
- kvpCollection.apply("Process", ParDo.of(
- new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
- private static final long serialVersionUID = 0;
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- KV<String, CoGbkResult> e = c.element();
- String key = e.getKey();
-
- String defaultA = "NO_VALUE";
-
- // getOnly below expects AT MOST ONE value for this tag and key: it returns
- // the default when no value is present, and throws if there is more than one.
-
- String lineA = e.getValue().getOnly(firstInfoTag, defaultA);
- for (String lineB : c.element().getValue().getAll(secondInfoTag)) {
- // Generate a string that combines information from both collection values
- c.output(KV.of(key, "Value A: " + lineA + " - Value B: " + lineB));
- }
- }
- }));
-
- return finalResultCollection
- .apply("Format", ParDo.of(new DoFn<KV<String, String>, String>() {
- private static final long serialVersionUID = 0;
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- String result = c.element().getKey() + " -> " + c.element().getValue();
- System.out.println(result);
- c.output(result);
- }
- }));
- }
-
- static class ExtractEventDataFn extends DoFn<String, KV<String, String>> {
- private static final long serialVersionUID = 0;
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- String line = c.element().toLowerCase();
- String key = line.split("\\s")[0];
- c.output(KV.of(key, line));
- }
- }
-
- private interface Options extends WindowedWordCount.StreamingWordCountOptions {
-
- }
-
- public static void main(String[] args) throws Exception {
- Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
- options.setStreaming(true);
- options.setCheckpointingInterval(1000L);
- options.setNumberOfExecutionRetries(5);
- options.setExecutionRetryDelay(3000L);
- options.setRunner(FlinkRunner.class);
-
- WindowFn<Object, ?> windowFn = FixedWindows.of(
- Duration.standardSeconds(options.getWindowSize()));
-
- Pipeline p = Pipeline.create(options);
-
- // the following two applications create the two inputs to our pipeline, one
- // for each input source.
- PCollection<String> streamA = p
- .apply("FirstStream", Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)))
- .apply(Window.<String>into(windowFn)
- .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
- .discardingFiredPanes());
- PCollection<String> streamB = p
- .apply("SecondStream", Read.from(new UnboundedSocketSource<>("localhost", 9998, '\n', 3)))
- .apply(Window.<String>into(windowFn)
- .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
- .discardingFiredPanes());
-
- PCollection<String> formattedResults = joinEvents(streamA, streamB);
- formattedResults.apply(TextIO.Write.to("./outputJoin.txt"));
- p.run();
- }
-
-}
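
(Note: the join pattern this removed example demonstrated is plain Beam and does
not depend on the deleted examples package. Below is a minimal, self-contained
sketch of the same CoGroupByKey join; the class name MinimalJoin, the method
name, and the "NO_VALUE" default are illustrative, not part of the Beam API.)

  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.transforms.join.CoGbkResult;
  import org.apache.beam.sdk.transforms.join.CoGroupByKey;
  import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;
  import org.apache.beam.sdk.values.TupleTag;

  public class MinimalJoin {
    public static PCollection<KV<String, String>> join(
        PCollection<KV<String, String>> left,
        PCollection<KV<String, String>> right) {
      final TupleTag<String> leftTag = new TupleTag<>();
      final TupleTag<String> rightTag = new TupleTag<>();
      return KeyedPCollectionTuple.of(leftTag, left)
          .and(rightTag, right)
          .apply(CoGroupByKey.<String>create())
          .apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
              // getOnly(tag, default) returns the single value for leftTag under
              // this key, the default if there is none, and throws if there is
              // more than one -- so this sketch assumes 'left' has unique keys.
              String l = c.element().getValue().getOnly(leftTag, "NO_VALUE");
              for (String r : c.element().getValue().getAll(rightTag)) {
                c.output(KV.of(c.element().getKey(), l + " / " + r));
              }
            }
          }));
    }
  }
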
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
deleted file mode 100644
index 792c214..0000000
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.examples.streaming;
-
-import java.io.IOException;
-import org.apache.beam.runners.flink.FlinkRunner;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * To run the example, first open a socket on a terminal by executing the command:
- * <ul>
- * <li><code>nc -lk 9999</code>
- * </ul>
- * and then launch the example. Whatever you type in the terminal then becomes
- * input to the program.
- */
-public class WindowedWordCount {
-
- private static final Logger LOG = LoggerFactory.getLogger(WindowedWordCount.class);
-
- static final long WINDOW_SIZE = 10; // Default window duration in seconds
- static final long SLIDE_SIZE = 5; // Default window slide in seconds
-
- static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
- @ProcessElement
- public void processElement(ProcessContext c) {
- String row = c.element().getKey() + " - " + c.element().getValue() + " @ "
- + c.timestamp().toString();
- c.output(row);
- }
- }
-
- static class ExtractWordsFn extends DoFn<String, String> {
- private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", Sum.ofLongs());
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- if (c.element().trim().isEmpty()) {
- emptyLines.addValue(1L);
- }
-
- // Split the line into words.
- String[] words = c.element().split("[^a-zA-Z']+");
-
- // Output each word encountered into the output PCollection.
- for (String word : words) {
- if (!word.isEmpty()) {
- c.output(word);
- }
- }
- }
- }
-
- /**
- * Pipeline options.
- */
- public interface StreamingWordCountOptions
- extends org.apache.beam.runners.flink.examples.WordCount.Options {
- @Description("Sliding window duration, in seconds")
- @Default.Long(WINDOW_SIZE)
- Long getWindowSize();
-
- void setWindowSize(Long value);
-
- @Description("Window slide, in seconds")
- @Default.Long(SLIDE_SIZE)
- Long getSlide();
-
- void setSlide(Long value);
- }
-
- public static void main(String[] args) throws IOException {
- StreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
- .as(StreamingWordCountOptions.class);
- options.setStreaming(true);
- options.setWindowSize(10L);
- options.setSlide(5L);
- options.setCheckpointingInterval(1000L);
- options.setNumberOfExecutionRetries(5);
- options.setExecutionRetryDelay(3000L);
- options.setRunner(FlinkRunner.class);
-
- LOG.info("Windpwed WordCount with Sliding Windows of " + options.getWindowSize()
- + " sec. and a slide of " + options.getSlide());
-
- Pipeline pipeline = Pipeline.create(options);
-
- PCollection<String> words = pipeline
- .apply("StreamingWordCount",
- Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)))
- .apply(ParDo.of(new ExtractWordsFn()))
- .apply(Window.<String>into(SlidingWindows.of(
- Duration.standardSeconds(options.getWindowSize()))
- .every(Duration.standardSeconds(options.getSlide())))
- .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
- .discardingFiredPanes());
-
- PCollection<KV<String, Long>> wordCounts =
- words.apply(Count.<String>perElement());
-
- wordCounts.apply(ParDo.of(new FormatAsStringFn()))
- .apply(TextIO.Write.to("./outputWordCount.txt"));
-
- pipeline.run();
- }
-}
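
(Note: the windowing configuration is the reusable part of this removed example.
A minimal sketch of the same transform chain on plain Beam APIs follows; the
method name is illustrative, 'words' stands in for any unbounded
PCollection<String> of individual words, and the 10s/5s durations mirror the
example's defaults.)

  import org.apache.beam.sdk.transforms.Count;
  import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
  import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
  import org.apache.beam.sdk.transforms.windowing.Window;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;
  import org.joda.time.Duration;

  // Count words over 10-second windows that slide every 5 seconds, emitting a
  // single on-time pane per window once the watermark passes the window's end.
  static PCollection<KV<String, Long>> countWordsInSlidingWindows(
      PCollection<String> words) {
    return words
        .apply(Window.<String>into(
                SlidingWindows.of(Duration.standardSeconds(10))  // window length
                    .every(Duration.standardSeconds(5)))         // slide interval
            .triggering(AfterWatermark.pastEndOfWindow())
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes())
        .apply(Count.<String>perElement());
  }
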
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java
deleted file mode 100644
index 58f41b6..0000000
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Streaming examples for the Flink Beam runner.
- */
-package org.apache.beam.runners.flink.examples.streaming;
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index a5c5ea0..351035e 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -26,22 +26,97 @@
<relativePath>../pom.xml</relativePath>
</parent>
- <artifactId>beam-runners-flink-parent</artifactId>
+ <artifactId>beam-runners-flink</artifactId>
<name>Apache Beam :: Runners :: Flink</name>
-
- <packaging>pom</packaging>
-
- <modules>
- <module>runner</module>
- <module>examples</module>
- </modules>
+ <packaging>jar</packaging>
<properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<flink.version>1.2.0</flink.version>
</properties>
+ <profiles>
+ <profile>
+ <id>local-validates-runner-tests</id>
+ <activation><activeByDefault>false</activeByDefault></activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <executions>
+
+ <!-- This configures the inherited validates-runner-tests
+ execution to run against a local Flink instance. -->
+ <execution>
+ <id>validates-runner-tests</id>
+ <phase>integration-test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <groups>org.apache.beam.sdk.testing.ValidatesRunner</groups>
+ <excludedGroups>
+ org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
+ org.apache.beam.sdk.testing.UsesSplittableParDo,
+ org.apache.beam.sdk.testing.UsesAttemptedMetrics,
+ org.apache.beam.sdk.testing.UsesCommittedMetrics,
+ org.apache.beam.sdk.testing.UsesTestStream
+ </excludedGroups>
+ <parallel>none</parallel>
+ <failIfNoTests>true</failIfNoTests>
+ <dependenciesToScan>
+ <dependency>org.apache.beam:beam-sdks-java-core</dependency>
+ </dependenciesToScan>
+ <systemPropertyVariables>
+ <beamTestPipelineOptions>
+ [
+ "--runner=TestFlinkRunner",
+ "--streaming=false"
+ ]
+ </beamTestPipelineOptions>
+ </systemPropertyVariables>
+ </configuration>
+ </execution>
+
+ <!-- This second execution runs the tests in streaming mode -->
+ <execution>
+ <id>streaming-validates-runner-tests</id>
+ <phase>integration-test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <groups>org.apache.beam.sdk.testing.ValidatesRunner</groups>
+ <excludedGroups>
+ org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
+ org.apache.beam.sdk.testing.UsesSetState,
+ org.apache.beam.sdk.testing.UsesMapState,
+ org.apache.beam.sdk.testing.UsesAttemptedMetrics,
+ org.apache.beam.sdk.testing.UsesCommittedMetrics,
+ org.apache.beam.sdk.testing.UsesTestStream
+ </excludedGroups>
+ <parallel>none</parallel>
+ <failIfNoTests>true</failIfNoTests>
+ <dependenciesToScan>
+ <dependency>org.apache.beam:beam-sdks-java-core</dependency>
+ </dependenciesToScan>
+ <systemPropertyVariables>
+ <beamTestPipelineOptions>
+ [
+ "--runner=TestFlinkRunner",
+ "--streaming=true"
+ ]
+ </beamTestPipelineOptions>
+ </systemPropertyVariables>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
<build>
<pluginManagement>
<plugins>
@@ -89,19 +164,103 @@
<!-- Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_2.10</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_2.10</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>${flink.version}</version>
</dependency>
+ <!-- For testing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_2.10</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
</dependency>
+ <!-- Beam -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-core-java</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-core-construction-java</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
</dependency>
<dependency>
@@ -113,5 +272,101 @@
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
+
+ <!--
+ Force an upgrade of the commons-compress version pulled in by Flink, to support DEFLATE compression.
+ -->
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+
+ <!-- Test scoped -->
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Depend on test jar to scan for ValidatesRunner tests -->
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.apis</groupId>
+ <artifactId>google-api-services-bigquery</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.10</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_2.10</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>apacheds-jdbm1</artifactId>
+ <groupId>org.apache.directory.jdbm</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- Optional Pipeline Registration -->
+ <dependency>
+ <groupId>com.google.auto.service</groupId>
+ <artifactId>auto-service</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <!-- transitive test dependencies from beam-sdks-java-core -->
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-yaml</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-common-fn-api</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
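
(Note: with the examples module removed and the runner code moved up into
runners/flink, the ValidatesRunner suites configured above are activated
through the local-validates-runner-tests profile. Since both executions are
bound to the integration-test phase, an invocation along these lines should
run the batch and streaming suites; exact goals may vary with the surrounding
build setup:

  mvn -pl runners/flink -P local-validates-runner-tests integration-test
)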