You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/05/10 17:52:30 UTC
[1/3] beam git commit: This closes #3052
Repository: beam
Updated Branches:
refs/heads/release-2.0.0 239438e4b -> 106b17987
This closes #3052
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/106b1798
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/106b1798
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/106b1798
Branch: refs/heads/release-2.0.0
Commit: 106b179870a11c93a214c08901421d244d3b59dd
Parents: 239438e c14d34e
Author: Thomas Groh <tg...@google.com>
Authored: Wed May 10 10:52:20 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed May 10 10:52:20 2017 -0700
----------------------------------------------------------------------
.../beam/runners/direct/DirectOptions.java | 2 +-
.../beam/runners/direct/DirectRegistrar.java | 10 +++---
.../beam/runners/direct/DirectRunner.java | 33 +++++++++++++-------
.../org/apache/beam/sdk/coders/AtomicCoder.java | 17 ++++++++--
.../java/org/apache/beam/sdk/coders/Coder.java | 22 +++++--------
.../org/apache/beam/sdk/coders/CustomCoder.java | 17 +++-------
.../apache/beam/sdk/coders/StructuredCoder.java | 10 +++---
7 files changed, 62 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
[3/3] beam git commit: Improve DirectRunner Javadoc
Posted by tg...@apache.org.
Improve DirectRunner Javadoc
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2e66405a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2e66405a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2e66405a
Branch: refs/heads/release-2.0.0
Commit: 2e66405aa0da6d951dfbd023ecb3d59336bb4b49
Parents: 239438e
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 9 18:04:01 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed May 10 10:52:20 2017 -0700
----------------------------------------------------------------------
.../beam/runners/direct/DirectOptions.java | 2 +-
.../beam/runners/direct/DirectRegistrar.java | 10 +++---
.../beam/runners/direct/DirectRunner.java | 33 +++++++++++++-------
3 files changed, 28 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/2e66405a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
index 3b66cc6..574ab46 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
@@ -49,7 +49,7 @@ public interface DirectOptions extends PipelineOptions, ApplicationNameOptions {
@Default.Boolean(true)
@Description(
"Controls whether the DirectRunner should ensure that all of the elements of every "
- + "PCollection are encodable. All elements in a PCollection must be encodable.")
+ + "PCollection can be encoded and decoded by that PCollection's Coder.")
boolean isEnforceEncodability();
void setEnforceEncodability(boolean test);
http://git-wip-us.apache.org/repos/asf/beam/blob/2e66405a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
index 3e69e2b..0e6fbab 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
@@ -26,31 +26,31 @@ import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
/**
* Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the
- * {@link org.apache.beam.runners.direct.DirectRunner}.
+ * {@link DirectRunner}.
*/
public class DirectRegistrar {
private DirectRegistrar() {}
/**
- * Registers the {@link org.apache.beam.runners.direct.DirectRunner}.
+ * Registers the {@link DirectRunner}.
*/
@AutoService(PipelineRunnerRegistrar.class)
public static class Runner implements PipelineRunnerRegistrar {
@Override
public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
- org.apache.beam.runners.direct.DirectRunner.class);
+ DirectRunner.class);
}
}
/**
- * Registers the {@link org.apache.beam.runners.direct.DirectOptions}.
+ * Registers the {@link DirectOptions}.
*/
@AutoService(PipelineOptionsRegistrar.class)
public static class Options implements PipelineOptionsRegistrar {
@Override
public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
return ImmutableList.<Class<? extends PipelineOptions>>of(
- org.apache.beam.runners.direct.DirectOptions.class);
+ DirectOptions.class);
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2e66405a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index b0ce5eb..181896f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -51,8 +51,14 @@ import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
/**
- * An In-Memory implementation of the Dataflow Programming Model. Supports Unbounded
- * {@link PCollection PCollections}.
+ * A {@link PipelineRunner} that executes a {@link Pipeline} within the process that constructed the
+ * {@link Pipeline}.
+ *
+ * <p>The {@link DirectRunner} is suitable for running a {@link Pipeline} on small-scale, example,
+ * and test data, and should be used for ensuring that processing logic is correct. It is also
+ * appropriate for executing unit tests and performs additional work to ensure that behavior
+ * contained within a {@link Pipeline} does not break assumptions within the Beam model, to improve
+ * the ability to execute a {@link Pipeline} at scale on a distributed backend.
*/
public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
@@ -127,6 +133,9 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
private final Set<Enforcement> enabledEnforcements;
private Supplier<Clock> clockSupplier = new NanosOffsetClockSupplier();
+ /**
+ * Construct a {@link DirectRunner} from the provided options.
+ */
public static DirectRunner fromOptions(PipelineOptions options) {
return new DirectRunner(options.as(DirectOptions.class));
}
@@ -246,8 +255,6 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
/**
* The result of running a {@link Pipeline} with the {@link DirectRunner}.
- *
- * <p>Throws {@link UnsupportedOperationException} for all methods.
*/
public static class DirectPipelineResult implements PipelineResult {
private final PipelineExecutor executor;
@@ -274,14 +281,11 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
}
/**
- * Blocks until the {@link Pipeline} execution represented by this
- * {@link DirectPipelineResult} is complete, returning the terminal state.
+ * {@inheritDoc}.
*
- * <p>If the pipeline terminates abnormally by throwing an exception, this will rethrow the
- * exception. Future calls to {@link #getState()} will return
- * {@link org.apache.beam.sdk.PipelineResult.State#FAILED}.
- *
- * <p>See also {@link PipelineExecutor#waitUntilFinish(Duration)}.
+ * <p>If the pipeline terminates abnormally by throwing an {@link Exception}, this will rethrow
+ * the original {@link Exception}. Future calls to {@link #getState()} will return {@link
+ * org.apache.beam.sdk.PipelineResult.State#FAILED}.
*/
@Override
public State waitUntilFinish() {
@@ -298,6 +302,13 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
return executor.getPipelineState();
}
+ /**
+ * {@inheritDoc}.
+ *
+ * <p>If the pipeline terminates abnormally by throwing an {@link Exception}, this will rethrow
+ * the original {@link Exception}. Future calls to {@link #getState()} will return {@link
+ * org.apache.beam.sdk.PipelineResult.State#FAILED}.
+ */
@Override
public State waitUntilFinish(Duration duration) {
State startState = this.state;
[2/3] beam git commit: Update Coder Documentation
Posted by tg...@apache.org.
Update Coder Documentation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c14d34e4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c14d34e4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c14d34e4
Branch: refs/heads/release-2.0.0
Commit: c14d34e42eb38b939a8f02f89935bc97a3a9c102
Parents: 2e66405
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 9 18:09:12 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed May 10 10:52:20 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/coders/AtomicCoder.java | 17 +++++++++++++--
.../java/org/apache/beam/sdk/coders/Coder.java | 22 +++++++-------------
.../org/apache/beam/sdk/coders/CustomCoder.java | 17 +++++----------
.../apache/beam/sdk/coders/StructuredCoder.java | 10 +++++----
4 files changed, 34 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c14d34e4/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
index 7bcd532..b244ed5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
@@ -35,14 +35,22 @@ public abstract class AtomicCoder<T> extends StructuredCoder<T> {
/**
* {@inheritDoc}.
*
- * @throws NonDeterministicException
+ * <p>Unless overridden, does not throw. An {@link AtomicCoder} is presumed to be deterministic.
+ *
+ * @throws NonDeterministicException if overridden to indicate that this subclass of
+ * {@link AtomicCoder} is not deterministic
*/
@Override
public void verifyDeterministic() throws NonDeterministicException {}
+ /**
+ * {@inheritDoc}.
+ *
+ * @return the empty list
+ */
@Override
public List<? extends Coder<?>> getCoderArguments() {
- return null;
+ return Collections.emptyList();
}
/**
@@ -65,6 +73,11 @@ public abstract class AtomicCoder<T> extends StructuredCoder<T> {
return other != null && this.getClass().equals(other.getClass());
}
+ /**
+ * {@inheritDoc}.
+ *
+ * @return the {@link #hashCode()} of the {@link Class} of this {@link AtomicCoder}.
+ */
@Override
public final int hashCode() {
return this.getClass().hashCode();
http://git-wip-us.apache.org/repos/asf/beam/blob/c14d34e4/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
index 2ee532d..edcc3a8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
@@ -43,23 +43,17 @@ import org.apache.beam.sdk.values.TypeDescriptor;
* byte streams.
*
* <p>{@link Coder} instances are serialized during job creation and deserialized
- * before use, via JSON serialization. See {@link SerializableCoder} for an example of a
- * {@link Coder} that adds a custom field to
- * the {@link Coder} serialization. It provides a constructor annotated with
- * {@link com.fasterxml.jackson.annotation.JsonCreator}, which is a factory method used when
- * deserializing a {@link Coder} instance.
+ * before use. This will generally be performed by serializing the object via Java Serialization.
*
* <p>{@link Coder} classes for compound types are often composed from coder classes for types
* contains therein. The composition of {@link Coder} instances into a coder for the compound
* class is the subject of the {@link CoderProvider} type, which enables automatic generic
- * composition of {@link Coder} classes within the {@link CoderRegistry}. With particular
- * static methods on a compound {@link Coder} class, a {@link CoderProvider} can be automatically
- * inferred. See {@link KvCoder} for an example of a simple compound {@link Coder} that supports
- * automatic composition in the {@link CoderRegistry}.
+ * composition of {@link Coder} classes within the {@link CoderRegistry}. See {@link CoderProvider}
+ * and {@link CoderRegistry} for more information about how coders are inferred.
*
* <p>All methods of a {@link Coder} are required to be thread safe.
*
- * @param <T> the type of the values being transcoded
+ * @param <T> the type of values being encoded and decoded
*/
public abstract class Coder<T> implements Serializable {
/** The context in which encoding or decoding is being done. */
@@ -167,10 +161,10 @@ public abstract class Coder<T> implements Serializable {
}
/**
- * If this is a {@code Coder} for a parameterized type, returns the
- * list of {@code Coder}s being used for each of the parameters, or
- * returns {@code null} if this cannot be done or this is not a
- * parameterized type.
+ * If this is a {@link Coder} for a parameterized type, returns the
+ * list of {@link Coder}s being used for each of the parameters in the same order they appear
+ * within the parameterized type's type signature. If this cannot be done, or this
+ * {@link Coder} does not encode/decode a parameterized type, returns the empty list.
*/
public abstract List<? extends Coder<?>> getCoderArguments();
http://git-wip-us.apache.org/repos/asf/beam/blob/c14d34e4/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
index c581923..e8ce5b1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
@@ -22,19 +22,12 @@ import java.util.Collections;
import java.util.List;
/**
- * An abstract base class for writing a {@link Coder} class that encodes itself via Java
- * serialization.
+ * An abstract base class that implements all methods of {@link Coder} except {@link Coder#encode}
+ * and {@link Coder#decode}.
*
- * <p>To complete an implementation, subclasses must implement {@link Coder#encode}
- * and {@link Coder#decode} methods.
- *
- * <p>Not to be confused with {@link SerializableCoder} that encodes objects that implement the
- * {@link Serializable} interface.
- *
- * @param <T> the type of elements handled by this coder
+ * @param <T> the type of values being encoded and decoded
*/
-public abstract class CustomCoder<T> extends Coder<T>
- implements Serializable {
+public abstract class CustomCoder<T> extends Coder<T> implements Serializable {
/**
* {@inheritDoc}.
@@ -61,5 +54,5 @@ public abstract class CustomCoder<T> extends Coder<T>
// This coder inherits isRegisterByteSizeObserverCheap,
// getEncodedElementByteSize and registerByteSizeObserver
- // from StructuredCoder. Override if we can do better.
+ // from Coder. Override if we can do better.
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c14d34e4/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
index db5900b..2eb662b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
@@ -32,11 +32,11 @@ import org.apache.beam.sdk.values.TypeDescriptor;
* <p>To extend {@link StructuredCoder}, override the following methods as appropriate:
*
* <ul>
- * <li>{@link #getComponents}: the default implementation returns {@link #getCoderArguments}.</li>
- * <li>{@link #getEncodedElementByteSize} and
- * {@link #isRegisterByteSizeObserverCheap}: the
+ * <li>{@link #getComponents}: the default implementation returns {@link #getCoderArguments}.
+ * <li>{@link #getEncodedElementByteSize} and {@link #isRegisterByteSizeObserverCheap}: the
* default implementation encodes values to bytes and counts the bytes, which is considered
- * expensive.</li>
+ * expensive. The default element byte size observer uses the value returned by
+ * {@link #getEncodedElementByteSize}.
* </ul>
*/
public abstract class StructuredCoder<T> extends Coder<T> {
@@ -44,6 +44,8 @@ public abstract class StructuredCoder<T> extends Coder<T> {
/**
* Returns the list of {@link Coder Coders} that are components of this {@link Coder}.
+ *
+ * <p>The default components will be equal to the value returned by {@link #getCoderArguments()}.
*/
public List<? extends Coder<?>> getComponents() {
List<? extends Coder<?>> coderArguments = getCoderArguments();