You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/09 18:49:24 UTC
[1/3] beam git commit: Shade Dependencies of the DirectRunner
Repository: beam
Updated Branches:
refs/heads/master 28180c45b -> 72b361e9a
Shade Dependencies of the DirectRunner
Add an API Surface Test
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b0a32614
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b0a32614
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b0a32614
Branch: refs/heads/master
Commit: b0a326148e537f4c8a5de59f6a5ffdc1ccabea05
Parents: f03c04a
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 9 09:47:47 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue May 9 11:49:14 2017 -0700
----------------------------------------------------------------------
runners/direct-java/pom.xml | 79 ++++++++++++++++++--
.../GroupAlsoByWindowEvaluatorFactory.java | 6 +-
.../direct/DirectRunnerApiSurfaceTest.java | 55 ++++++++++++++
3 files changed, 133 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b0a32614/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index ba3cd3e..19ee81a 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -77,6 +77,76 @@
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>bundle-and-repackage</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <includes>
+ <include>com.google.guava:guava</include>
+ <include>com.google.protobuf:protobuf-java</include>
+ <include>org.apache.beam:beam-runners-core-construction-java</include>
+ <include>org.apache.beam:beam-runners-core-java</include>
+ <include>org.apache.beam:beam-sdks-common-runner-api</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <relocations>
+ <relocation>
+ <pattern>org.apache.beam.runners.core</pattern>
+ <shadedPattern>
+ org.apache.beam.runners.direct.repackaged.runners.core
+ </shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.beam.sdk.common</pattern>
+ <shadedPattern>
+ org.apache.beam.runners.direct.repackaged.sdk.common
+ </shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.google.common</pattern>
+ <shadedPattern>
+ org.apache.beam.runners.direct.repackaged.com.google.common
+ </shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.google.protobuf</pattern>
+ <shadedPattern>
+ org.apache.beam.runners.direct.repackaged.com.google.protobuf
+ </shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.google.thirdparty</pattern>
+ <shadedPattern>
+ org.apache.beam.runners.direct.repackaged.com.google.thirdparty
+ </shadedPattern>
+ </relocation>
+ </relocations>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
<!-- Coverage analysis for unit tests. -->
<plugin>
<groupId>org.jacoco</groupId>
@@ -93,18 +163,17 @@
<dependency>
<groupId>org.apache.beam</groupId>
- <artifactId>beam-runners-core-construction-java</artifactId>
+ <artifactId>beam-sdks-common-runner-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
- <artifactId>beam-runners-core-java</artifactId>
+ <artifactId>beam-runners-core-construction-java</artifactId>
</dependency>
<dependency>
- <groupId>com.google.http-client</groupId>
- <artifactId>google-http-client-protobuf</artifactId>
- <scope>runtime</scope>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-core-java</artifactId>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/b0a32614/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 41c797f..1d079d9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -40,6 +40,7 @@ import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -161,13 +162,14 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
CopyOnAccessInMemoryStateInternals stateInternals =
(CopyOnAccessInMemoryStateInternals) stepContext.stateInternals();
DirectTimerInternals timerInternals = stepContext.timerInternals();
+ RunnerApi.Trigger runnerApiTrigger =
+ Triggers.toProto(windowingStrategy.getTrigger());
ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner =
new ReduceFnRunner<>(
key,
windowingStrategy,
ExecutableTriggerStateMachine.create(
- TriggerStateMachines.stateMachineForTrigger(
- Triggers.toProto(windowingStrategy.getTrigger()))),
+ TriggerStateMachines.stateMachineForTrigger(runnerApiTrigger)),
stateInternals,
timerInternals,
new OutputWindowedValueToBundle<>(bundle),
http://git-wip-us.apache.org/repos/asf/beam/blob/b0a32614/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerApiSurfaceTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerApiSurfaceTest.java
new file mode 100644
index 0000000..9928cb0
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerApiSurfaceTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import static org.apache.beam.sdk.util.ApiSurface.containsOnlyPackages;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.util.ApiSurface;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** API surface verification for {@link org.apache.beam.runners.direct}. */
+@RunWith(JUnit4.class)
+public class DirectRunnerApiSurfaceTest {
+ @Test
+ public void testDirectRunnerApiSurface() throws Exception {
+ // The DirectRunner can expose the Core SDK, anything exposed by the Core SDK, and itself
+ @SuppressWarnings("unchecked")
+ final Set<String> allowed =
+ ImmutableSet.of("org.apache.beam.sdk", "org.apache.beam.runners.direct", "org.joda.time");
+
+ final Package thisPackage = getClass().getPackage();
+ final ClassLoader thisClassLoader = getClass().getClassLoader();
+ ApiSurface apiSurface =
+ ApiSurface.ofPackage(thisPackage, thisClassLoader)
+ .pruningPattern("org[.]apache[.]beam[.].*Test.*")
+ .pruningPattern("org[.]apache[.]beam[.].*IT")
+ .pruningClass(Nullable.class)
+ .pruningPattern("java[.]io.*")
+ .pruningPattern("java[.]lang.*")
+ .pruningPattern("java[.]util.*");
+
+ assertThat(apiSurface, containsOnlyPackages(allowed));
+ }
+}
[2/3] beam git commit: Handle Errors in the DirectRunner
Posted by dh...@apache.org.
Handle Errors in the DirectRunner
When a worker dies because of an error, propagate that error and fail
the Pipeline.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f03c04a7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f03c04a7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f03c04a7
Branch: refs/heads/master
Commit: f03c04a787343c3710355c84a105582cdc815469
Parents: 28180c4
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 9 09:46:38 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue May 9 11:49:14 2017 -0700
----------------------------------------------------------------------
.../beam/runners/direct/CompletionCallback.java | 6 +++++
.../direct/ExecutorServiceParallelExecutor.java | 27 +++++++++++++++-----
.../beam/runners/direct/TransformExecutor.java | 12 +++++++--
.../apache/beam/runners/direct/MockClock.java | 2 +-
.../runners/direct/TransformExecutorTest.java | 5 ++++
5 files changed, 43 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f03c04a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
index 0af22c8..417fa09 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
@@ -40,4 +40,10 @@ interface CompletionCallback {
* Handle a result that terminated abnormally due to the provided {@link Exception}.
*/
void handleException(CommittedBundle<?> inputBundle, Exception t);
+
+ /**
+ * Handle a result that terminated abnormally due to the provided {@link Error}. The pipeline
+ * should be shut down, and the Error propagated.
+ */
+ void handleError(Error err);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f03c04a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index b7f4732..71ab4cc 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -285,8 +285,15 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
// there are no updates to process and no updates will ever be published because the
// executor is shutdown
return pipelineState.get();
- } else if (update != null && update.exception.isPresent()) {
- throw update.exception.get();
+ } else if (update != null && update.thrown.isPresent()) {
+ Throwable thrown = update.thrown.get();
+ if (thrown instanceof Exception) {
+ throw (Exception) thrown;
+ } else if (thrown instanceof Error) {
+ throw (Error) thrown;
+ } else {
+ throw new Exception("Unknown Type of Throwable", thrown);
+ }
}
}
return pipelineState.get();
@@ -380,6 +387,11 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
allUpdates.offer(ExecutorUpdate.fromException(e));
outstandingWork.decrementAndGet();
}
+
+ @Override
+ public void handleError(Error err) {
+ visibleUpdates.add(VisibleExecutorUpdate.fromError(err));
+ }
}
/**
@@ -424,7 +436,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
* return normally or throw an exception.
*/
private static class VisibleExecutorUpdate {
- private final Optional<? extends Exception> exception;
+ private final Optional<? extends Throwable> thrown;
@Nullable
private final State newState;
@@ -432,6 +444,10 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
return new VisibleExecutorUpdate(null, e);
}
+ public static VisibleExecutorUpdate fromError(Error err) {
+ return new VisibleExecutorUpdate(State.FAILED, err);
+ }
+
public static VisibleExecutorUpdate finished() {
return new VisibleExecutorUpdate(State.DONE, null);
}
@@ -440,15 +456,14 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
return new VisibleExecutorUpdate(State.CANCELLED, null);
}
- private VisibleExecutorUpdate(State newState, @Nullable Exception exception) {
- this.exception = Optional.fromNullable(exception);
+ private VisibleExecutorUpdate(State newState, @Nullable Throwable exception) {
+ this.thrown = Optional.fromNullable(exception);
this.newState = newState;
}
public State getNewState() {
return newState;
}
-
}
private class MonitorRunnable implements Runnable {
http://git-wip-us.apache.org/repos/asf/beam/blob/f03c04a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
index 8e1515b..56f8650 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -26,6 +26,8 @@ import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.util.WindowedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A {@link Callable} responsible for constructing a {@link TransformEvaluator} from a
@@ -36,6 +38,8 @@ import org.apache.beam.sdk.util.WindowedValue;
* that it is being executed on.
*/
class TransformExecutor<T> implements Runnable {
+ private static final Logger LOG = LoggerFactory.getLogger(TransformExecutor.class);
+
public static <T> TransformExecutor<T> create(
EvaluationContext context,
TransformEvaluatorFactory factory,
@@ -112,6 +116,10 @@ class TransformExecutor<T> implements Runnable {
throw (RuntimeException) e;
}
throw new RuntimeException(e);
+ } catch (Error err) {
+ LOG.error("Error occurred within {}", this, err);
+ onComplete.handleError(err);
+ throw err;
} finally {
// Report the physical metrics from the end of this step.
context.getMetrics().commitPhysical(inputBundle, metricsContainer.getCumulative());
@@ -162,8 +170,8 @@ class TransformExecutor<T> implements Runnable {
TransformEvaluator<T> evaluator, MetricsContainer metricsContainer,
Collection<ModelEnforcement<T>> enforcements)
throws Exception {
- TransformResult<T> result = evaluator.finishBundle()
- .withLogicalMetricUpdates(metricsContainer.getCumulative());
+ TransformResult<T> result =
+ evaluator.finishBundle().withLogicalMetricUpdates(metricsContainer.getCumulative());
CommittedResult outputs = onComplete.handleResult(inputBundle, result);
for (ModelEnforcement<T> enforcement : enforcements) {
enforcement.afterFinish(inputBundle, result, outputs.getOutputs());
http://git-wip-us.apache.org/repos/asf/beam/blob/f03c04a7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java
index 11ecbff..9275e3c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java
@@ -28,7 +28,7 @@ import org.joda.time.Instant;
*
* <p>For uses of the {@link Clock} interface in unit tests.
*/
-public class MockClock implements Clock {
+class MockClock implements Clock {
private Instant now;
http://git-wip-us.apache.org/repos/asf/beam/blob/f03c04a7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index dc0ef7c..86412a0 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -434,6 +434,11 @@ public class TransformExecutorTest {
handledException = e;
onMethod.countDown();
}
+
+ @Override
+ public void handleError(Error err) {
+ throw err;
+ }
}
private static class TestEnforcementFactory implements ModelEnforcementFactory {
[3/3] beam git commit: This closes #2714
Posted by dh...@apache.org.
This closes #2714
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/72b361e9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/72b361e9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/72b361e9
Branch: refs/heads/master
Commit: 72b361e9af1a21bc965408da392a2490e5985f0c
Parents: 28180c4 b0a3261
Author: Dan Halperin <dh...@google.com>
Authored: Tue May 9 11:49:17 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue May 9 11:49:17 2017 -0700
----------------------------------------------------------------------
runners/direct-java/pom.xml | 79 ++++++++++++++++++--
.../beam/runners/direct/CompletionCallback.java | 6 ++
.../direct/ExecutorServiceParallelExecutor.java | 27 +++++--
.../GroupAlsoByWindowEvaluatorFactory.java | 6 +-
.../beam/runners/direct/TransformExecutor.java | 12 ++-
.../direct/DirectRunnerApiSurfaceTest.java | 55 ++++++++++++++
.../apache/beam/runners/direct/MockClock.java | 2 +-
.../runners/direct/TransformExecutorTest.java | 5 ++
8 files changed, 176 insertions(+), 16 deletions(-)
----------------------------------------------------------------------