You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/05/10 17:26:38 UTC
[3/3] beam git commit: Improve DirectRunner Javadoc
Improve DirectRunner Javadoc
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0b159564
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0b159564
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0b159564
Branch: refs/heads/master
Commit: 0b1595640686b5e9e6314a4868b923535d253c37
Parents: 4abb9cb
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 9 18:04:01 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed May 10 10:26:24 2017 -0700
----------------------------------------------------------------------
.../beam/runners/direct/DirectOptions.java | 2 +-
.../beam/runners/direct/DirectRegistrar.java | 10 +++---
.../beam/runners/direct/DirectRunner.java | 33 +++++++++++++-------
3 files changed, 28 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/0b159564/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
index 3b66cc6..574ab46 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
@@ -49,7 +49,7 @@ public interface DirectOptions extends PipelineOptions, ApplicationNameOptions {
@Default.Boolean(true)
@Description(
"Controls whether the DirectRunner should ensure that all of the elements of every "
- + "PCollection are encodable. All elements in a PCollection must be encodable.")
+ + "PCollection can be encoded and decoded by that PCollection's Coder.")
boolean isEnforceEncodability();
void setEnforceEncodability(boolean test);
http://git-wip-us.apache.org/repos/asf/beam/blob/0b159564/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
index 3e69e2b..0e6fbab 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
@@ -26,31 +26,31 @@ import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
/**
* Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the
- * {@link org.apache.beam.runners.direct.DirectRunner}.
+ * {@link DirectRunner}.
*/
public class DirectRegistrar {
private DirectRegistrar() {}
/**
- * Registers the {@link org.apache.beam.runners.direct.DirectRunner}.
+ * Registers the {@link DirectRunner}.
*/
@AutoService(PipelineRunnerRegistrar.class)
public static class Runner implements PipelineRunnerRegistrar {
@Override
public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
- org.apache.beam.runners.direct.DirectRunner.class);
+ DirectRunner.class);
}
}
/**
- * Registers the {@link org.apache.beam.runners.direct.DirectOptions}.
+ * Registers the {@link DirectOptions}.
*/
@AutoService(PipelineOptionsRegistrar.class)
public static class Options implements PipelineOptionsRegistrar {
@Override
public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
return ImmutableList.<Class<? extends PipelineOptions>>of(
- org.apache.beam.runners.direct.DirectOptions.class);
+ DirectOptions.class);
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b159564/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index b0ce5eb..181896f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -51,8 +51,14 @@ import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
/**
- * An In-Memory implementation of the Dataflow Programming Model. Supports Unbounded
- * {@link PCollection PCollections}.
+ * A {@link PipelineRunner} that executes a {@link Pipeline} within the process that constructed the
+ * {@link Pipeline}.
+ *
+ * <p>The {@link DirectRunner} is suitable for running a {@link Pipeline} on small scale, example,
+ * and test data, and should be used for ensuring that processing logic is correct. It also
+ * is appropriate for executing unit tests and performs additional work to ensure that behavior
+ * contained within a {@link Pipeline} does not break assumptions within the Beam model, to improve
+ * the ability to execute a {@link Pipeline} at scale on a distributed backend.
*/
public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
@@ -127,6 +133,9 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
private final Set<Enforcement> enabledEnforcements;
private Supplier<Clock> clockSupplier = new NanosOffsetClockSupplier();
+ /**
+ * Construct a {@link DirectRunner} from the provided options.
+ */
public static DirectRunner fromOptions(PipelineOptions options) {
return new DirectRunner(options.as(DirectOptions.class));
}
@@ -246,8 +255,6 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
/**
* The result of running a {@link Pipeline} with the {@link DirectRunner}.
- *
- * <p>Throws {@link UnsupportedOperationException} for all methods.
*/
public static class DirectPipelineResult implements PipelineResult {
private final PipelineExecutor executor;
@@ -274,14 +281,11 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
}
/**
- * Blocks until the {@link Pipeline} execution represented by this
- * {@link DirectPipelineResult} is complete, returning the terminal state.
+ * {@inheritDoc}.
*
- * <p>If the pipeline terminates abnormally by throwing an exception, this will rethrow the
- * exception. Future calls to {@link #getState()} will return
- * {@link org.apache.beam.sdk.PipelineResult.State#FAILED}.
- *
- * <p>See also {@link PipelineExecutor#waitUntilFinish(Duration)}.
+ * <p>If the pipeline terminates abnormally by throwing an {@link Exception}, this will rethrow
+ * the original {@link Exception}. Future calls to {@link #getState()} will return {@link
+ * org.apache.beam.sdk.PipelineResult.State#FAILED}.
*/
@Override
public State waitUntilFinish() {
@@ -298,6 +302,13 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
return executor.getPipelineState();
}
+ /**
+ * {@inheritDoc}.
+ *
+ * <p>If the pipeline terminates abnormally by throwing an {@link Exception}, this will rethrow
+ * the original {@link Exception}. Future calls to {@link #getState()} will return {@link
+ * org.apache.beam.sdk.PipelineResult.State#FAILED}.
+ */
@Override
public State waitUntilFinish(Duration duration) {
State startState = this.state;