You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/09 18:49:24 UTC

[1/3] beam git commit: Shade Dependencies of the DirectRunner

Repository: beam
Updated Branches:
  refs/heads/master 28180c45b -> 72b361e9a


Shade Dependencies of the DirectRunner

Add an API Surface Test


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b0a32614
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b0a32614
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b0a32614

Branch: refs/heads/master
Commit: b0a326148e537f4c8a5de59f6a5ffdc1ccabea05
Parents: f03c04a
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 9 09:47:47 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue May 9 11:49:14 2017 -0700

----------------------------------------------------------------------
 runners/direct-java/pom.xml                     | 79 ++++++++++++++++++--
 .../GroupAlsoByWindowEvaluatorFactory.java      |  6 +-
 .../direct/DirectRunnerApiSurfaceTest.java      | 55 ++++++++++++++
 3 files changed, 133 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b0a32614/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index ba3cd3e..19ee81a 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -77,6 +77,76 @@
         </executions>
       </plugin>
 
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>bundle-and-repackage</id>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <artifactSet>
+                <includes>
+                  <include>com.google.guava:guava</include>
+                  <include>com.google.protobuf:protobuf-java</include>
+                  <include>org.apache.beam:beam-runners-core-construction-java</include>
+                  <include>org.apache.beam:beam-runners-core-java</include>
+                  <include>org.apache.beam:beam-sdks-common-runner-api</include>
+                </includes>
+              </artifactSet>
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>META-INF/*.SF</exclude>
+                    <exclude>META-INF/*.DSA</exclude>
+                    <exclude>META-INF/*.RSA</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+              <relocations>
+                <relocation>
+                  <pattern>org.apache.beam.runners.core</pattern>
+                  <shadedPattern>
+                    org.apache.beam.runners.direct.repackaged.runners.core
+                  </shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.beam.sdk.common</pattern>
+                  <shadedPattern>
+                    org.apache.beam.runners.direct.repackaged.sdk.common
+                  </shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.google.common</pattern>
+                  <shadedPattern>
+                    org.apache.beam.runners.direct.repackaged.com.google.common
+                  </shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.google.protobuf</pattern>
+                  <shadedPattern>
+                    org.apache.beam.runners.direct.repackaged.com.google.protobuf
+                  </shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.google.thirdparty</pattern>
+                  <shadedPattern>
+                    org.apache.beam.runners.direct.repackaged.com.google.thirdparty
+                  </shadedPattern>
+                </relocation>
+              </relocations>
+              <transformers>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+              </transformers>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
       <!-- Coverage analysis for unit tests. -->
       <plugin>
         <groupId>org.jacoco</groupId>
@@ -93,18 +163,17 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
-      <artifactId>beam-runners-core-construction-java</artifactId>
+      <artifactId>beam-sdks-common-runner-api</artifactId>
     </dependency>
 
     <dependency>
       <groupId>org.apache.beam</groupId>
-      <artifactId>beam-runners-core-java</artifactId>
+      <artifactId>beam-runners-core-construction-java</artifactId>
     </dependency>
 
     <dependency>
-      <groupId>com.google.http-client</groupId>
-      <artifactId>google-http-client-protobuf</artifactId>
-      <scope>runtime</scope>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-core-java</artifactId>
     </dependency>
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/b0a32614/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 41c797f..1d079d9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -40,6 +40,7 @@ import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -161,13 +162,14 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
       CopyOnAccessInMemoryStateInternals stateInternals =
           (CopyOnAccessInMemoryStateInternals) stepContext.stateInternals();
       DirectTimerInternals timerInternals = stepContext.timerInternals();
+      RunnerApi.Trigger runnerApiTrigger =
+          Triggers.toProto(windowingStrategy.getTrigger());
       ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner =
           new ReduceFnRunner<>(
               key,
               windowingStrategy,
               ExecutableTriggerStateMachine.create(
-                  TriggerStateMachines.stateMachineForTrigger(
-                      Triggers.toProto(windowingStrategy.getTrigger()))),
+                  TriggerStateMachines.stateMachineForTrigger(runnerApiTrigger)),
               stateInternals,
               timerInternals,
               new OutputWindowedValueToBundle<>(bundle),

http://git-wip-us.apache.org/repos/asf/beam/blob/b0a32614/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerApiSurfaceTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerApiSurfaceTest.java
new file mode 100644
index 0000000..9928cb0
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerApiSurfaceTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import static org.apache.beam.sdk.util.ApiSurface.containsOnlyPackages;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.util.ApiSurface;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** API surface verification for {@link org.apache.beam.runners.direct}. */
+@RunWith(JUnit4.class)
+public class DirectRunnerApiSurfaceTest {
+  @Test
+  public void testDirectRunnerApiSurface() throws Exception {
+    // The DirectRunner can expose the Core SDK, anything exposed by the Core SDK, and itself
+    @SuppressWarnings("unchecked")
+    final Set<String> allowed =
+        ImmutableSet.of("org.apache.beam.sdk", "org.apache.beam.runners.direct", "org.joda.time");
+
+    final Package thisPackage = getClass().getPackage();
+    final ClassLoader thisClassLoader = getClass().getClassLoader();
+    ApiSurface apiSurface =
+        ApiSurface.ofPackage(thisPackage, thisClassLoader)
+            .pruningPattern("org[.]apache[.]beam[.].*Test.*")
+            .pruningPattern("org[.]apache[.]beam[.].*IT")
+            .pruningClass(Nullable.class)
+            .pruningPattern("java[.]io.*")
+            .pruningPattern("java[.]lang.*")
+            .pruningPattern("java[.]util.*");
+
+    assertThat(apiSurface, containsOnlyPackages(allowed));
+  }
+}


[2/3] beam git commit: Handle Errors in the DirectRunner

Posted by dh...@apache.org.
Handle Errors in the DirectRunner

When a worker dies because of an error, propagate that error and fail
the Pipeline.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f03c04a7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f03c04a7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f03c04a7

Branch: refs/heads/master
Commit: f03c04a787343c3710355c84a105582cdc815469
Parents: 28180c4
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 9 09:46:38 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue May 9 11:49:14 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/CompletionCallback.java |  6 +++++
 .../direct/ExecutorServiceParallelExecutor.java | 27 +++++++++++++++-----
 .../beam/runners/direct/TransformExecutor.java  | 12 +++++++--
 .../apache/beam/runners/direct/MockClock.java   |  2 +-
 .../runners/direct/TransformExecutorTest.java   |  5 ++++
 5 files changed, 43 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f03c04a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
index 0af22c8..417fa09 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
@@ -40,4 +40,10 @@ interface CompletionCallback {
    * Handle a result that terminated abnormally due to the provided {@link Exception}.
    */
   void handleException(CommittedBundle<?> inputBundle, Exception t);
+
+  /**
+   * Handle a result that terminated abnormally due to the provided {@link Error}. The pipeline
+   * should be shut down, and the Error propagated.
+  */
+  void handleError(Error err);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f03c04a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index b7f4732..71ab4cc 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -285,8 +285,15 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
         // there are no updates to process and no updates will ever be published because the
         // executor is shutdown
         return pipelineState.get();
-      } else if (update != null && update.exception.isPresent()) {
-        throw update.exception.get();
+      } else if (update != null && update.thrown.isPresent()) {
+        Throwable thrown = update.thrown.get();
+        if (thrown instanceof Exception) {
+          throw (Exception) thrown;
+        } else if (thrown instanceof Error) {
+          throw (Error) thrown;
+        } else {
+          throw new Exception("Unknown Type of Throwable", thrown);
+        }
       }
     }
     return pipelineState.get();
@@ -380,6 +387,11 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
       allUpdates.offer(ExecutorUpdate.fromException(e));
       outstandingWork.decrementAndGet();
     }
+
+    @Override
+    public void handleError(Error err) {
+      visibleUpdates.add(VisibleExecutorUpdate.fromError(err));
+    }
   }
 
   /**
@@ -424,7 +436,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
    * return normally or throw an exception.
    */
   private static class VisibleExecutorUpdate {
-    private final Optional<? extends Exception> exception;
+    private final Optional<? extends Throwable> thrown;
     @Nullable
     private final State newState;
 
@@ -432,6 +444,10 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
       return new VisibleExecutorUpdate(null, e);
     }
 
+    public static VisibleExecutorUpdate fromError(Error err) {
+      return new VisibleExecutorUpdate(State.FAILED, err);
+    }
+
     public static VisibleExecutorUpdate finished() {
       return new VisibleExecutorUpdate(State.DONE, null);
     }
@@ -440,15 +456,14 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
       return new VisibleExecutorUpdate(State.CANCELLED, null);
     }
 
-    private VisibleExecutorUpdate(State newState, @Nullable Exception exception) {
-      this.exception = Optional.fromNullable(exception);
+    private VisibleExecutorUpdate(State newState, @Nullable Throwable exception) {
+      this.thrown = Optional.fromNullable(exception);
       this.newState = newState;
     }
 
     public State getNewState() {
       return newState;
     }
-
   }
 
   private class MonitorRunnable implements Runnable {

http://git-wip-us.apache.org/repos/asf/beam/blob/f03c04a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
index 8e1515b..56f8650 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -26,6 +26,8 @@ import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A {@link Callable} responsible for constructing a {@link TransformEvaluator} from a
@@ -36,6 +38,8 @@ import org.apache.beam.sdk.util.WindowedValue;
  * that it is being executed on.
  */
 class TransformExecutor<T> implements Runnable {
+  private static final Logger LOG = LoggerFactory.getLogger(TransformExecutor.class);
+
   public static <T> TransformExecutor<T> create(
       EvaluationContext context,
       TransformEvaluatorFactory factory,
@@ -112,6 +116,10 @@ class TransformExecutor<T> implements Runnable {
         throw (RuntimeException) e;
       }
       throw new RuntimeException(e);
+    } catch (Error err) {
+      LOG.error("Error occurred within {}", this, err);
+      onComplete.handleError(err);
+      throw err;
     } finally {
       // Report the physical metrics from the end of this step.
       context.getMetrics().commitPhysical(inputBundle, metricsContainer.getCumulative());
@@ -162,8 +170,8 @@ class TransformExecutor<T> implements Runnable {
       TransformEvaluator<T> evaluator, MetricsContainer metricsContainer,
       Collection<ModelEnforcement<T>> enforcements)
       throws Exception {
-    TransformResult<T> result = evaluator.finishBundle()
-        .withLogicalMetricUpdates(metricsContainer.getCumulative());
+    TransformResult<T> result =
+        evaluator.finishBundle().withLogicalMetricUpdates(metricsContainer.getCumulative());
     CommittedResult outputs = onComplete.handleResult(inputBundle, result);
     for (ModelEnforcement<T> enforcement : enforcements) {
       enforcement.afterFinish(inputBundle, result, outputs.getOutputs());

http://git-wip-us.apache.org/repos/asf/beam/blob/f03c04a7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java
index 11ecbff..9275e3c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java
@@ -28,7 +28,7 @@ import org.joda.time.Instant;
  *
  * <p>For uses of the {@link Clock} interface in unit tests.
  */
-public class MockClock implements Clock {
+class MockClock implements Clock {
 
   private Instant now;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f03c04a7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index dc0ef7c..86412a0 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -434,6 +434,11 @@ public class TransformExecutorTest {
       handledException = e;
       onMethod.countDown();
     }
+
+    @Override
+    public void handleError(Error err) {
+      throw err;
+    }
   }
 
   private static class TestEnforcementFactory implements ModelEnforcementFactory {


[3/3] beam git commit: This closes #2714

Posted by dh...@apache.org.
This closes #2714


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/72b361e9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/72b361e9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/72b361e9

Branch: refs/heads/master
Commit: 72b361e9af1a21bc965408da392a2490e5985f0c
Parents: 28180c4 b0a3261
Author: Dan Halperin <dh...@google.com>
Authored: Tue May 9 11:49:17 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue May 9 11:49:17 2017 -0700

----------------------------------------------------------------------
 runners/direct-java/pom.xml                     | 79 ++++++++++++++++++--
 .../beam/runners/direct/CompletionCallback.java |  6 ++
 .../direct/ExecutorServiceParallelExecutor.java | 27 +++++--
 .../GroupAlsoByWindowEvaluatorFactory.java      |  6 +-
 .../beam/runners/direct/TransformExecutor.java  | 12 ++-
 .../direct/DirectRunnerApiSurfaceTest.java      | 55 ++++++++++++++
 .../apache/beam/runners/direct/MockClock.java   |  2 +-
 .../runners/direct/TransformExecutorTest.java   |  5 ++
 8 files changed, 176 insertions(+), 16 deletions(-)
----------------------------------------------------------------------