You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/05/12 08:59:30 UTC
[1/2] incubator-beam git commit: [BEAM-272][flink] remove dependency on Dataflow Runner
Repository: incubator-beam
Updated Branches:
refs/heads/master 6ec9e9680 -> 123674f4b
[BEAM-272][flink] remove dependency on Dataflow Runner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/50edd231
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/50edd231
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/50edd231
Branch: refs/heads/master
Commit: 50edd2314e7c7b97b75d2e6759c5857f4f67a662
Parents: acb0406
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed May 11 11:57:44 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Thu May 12 10:52:40 2016 +0200
----------------------------------------------------------------------
runners/flink/runner/pom.xml | 10 -------
.../runners/flink/FlinkPipelineOptions.java | 30 ++++++++++++++++++--
.../beam/runners/flink/FlinkPipelineRunner.java | 4 +--
3 files changed, 28 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/50edd231/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index a1d5370..8958bdd 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -110,16 +110,6 @@
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>google-cloud-dataflow-java-runner</artifactId>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-jdk14</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
<!-- Test scoped -->
<dependency>
<groupId>org.apache.beam</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/50edd231/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index fd86bc9..c40473e 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -18,14 +18,18 @@
package org.apache.beam.runners.flink;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.joda.time.DateTimeUtils;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
import java.util.List;
@@ -50,9 +54,9 @@ public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOp
/**
* The job name is used to identify jobs running on a Flink cluster.
*/
- @Description("Dataflow job name, to uniquely identify active jobs. "
+ @Description("Flink job name, to uniquely identify active jobs. "
+ "Defaults to using the ApplicationName-UserName-Date.")
- @Default.InstanceFactory(DataflowPipelineOptions.JobNameFactory.class)
+ @Default.InstanceFactory(JobNameFactory.class)
String getJobName();
void setJobName(String value);
@@ -91,4 +95,24 @@ public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOp
@Default.Long(-1L)
Long getExecutionRetryDelay();
void setExecutionRetryDelay(Long delay);
+
+
+ class JobNameFactory implements DefaultValueFactory<String> {
+ private static final DateTimeFormatter FORMATTER =
+ DateTimeFormat.forPattern("MMddHHmmss").withZone(DateTimeZone.UTC);
+
+ @Override
+ public String create(PipelineOptions options) {
+ String appName = options.as(ApplicationNameOptions.class).getAppName();
+ String normalizedAppName = appName == null || appName.length() == 0 ? "FlinkRunner"
+ : appName.toLowerCase()
+ .replaceAll("[^a-z0-9]", "0")
+ .replaceAll("^[^a-z]", "a");
+ String userName = System.getProperty("user.name", "");
+ String normalizedUserName = userName.toLowerCase()
+ .replaceAll("[^a-z0-9]", "0");
+ String datePart = FORMATTER.print(DateTimeUtils.currentTimeMillis());
+ return normalizedAppName + "-" + normalizedUserName + "-" + datePart;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/50edd231/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
index a389d7a..3edf6f3 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.runners.flink;
-import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -46,7 +45,6 @@ import java.util.Map;
* pipeline by first translating them to a Flink Plan and then executing them either locally
* or on a Flink cluster, depending on the configuration.
* <p>
- * This is based on {@link org.apache.beam.runners.dataflow.DataflowPipelineRunner}.
*/
public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
@@ -80,7 +78,7 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
if (flinkOptions.getFilesToStage() == null) {
flinkOptions.setFilesToStage(detectClassPathResourcesToStage(
- DataflowPipelineRunner.class.getClassLoader()));
+ FlinkPipelineRunner.class.getClassLoader()));
LOG.info("PipelineOptions.filesToStage was not specified. "
+ "Defaulting to files from the classpath: will stage {} files. "
+ "Enable logging at DEBUG level to see which files will be staged.",
[2/2] incubator-beam git commit: This closes #324
Posted by mx...@apache.org.
This closes #324
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/123674f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/123674f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/123674f4
Branch: refs/heads/master
Commit: 123674f4b5a823cc593514a131943fbcc462ab7a
Parents: 6ec9e96 50edd23
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu May 12 10:57:33 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Thu May 12 10:57:33 2016 +0200
----------------------------------------------------------------------
runners/flink/runner/pom.xml | 10 -------
.../runners/flink/FlinkPipelineOptions.java | 30 ++++++++++++++++++--
.../beam/runners/flink/FlinkPipelineRunner.java | 4 +--
3 files changed, 28 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/123674f4/runners/flink/runner/pom.xml
----------------------------------------------------------------------