You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/11/10 01:15:16 UTC

[05/11] beam git commit: NonNull by default in sdk/values

NonNull by default in sdk/values


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/336b71fa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/336b71fa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/336b71fa

Branch: refs/heads/master
Commit: 336b71fac4667487816ee5a913d51d57e6f9c026
Parents: 80ae930
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 23 19:45:31 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Thu Nov 9 15:01:55 2017 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/values/BeamRecord.java  |  6 +++---
 .../java/org/apache/beam/sdk/values/KV.java     | 11 +++++-----
 .../org/apache/beam/sdk/values/PCollection.java | 12 ++++++-----
 .../beam/sdk/values/PCollectionViews.java       | 10 +--------
 .../org/apache/beam/sdk/values/PValueBase.java  | 22 +++++++++++++-------
 .../beam/sdk/values/TimestampedValue.java       |  9 ++++----
 .../apache/beam/sdk/values/package-info.java    |  4 ++++
 7 files changed, 40 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/336b71fa/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
index 999f27a..c79d1f8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.List;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.BeamRecordCoder;
 
@@ -177,9 +178,8 @@ public class BeamRecord implements Serializable {
     return (Boolean) getFieldValue(fieldName);
   }
 
-  /**
-   * Get value by field index.
-   */
+  /** Get value by field index. */
+  @Nullable
   public Object getFieldValue(int fieldIdx) {
     return dataValues.get(fieldIdx);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/336b71fa/sdks/java/core/src/main/java/org/apache/beam/sdk/values/KV.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/KV.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/KV.java
index dec9a16..030d9f2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/KV.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/KV.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.Objects;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -38,12 +39,12 @@ import org.apache.beam.sdk.transforms.SerializableComparator;
  */
 public class KV<K, V> implements Serializable {
   /** Returns a {@link KV} with the given key and value. */
-  public static <K, V> KV<K, V> of(K key, V value) {
+  public static <K, V> KV<K, V> of(@Nullable K key, @Nullable V value) {
     return new KV<>(key, value);
   }
 
   /** Returns the key of this {@link KV}. */
-  public K getKey() {
+  public @Nullable K getKey() {
     return key;
   }
 
@@ -55,10 +56,10 @@ public class KV<K, V> implements Serializable {
 
   /////////////////////////////////////////////////////////////////////////////
 
-  final K key;
-  final V value;
+  final @Nullable K key;
+  final @Nullable V value;
 
-  private KV(K key, V value) {
+  private KV(@Nullable K key, @Nullable V value) {
     this.key = key;
     this.value = value;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/336b71fa/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
index e8bf9b8..0806cf8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
@@ -80,7 +80,8 @@ public class PCollection<T> extends PValueBase implements PValue {
    */
   private CoderOrFailure<T> coderOrFailure =
       new CoderOrFailure<>(null, "No Coder was specified, and Coder Inference did not occur");
-  private TypeDescriptor<T> typeDescriptor;
+
+  @Nullable private TypeDescriptor<T> typeDescriptor;
 
   @Override
   public void finishSpecifyingOutput(
@@ -113,6 +114,7 @@ public class PCollection<T> extends PValueBase implements PValue {
    * is available. Subclasses may override this to enable better
    * {@code Coder} inference.
    */
+  @Nullable
   public TypeDescriptor<T> getTypeDescriptor() {
     return typeDescriptor;
   }
@@ -326,8 +328,10 @@ public class PCollection<T> extends PValueBase implements PValue {
    */
   private final TupleTag<?> tag = new TupleTag<>();
 
-  private PCollection(Pipeline p) {
+  private PCollection(Pipeline p, WindowingStrategy<?, ?> windowingStrategy, IsBounded isBounded) {
     super(p);
+    this.windowingStrategy = windowingStrategy;
+    this.isBounded = isBounded;
   }
 
   /**
@@ -368,9 +372,7 @@ public class PCollection<T> extends PValueBase implements PValue {
       WindowingStrategy<?, ?> windowingStrategy,
       IsBounded isBounded,
       @Nullable Coder<T> coder) {
-    PCollection<T> res = new PCollection<T>(pipeline)
-        .setWindowingStrategyInternal(windowingStrategy)
-        .setIsBoundedInternal(isBounded);
+    PCollection<T> res = new PCollection<T>(pipeline, windowingStrategy, isBounded);
     if (coder != null) {
       res.setCoder(coder);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/336b71fa/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
index f2a3097..ed8fb76 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
@@ -390,7 +390,7 @@ public class PCollectionViews {
     private WindowingStrategy<?, W> windowingStrategy;
 
     /** The coder for the elements underlying the view. */
-    private Coder<Iterable<WindowedValue<ElemT>>> coder;
+    private @Nullable Coder<Iterable<WindowedValue<ElemT>>> coder;
 
     /**
      * The typed {@link ViewFn} for this view.
@@ -441,14 +441,6 @@ public class PCollectionViews {
           valueCoder);
     }
 
-    /**
-     * For serialization only. Do not use directly.
-     */
-    @SuppressWarnings("unused")  // used for serialization
-    protected SimplePCollectionView() {
-      super();
-    }
-
     @Override
     public ViewFn<Iterable<WindowedValue<?>>, ViewT> getViewFn() {
       // Safe cast: it is required that the rest of the SDK maintain the invariant

http://git-wip-us.apache.org/repos/asf/beam/blob/336b71fa/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
index f312eac..1428419 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.values;
 
 import static com.google.common.base.Preconditions.checkState;
 
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -33,7 +34,7 @@ import org.apache.beam.sdk.util.NameUtils;
 @Internal
 public abstract class PValueBase implements PValue {
 
-  private final Pipeline pipeline;
+  private final transient @Nullable Pipeline pipeline;
 
   /**
    * Returns the name of this {@link PValueBase}.
@@ -71,18 +72,16 @@ public abstract class PValueBase implements PValue {
   }
 
   /**
-   * No-arg constructor for Java serialization only.
-   * The resulting {@link PValueBase} is unlikely to be
-   * valid.
+   * No-arg constructor to allow subclasses to implement {@link java.io.Serializable}.
+   * The resulting {@link PValueBase} is not valid as a {@link PValue}, but may have other
+   * properties that are still usable, such as the tag in a {@link PCollectionView}.
    */
   protected PValueBase() {
     this.pipeline = null;
   }
 
-  /**
-   * The name of this {@link PValueBase}, or {@code null} if not yet set.
-   */
-  private String name;
+  /** The name of this {@link PValueBase}, or {@code null} if not yet set. */
+  @Nullable private String name;
 
   /**
    * Whether this {@link PValueBase} has been finalized, and its core
@@ -123,6 +122,13 @@ public abstract class PValueBase implements PValue {
 
   @Override
   public Pipeline getPipeline() {
+    checkState(
+        pipeline != null,
+        "Pipeline was null for %s. "
+            + "this probably means it was used as a %s after being deserialized, "
+            + "which not unsupported.",
+        getClass().getCanonicalName(),
+        PValue.class.getSimpleName());
     return pipeline;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/336b71fa/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
index a4c8b3f..9a8abba 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
@@ -26,6 +26,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.StructuredCoder;
@@ -45,14 +46,14 @@ public class TimestampedValue<V> {
    * Returns a new {@link TimestampedValue} with the
    * {@link BoundedWindow#TIMESTAMP_MIN_VALUE minimum timestamp}.
    */
-  public static <V> TimestampedValue<V> atMinimumTimestamp(V value) {
+  public static <V> TimestampedValue<V> atMinimumTimestamp(@Nullable V value) {
     return of(value, BoundedWindow.TIMESTAMP_MIN_VALUE);
   }
 
   /**
    * Returns a new {@code TimestampedValue} with the given value and timestamp.
    */
-  public static <V> TimestampedValue<V> of(V value, Instant timestamp) {
+  public static <V> TimestampedValue<V> of(@Nullable V value, Instant timestamp) {
     return new TimestampedValue<>(value, timestamp);
   }
 
@@ -145,10 +146,10 @@ public class TimestampedValue<V> {
 
   /////////////////////////////////////////////////////////////////////////////
 
-  private final V value;
+  private final @Nullable V value;
   private final Instant timestamp;
 
-  protected TimestampedValue(V value, Instant timestamp) {
+  protected TimestampedValue(@Nullable V value, Instant timestamp) {
     checkNotNull(timestamp, "timestamp must be non-null");
 
     this.value = value;

http://git-wip-us.apache.org/repos/asf/beam/blob/336b71fa/sdks/java/core/src/main/java/org/apache/beam/sdk/values/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/package-info.java
index c028407..7b397cd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/package-info.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/package-info.java
@@ -50,4 +50,8 @@
  *
  * <p>For further details, see the documentation for each class in this package.
  */
+@DefaultAnnotation(NonNull.class)
 package org.apache.beam.sdk.values;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;