You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/05/10 17:52:30 UTC

[1/3] beam git commit: This closes #3052

Repository: beam
Updated Branches:
  refs/heads/release-2.0.0 239438e4b -> 106b17987


This closes #3052


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/106b1798
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/106b1798
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/106b1798

Branch: refs/heads/release-2.0.0
Commit: 106b179870a11c93a214c08901421d244d3b59dd
Parents: 239438e c14d34e
Author: Thomas Groh <tg...@google.com>
Authored: Wed May 10 10:52:20 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed May 10 10:52:20 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectOptions.java      |  2 +-
 .../beam/runners/direct/DirectRegistrar.java    | 10 +++---
 .../beam/runners/direct/DirectRunner.java       | 33 +++++++++++++-------
 .../org/apache/beam/sdk/coders/AtomicCoder.java | 17 ++++++++--
 .../java/org/apache/beam/sdk/coders/Coder.java  | 22 +++++--------
 .../org/apache/beam/sdk/coders/CustomCoder.java | 17 +++-------
 .../apache/beam/sdk/coders/StructuredCoder.java | 10 +++---
 7 files changed, 62 insertions(+), 49 deletions(-)
----------------------------------------------------------------------



[3/3] beam git commit: Improve DirectRunner Javadoc

Posted by tg...@apache.org.
Improve DirectRunner Javadoc


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2e66405a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2e66405a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2e66405a

Branch: refs/heads/release-2.0.0
Commit: 2e66405aa0da6d951dfbd023ecb3d59336bb4b49
Parents: 239438e
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 9 18:04:01 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed May 10 10:52:20 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectOptions.java      |  2 +-
 .../beam/runners/direct/DirectRegistrar.java    | 10 +++---
 .../beam/runners/direct/DirectRunner.java       | 33 +++++++++++++-------
 3 files changed, 28 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2e66405a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
index 3b66cc6..574ab46 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
@@ -49,7 +49,7 @@ public interface DirectOptions extends PipelineOptions, ApplicationNameOptions {
   @Default.Boolean(true)
   @Description(
       "Controls whether the DirectRunner should ensure that all of the elements of every "
-          + "PCollection are encodable. All elements in a PCollection must be encodable.")
+          + "PCollection can be encoded and decoded by that PCollection's Coder.")
   boolean isEnforceEncodability();
   void setEnforceEncodability(boolean test);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2e66405a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
index 3e69e2b..0e6fbab 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
@@ -26,31 +26,31 @@ import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
 
 /**
  * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the
- * {@link org.apache.beam.runners.direct.DirectRunner}.
+ * {@link DirectRunner}.
  */
 public class DirectRegistrar {
   private DirectRegistrar() {}
   /**
-   * Registers the {@link org.apache.beam.runners.direct.DirectRunner}.
+   * Registers the {@link DirectRunner}.
    */
   @AutoService(PipelineRunnerRegistrar.class)
   public static class Runner implements PipelineRunnerRegistrar {
     @Override
     public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
       return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
-          org.apache.beam.runners.direct.DirectRunner.class);
+          DirectRunner.class);
     }
   }
 
   /**
-   * Registers the {@link org.apache.beam.runners.direct.DirectOptions}.
+   * Registers the {@link DirectOptions}.
    */
   @AutoService(PipelineOptionsRegistrar.class)
   public static class Options implements PipelineOptionsRegistrar {
     @Override
     public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
       return ImmutableList.<Class<? extends PipelineOptions>>of(
-          org.apache.beam.runners.direct.DirectOptions.class);
+          DirectOptions.class);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/2e66405a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index b0ce5eb..181896f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -51,8 +51,14 @@ import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
 
 /**
- * An In-Memory implementation of the Dataflow Programming Model. Supports Unbounded
- * {@link PCollection PCollections}.
+ * A {@link PipelineRunner} that executes a {@link Pipeline} within the process that constructed the
+ * {@link Pipeline}.
+ *
+ * <p>The {@link DirectRunner} is suitable for running a {@link Pipeline} on small-scale, example,
+ * and test data, and should be used for ensuring that processing logic is correct. It also
+ * is appropriate for executing unit tests and performs additional work to ensure that behavior
+ * contained within a {@link Pipeline} does not break assumptions within the Beam model, to improve
+ * the ability to execute a {@link Pipeline} at scale on a distributed backend.
  */
 public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
 
@@ -127,6 +133,9 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
   private final Set<Enforcement> enabledEnforcements;
   private Supplier<Clock> clockSupplier = new NanosOffsetClockSupplier();
 
+  /**
+   * Construct a {@link DirectRunner} from the provided options.
+   */
   public static DirectRunner fromOptions(PipelineOptions options) {
     return new DirectRunner(options.as(DirectOptions.class));
   }
@@ -246,8 +255,6 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
 
   /**
    * The result of running a {@link Pipeline} with the {@link DirectRunner}.
-   *
-   * <p>Throws {@link UnsupportedOperationException} for all methods.
    */
   public static class DirectPipelineResult implements PipelineResult {
     private final PipelineExecutor executor;
@@ -274,14 +281,11 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
     }
 
     /**
-     * Blocks until the {@link Pipeline} execution represented by this
-     * {@link DirectPipelineResult} is complete, returning the terminal state.
+     * {@inheritDoc}.
      *
-     * <p>If the pipeline terminates abnormally by throwing an exception, this will rethrow the
-     * exception. Future calls to {@link #getState()} will return
-     * {@link org.apache.beam.sdk.PipelineResult.State#FAILED}.
-     *
-     * <p>See also {@link PipelineExecutor#waitUntilFinish(Duration)}.
+     * <p>If the pipeline terminates abnormally by throwing an {@link Exception}, this will rethrow
+     * the original {@link Exception}. Future calls to {@link #getState()} will return {@link
+     * org.apache.beam.sdk.PipelineResult.State#FAILED}.
      */
     @Override
     public State waitUntilFinish() {
@@ -298,6 +302,13 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
       return executor.getPipelineState();
     }
 
+    /**
+     * {@inheritDoc}.
+     *
+     * <p>If the pipeline terminates abnormally by throwing an {@link Exception}, this will rethrow
+     * the original {@link Exception}. Future calls to {@link #getState()} will return {@link
+     * org.apache.beam.sdk.PipelineResult.State#FAILED}.
+     */
     @Override
     public State waitUntilFinish(Duration duration) {
       State startState = this.state;


[2/3] beam git commit: Update Coder Documentation

Posted by tg...@apache.org.
Update Coder Documentation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c14d34e4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c14d34e4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c14d34e4

Branch: refs/heads/release-2.0.0
Commit: c14d34e42eb38b939a8f02f89935bc97a3a9c102
Parents: 2e66405
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 9 18:09:12 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed May 10 10:52:20 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/coders/AtomicCoder.java | 17 +++++++++++++--
 .../java/org/apache/beam/sdk/coders/Coder.java  | 22 +++++++-------------
 .../org/apache/beam/sdk/coders/CustomCoder.java | 17 +++++----------
 .../apache/beam/sdk/coders/StructuredCoder.java | 10 +++++----
 4 files changed, 34 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c14d34e4/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
index 7bcd532..b244ed5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
@@ -35,14 +35,22 @@ public abstract class AtomicCoder<T> extends StructuredCoder<T> {
   /**
    * {@inheritDoc}.
    *
-   * @throws NonDeterministicException
+   * <p>Unless overridden, does not throw. An {@link AtomicCoder} is presumed to be deterministic.
+   *
+   * @throws NonDeterministicException if overridden to indicate that this subclass of
+   *         {@link AtomicCoder} is not deterministic
    */
   @Override
   public void verifyDeterministic() throws NonDeterministicException {}
 
+  /**
+   * {@inheritDoc}.
+   *
+   * @return the empty list
+   */
   @Override
   public List<? extends Coder<?>> getCoderArguments() {
-    return null;
+    return Collections.emptyList();
   }
 
   /**
@@ -65,6 +73,11 @@ public abstract class AtomicCoder<T> extends StructuredCoder<T> {
     return other != null && this.getClass().equals(other.getClass());
   }
 
+  /**
+   * {@inheritDoc}.
+   *
+   * @return the {@link #hashCode()} of the {@link Class} of this {@link AtomicCoder}.
+   */
   @Override
   public final int hashCode() {
     return this.getClass().hashCode();

http://git-wip-us.apache.org/repos/asf/beam/blob/c14d34e4/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
index 2ee532d..edcc3a8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
@@ -43,23 +43,17 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  * byte streams.
  *
  * <p>{@link Coder} instances are serialized during job creation and deserialized
- * before use, via JSON serialization. See {@link SerializableCoder} for an example of a
- * {@link Coder} that adds a custom field to
- * the {@link Coder} serialization. It provides a constructor annotated with
- * {@link com.fasterxml.jackson.annotation.JsonCreator}, which is a factory method used when
- * deserializing a {@link Coder} instance.
+ * before use. This will generally be performed by serializing the object via Java Serialization.
  *
  * <p>{@link Coder} classes for compound types are often composed from coder classes for types
  * contains therein. The composition of {@link Coder} instances into a coder for the compound
  * class is the subject of the {@link CoderProvider} type, which enables automatic generic
- * composition of {@link Coder} classes within the {@link CoderRegistry}. With particular
- * static methods on a compound {@link Coder} class, a {@link CoderProvider} can be automatically
- * inferred. See {@link KvCoder} for an example of a simple compound {@link Coder} that supports
- * automatic composition in the {@link CoderRegistry}.
+ * composition of {@link Coder} classes within the {@link CoderRegistry}. See {@link CoderProvider}
+ * and {@link CoderRegistry} for more information about how coders are inferred.
  *
  * <p>All methods of a {@link Coder} are required to be thread safe.
  *
- * @param <T> the type of the values being transcoded
+ * @param <T> the type of values being encoded and decoded
  */
 public abstract class Coder<T> implements Serializable {
   /** The context in which encoding or decoding is being done. */
@@ -167,10 +161,10 @@ public abstract class Coder<T> implements Serializable {
   }
 
   /**
-   * If this is a {@code Coder} for a parameterized type, returns the
-   * list of {@code Coder}s being used for each of the parameters, or
-   * returns {@code null} if this cannot be done or this is not a
-   * parameterized type.
+   * If this is a {@link Coder} for a parameterized type, returns the
+   * list of {@link Coder}s being used for each of the parameters in the same order they appear
+   * within the parameterized type's type signature. If this cannot be done, or this
+   * {@link Coder} does not encode/decode a parameterized type, returns the empty list.
    */
   public abstract List<? extends Coder<?>> getCoderArguments();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c14d34e4/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
index c581923..e8ce5b1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
@@ -22,19 +22,12 @@ import java.util.Collections;
 import java.util.List;
 
 /**
- * An abstract base class for writing a {@link Coder} class that encodes itself via Java
- * serialization.
+ * An abstract base class that implements all methods of {@link Coder} except {@link Coder#encode}
+ * and {@link Coder#decode}.
  *
- * <p>To complete an implementation, subclasses must implement {@link Coder#encode}
- * and {@link Coder#decode} methods.
- *
- * <p>Not to be confused with {@link SerializableCoder} that encodes objects that implement the
- * {@link Serializable} interface.
- *
- * @param <T> the type of elements handled by this coder
+ * @param <T> the type of values being encoded and decoded
  */
-public abstract class CustomCoder<T> extends Coder<T>
-    implements Serializable {
+public abstract class CustomCoder<T> extends Coder<T> implements Serializable {
 
   /**
    * {@inheritDoc}.
@@ -61,5 +54,5 @@ public abstract class CustomCoder<T> extends Coder<T>
 
   // This coder inherits isRegisterByteSizeObserverCheap,
   // getEncodedElementByteSize and registerByteSizeObserver
-  // from StructuredCoder. Override if we can do better.
+  // from Coder. Override if we can do better.
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c14d34e4/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
index db5900b..2eb662b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
@@ -32,11 +32,11 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  * <p>To extend {@link StructuredCoder}, override the following methods as appropriate:
  *
  * <ul>
- *   <li>{@link #getComponents}: the default implementation returns {@link #getCoderArguments}.</li>
- *   <li>{@link #getEncodedElementByteSize} and
- *       {@link #isRegisterByteSizeObserverCheap}: the
+ *   <li>{@link #getComponents}: the default implementation returns {@link #getCoderArguments}.
+ *   <li>{@link #getEncodedElementByteSize} and {@link #isRegisterByteSizeObserverCheap}: the
  *       default implementation encodes values to bytes and counts the bytes, which is considered
- *       expensive.</li>
+ *       expensive. The default element byte size observer uses the value returned by
+ *       {@link #getEncodedElementByteSize}.
  * </ul>
  */
 public abstract class StructuredCoder<T> extends Coder<T> {
@@ -44,6 +44,8 @@ public abstract class StructuredCoder<T> extends Coder<T> {
 
   /**
    * Returns the list of {@link Coder Coders} that are components of this {@link Coder}.
+   *
+   * <p>The default components will be equal to the value returned by {@link #getCoderArguments()}.
    */
   public List<? extends Coder<?>> getComponents() {
     List<? extends Coder<?>> coderArguments = getCoderArguments();