You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/10/18 15:05:05 UTC

[1/2] incubator-beam git commit: [BEAM-593] avoid throwing Exception in waitUntilFinish

Repository: incubator-beam
Updated Branches:
  refs/heads/master a2c342cfd -> 6b5ff4c4a


[BEAM-593] avoid throwing Exception in waitUntilFinish

The current implementation of Flink's PipelineResult assumes that the
pipeline has already been processed. Hence, we can return State.Done
when wailUntilFinished is called.

Additionally, we introduce a PipelineResult for detached execution which
returns State.UNKNOWN for now.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/76434dff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/76434dff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/76434dff

Branch: refs/heads/master
Commit: 76434dff650196c74afdeac917d8ceddb2550079
Parents: a2c342c
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Oct 13 14:01:06 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Oct 18 16:59:49 2016 +0200

----------------------------------------------------------------------
 .../flink/FlinkDetachedRunnerResult.java        | 76 ++++++++++++++++++++
 .../apache/beam/runners/flink/FlinkRunner.java  |  9 ++-
 .../beam/runners/flink/FlinkRunnerResult.java   | 11 +--
 .../beam/runners/flink/TestFlinkRunner.java     |  9 ++-
 4 files changed, 91 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76434dff/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
new file mode 100644
index 0000000..6adcf07
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.io.IOException;
+
+import org.apache.beam.sdk.AggregatorRetrievalException;
+import org.apache.beam.sdk.AggregatorValues;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.commons.lang.NotImplementedException;
+import org.joda.time.Duration;
+
+
+/**
+ * Result of a detached execution of a {@link org.apache.beam.sdk.Pipeline} with Flink.
+ * In detached execution, results and job execution are currently unavailable.
+ */
+public class FlinkDetachedRunnerResult implements PipelineResult {
+
+  FlinkDetachedRunnerResult() {}
+
+  @Override
+  public State getState() {
+    return State.UNKNOWN;
+  }
+
+  @Override
+  public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator)
+      throws AggregatorRetrievalException {
+    throw new AggregatorRetrievalException(
+        "Accumulators can't be retrieved for detached Job executions.",
+        new NotImplementedException());
+  }
+
+  @Override
+  public MetricResults metrics() {
+    throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics.");
+  }
+
+  @Override
+  public State cancel() throws IOException {
+    throw new UnsupportedOperationException("Cancelling is not yet supported.");
+  }
+
+  @Override
+  public State waitUntilFinish() {
+    return State.UNKNOWN;
+  }
+
+  @Override
+  public State waitUntilFinish(Duration duration) {
+    return State.UNKNOWN;
+  }
+
+  @Override
+  public String toString() {
+    return "FlinkDetachedRunnerResult{}";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76434dff/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 932952d..12e21c7 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -25,7 +25,6 @@ import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -34,6 +33,7 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -66,7 +66,7 @@ import org.slf4j.LoggerFactory;
  * pipeline by first translating them to a Flink Plan and then executing them either locally
  * or on a Flink cluster, depending on the configuration.
  */
-public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
+public class FlinkRunner extends PipelineRunner<PipelineResult> {
 
   private static final Logger LOG = LoggerFactory.getLogger(FlinkRunner.class);
 
@@ -133,7 +133,7 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
   }
 
   @Override
-  public FlinkRunnerResult run(Pipeline pipeline) {
+  public PipelineResult run(Pipeline pipeline) {
     logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
 
     LOG.info("Executing pipeline using FlinkRunner.");
@@ -154,8 +154,7 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
 
     if (result instanceof DetachedEnvironment.DetachedJobExecutionResult) {
       LOG.info("Pipeline submitted in Detached mode");
-      Map<String, Object> accumulators = Collections.emptyMap();
-      return new FlinkRunnerResult(accumulators, -1L);
+      return new FlinkDetachedRunnerResult();
     } else {
       LOG.info("Execution finished in {} msecs", result.getNetRuntime());
       Map<String, Object> accumulators = result.getAllAccumulatorResults();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76434dff/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
index 6b15485..0682b56 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
@@ -33,9 +33,12 @@ import org.joda.time.Duration;
  * {@link org.apache.beam.sdk.transforms.Aggregator}s.
  */
 public class FlinkRunnerResult implements PipelineResult {
+
   private final Map<String, Object> aggregators;
+
   private final long runtime;
-  public FlinkRunnerResult(Map<String, Object> aggregators, long runtime) {
+
+  FlinkRunnerResult(Map<String, Object> aggregators, long runtime) {
     this.aggregators = (aggregators == null || aggregators.isEmpty())
         ? Collections.<String, Object>emptyMap()
         : Collections.unmodifiableMap(aggregators);
@@ -44,7 +47,7 @@ public class FlinkRunnerResult implements PipelineResult {
 
   @Override
   public State getState() {
-    return null;
+    return State.DONE;
   }
 
   @Override
@@ -80,12 +83,12 @@ public class FlinkRunnerResult implements PipelineResult {
 
   @Override
   public State waitUntilFinish() {
-    return waitUntilFinish(Duration.millis(-1));
+    return State.DONE;
   }
 
   @Override
   public State waitUntilFinish(Duration duration) {
-    throw new UnsupportedOperationException("FlinkRunnerResult does not support waitUntilFinish.");
+    return State.DONE;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76434dff/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
index 67a7d38..7ba98ab 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.flink;
 
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
@@ -29,7 +30,7 @@ import org.apache.beam.sdk.values.POutput;
 /**
  * Test Flink runner.
  */
-public class TestFlinkRunner extends PipelineRunner<FlinkRunnerResult> {
+public class TestFlinkRunner extends PipelineRunner<PipelineResult> {
 
   private FlinkRunner delegate;
 
@@ -59,11 +60,9 @@ public class TestFlinkRunner extends PipelineRunner<FlinkRunnerResult> {
   }
 
   @Override
-  public FlinkRunnerResult run(Pipeline pipeline) {
+  public PipelineResult run(Pipeline pipeline) {
     try {
-      FlinkRunnerResult result = delegate.run(pipeline);
-
-      return result;
+      return delegate.run(pipeline);
     } catch (Throwable e) {
       // Special case hack to pull out assertion errors from PAssert; instead there should
       // probably be a better story along the lines of UserCodeException.


[2/2] incubator-beam git commit: This closes #1093

Posted by mx...@apache.org.
This closes #1093


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6b5ff4c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6b5ff4c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6b5ff4c4

Branch: refs/heads/master
Commit: 6b5ff4c4aad5e4d1419b1a147153b0f8d72324ae
Parents: a2c342c 76434df
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Oct 18 16:59:58 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Oct 18 16:59:58 2016 +0200

----------------------------------------------------------------------
 .../flink/FlinkDetachedRunnerResult.java        | 76 ++++++++++++++++++++
 .../apache/beam/runners/flink/FlinkRunner.java  |  9 ++-
 .../beam/runners/flink/FlinkRunnerResult.java   | 11 +--
 .../beam/runners/flink/TestFlinkRunner.java     |  9 ++-
 4 files changed, 91 insertions(+), 14 deletions(-)
----------------------------------------------------------------------