You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/10/28 16:18:35 UTC

[3/7] beam git commit: NonNull by default in sdk/transforms/join

NonNull by default in sdk/transforms/join


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7b2edb15
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7b2edb15
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7b2edb15

Branch: refs/heads/master
Commit: 7b2edb153967ec5d3cfa1e2e9fe538159f953b30
Parents: e25aba8
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 23 19:35:08 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat Oct 28 08:42:30 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/join/CoGbkResult.java  |  7 +++++--
 .../beam/sdk/transforms/join/KeyedPCollectionTuple.java   | 10 ++++------
 .../apache/beam/sdk/transforms/join/RawUnionValue.java    |  8 +++++---
 .../org/apache/beam/sdk/transforms/join/package-info.java |  4 ++++
 .../beam/sdk/nexmark/queries/NexmarkQueryModel.java       |  5 ++---
 .../java/org/apache/beam/sdk/nexmark/queries/Query8.java  |  3 ++-
 .../org/apache/beam/sdk/nexmark/queries/WinningBids.java  |  3 ++-
 7 files changed, 24 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7b2edb15/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
index 877bb07..16a0bae 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
@@ -197,7 +198,8 @@ public class CoGbkResult {
    * <p>If tag was not part of the original {@link CoGroupByKey},
    * throws an IllegalArgumentException.
    */
-  public <V> V getOnly(TupleTag<V> tag, V defaultValue) {
+  @Nullable
+  public <V> V getOnly(TupleTag<V> tag, @Nullable V defaultValue) {
     return innerGetOnly(tag, defaultValue, true);
   }
 
@@ -356,9 +358,10 @@ public class CoGbkResult {
     this.valueMap = valueMap;
   }
 
+  @Nullable
   private <V> V innerGetOnly(
       TupleTag<V> tag,
-      V defaultValue,
+      @Nullable V defaultValue,
       boolean useDefault) {
     int index = schema.getIndex(tag);
     if (index < 0) {

http://git-wip-us.apache.org/repos/asf/beam/blob/7b2edb15/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
index 2e7dd01..a9d1873 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
@@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -207,24 +208,21 @@ public class KeyedPCollectionTuple<K> implements PInput {
    */
   private final List<TaggedKeyedPCollection<K, ?>> keyedCollections;
 
-  private Coder<K> keyCoder;
+  @Nullable private Coder<K> keyCoder;
 
   private final CoGbkResultSchema schema;
 
   private final Pipeline pipeline;
 
   KeyedPCollectionTuple(Pipeline pipeline) {
-    this(pipeline,
-         new ArrayList<TaggedKeyedPCollection<K, ?>>(),
-         TupleTagList.empty(),
-         null);
+    this(pipeline, new ArrayList<TaggedKeyedPCollection<K, ?>>(), TupleTagList.empty(), null);
   }
 
   KeyedPCollectionTuple(
       Pipeline pipeline,
       List<TaggedKeyedPCollection<K, ?>> keyedCollections,
       TupleTagList tupleTagList,
-      Coder<K> keyCoder) {
+      @Nullable Coder<K> keyCoder) {
     this.pipeline = pipeline;
     this.keyedCollections = keyedCollections;
     this.schema = new CoGbkResultSchema(tupleTagList);

http://git-wip-us.apache.org/repos/asf/beam/blob/7b2edb15/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/RawUnionValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/RawUnionValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/RawUnionValue.java
index 07bfe69..7ac1faf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/RawUnionValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/RawUnionValue.java
@@ -20,18 +20,20 @@ package org.apache.beam.sdk.transforms.join;
 // TODO: Think about making this a complete dynamic union by adding
 // a schema.  Type would then be defined by the corresponding schema entry.
 
+import javax.annotation.Nullable;
+
 /**
  * This corresponds to an integer union tag and value.  The mapping of
  * union tag to type must come from elsewhere.
  */
 public class RawUnionValue {
   private final int unionTag;
-  private final Object value;
+  @Nullable private final Object value;
 
   /**
    * Constructs a partial union from the given union tag and value.
    */
-  public RawUnionValue(int unionTag, Object value) {
+  public RawUnionValue(int unionTag, @Nullable Object value) {
     this.unionTag = unionTag;
     this.value = value;
   }
@@ -40,7 +42,7 @@ public class RawUnionValue {
     return unionTag;
   }
 
-  public Object getValue() {
+  @Nullable public Object getValue() {
     return value;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7b2edb15/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/package-info.java
index f4b315e..7aab329 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/package-info.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/package-info.java
@@ -19,4 +19,8 @@
  * Defines the {@link org.apache.beam.sdk.transforms.join.CoGroupByKey} transform
  * for joining multiple PCollections.
  */
+@DefaultAnnotation(NonNull.class)
 package org.apache.beam.sdk.transforms.join;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;

http://git-wip-us.apache.org/repos/asf/beam/blob/7b2edb15/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
index 1f093a0..2efab3e 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
@@ -24,13 +24,11 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
-
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
 import org.apache.beam.sdk.nexmark.model.KnownSize;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.TimestampedValue;
-
-
 import org.hamcrest.core.IsEqual;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -107,6 +105,7 @@ public abstract class NexmarkQueryModel implements Serializable {
 
     return new SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void>() {
       @Override
+      @Nullable
       public Void apply(Iterable<TimestampedValue<KnownSize>> actual) {
       Collection<String> actualStrings = toCollection(relevantResults(actual).iterator());
         Assert.assertThat("wrong pipeline output", actualStrings,

http://git-wip-us.apache.org/repos/asf/beam/blob/7b2edb15/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java
index fa3dd86..def7cb3 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.nexmark.queries;
 
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
 import org.apache.beam.sdk.nexmark.NexmarkUtils;
 import org.apache.beam.sdk.nexmark.model.Auction;
@@ -78,7 +79,7 @@ public class Query8 extends NexmarkQuery {
             ParDo.of(new DoFn<KV<Long, CoGbkResult>, IdNameReserve>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
-                    Person person = c.element().getValue().getOnly(PERSON_TAG, null);
+                    @Nullable Person person = c.element().getValue().getOnly(PERSON_TAG, null);
                     if (person == null) {
                       // Person was not created in last window period.
                       return;

http://git-wip-us.apache.org/repos/asf/beam/blob/7b2edb15/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
index d73b8ae..bc553c9 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.TreeMap;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
@@ -362,7 +363,7 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct
 
             @ProcessElement
             public void processElement(ProcessContext c) {
-              Auction auction =
+              @Nullable Auction auction =
                   c.element().getValue().getOnly(NexmarkQuery.AUCTION_TAG, null);
               if (auction == null) {
                 // We have bids without a matching auction. Give up.