You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/10/28 16:18:35 UTC
[3/7] beam git commit: NonNull by default in sdk/transforms/join
NonNull by default in sdk/transforms/join
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7b2edb15
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7b2edb15
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7b2edb15
Branch: refs/heads/master
Commit: 7b2edb153967ec5d3cfa1e2e9fe538159f953b30
Parents: e25aba8
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 23 19:35:08 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat Oct 28 08:42:30 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/transforms/join/CoGbkResult.java | 7 +++++--
.../beam/sdk/transforms/join/KeyedPCollectionTuple.java | 10 ++++------
.../apache/beam/sdk/transforms/join/RawUnionValue.java | 8 +++++---
.../org/apache/beam/sdk/transforms/join/package-info.java | 4 ++++
.../beam/sdk/nexmark/queries/NexmarkQueryModel.java | 5 ++---
.../java/org/apache/beam/sdk/nexmark/queries/Query8.java | 3 ++-
.../org/apache/beam/sdk/nexmark/queries/WinningBids.java | 3 ++-
7 files changed, 24 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7b2edb15/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
index 877bb07..16a0bae 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
@@ -197,7 +198,8 @@ public class CoGbkResult {
* <p>If tag was not part of the original {@link CoGroupByKey},
* throws an IllegalArgumentException.
*/
- public <V> V getOnly(TupleTag<V> tag, V defaultValue) {
+ @Nullable
+ public <V> V getOnly(TupleTag<V> tag, @Nullable V defaultValue) {
return innerGetOnly(tag, defaultValue, true);
}
@@ -356,9 +358,10 @@ public class CoGbkResult {
this.valueMap = valueMap;
}
+ @Nullable
private <V> V innerGetOnly(
TupleTag<V> tag,
- V defaultValue,
+ @Nullable V defaultValue,
boolean useDefault) {
int index = schema.getIndex(tag);
if (index < 0) {
http://git-wip-us.apache.org/repos/asf/beam/blob/7b2edb15/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
index 2e7dd01..a9d1873 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
@@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
@@ -207,24 +208,21 @@ public class KeyedPCollectionTuple<K> implements PInput {
*/
private final List<TaggedKeyedPCollection<K, ?>> keyedCollections;
- private Coder<K> keyCoder;
+ @Nullable private Coder<K> keyCoder;
private final CoGbkResultSchema schema;
private final Pipeline pipeline;
KeyedPCollectionTuple(Pipeline pipeline) {
- this(pipeline,
- new ArrayList<TaggedKeyedPCollection<K, ?>>(),
- TupleTagList.empty(),
- null);
+ this(pipeline, new ArrayList<TaggedKeyedPCollection<K, ?>>(), TupleTagList.empty(), null);
}
KeyedPCollectionTuple(
Pipeline pipeline,
List<TaggedKeyedPCollection<K, ?>> keyedCollections,
TupleTagList tupleTagList,
- Coder<K> keyCoder) {
+ @Nullable Coder<K> keyCoder) {
this.pipeline = pipeline;
this.keyedCollections = keyedCollections;
this.schema = new CoGbkResultSchema(tupleTagList);
http://git-wip-us.apache.org/repos/asf/beam/blob/7b2edb15/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/RawUnionValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/RawUnionValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/RawUnionValue.java
index 07bfe69..7ac1faf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/RawUnionValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/RawUnionValue.java
@@ -20,18 +20,20 @@ package org.apache.beam.sdk.transforms.join;
// TODO: Think about making this a complete dynamic union by adding
// a schema. Type would then be defined by the corresponding schema entry.
+import javax.annotation.Nullable;
+
/**
* This corresponds to an integer union tag and value. The mapping of
* union tag to type must come from elsewhere.
*/
public class RawUnionValue {
private final int unionTag;
- private final Object value;
+ @Nullable private final Object value;
/**
* Constructs a partial union from the given union tag and value.
*/
- public RawUnionValue(int unionTag, Object value) {
+ public RawUnionValue(int unionTag, @Nullable Object value) {
this.unionTag = unionTag;
this.value = value;
}
@@ -40,7 +42,7 @@ public class RawUnionValue {
return unionTag;
}
- public Object getValue() {
+ @Nullable public Object getValue() {
return value;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7b2edb15/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/package-info.java
index f4b315e..7aab329 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/package-info.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/package-info.java
@@ -19,4 +19,8 @@
* Defines the {@link org.apache.beam.sdk.transforms.join.CoGroupByKey} transform
* for joining multiple PCollections.
*/
+@DefaultAnnotation(NonNull.class)
package org.apache.beam.sdk.transforms.join;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
http://git-wip-us.apache.org/repos/asf/beam/blob/7b2edb15/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
index 1f093a0..2efab3e 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
@@ -24,13 +24,11 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
-
+import javax.annotation.Nullable;
import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
import org.apache.beam.sdk.nexmark.model.KnownSize;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TimestampedValue;
-
-
import org.hamcrest.core.IsEqual;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -107,6 +105,7 @@ public abstract class NexmarkQueryModel implements Serializable {
return new SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void>() {
@Override
+ @Nullable
public Void apply(Iterable<TimestampedValue<KnownSize>> actual) {
Collection<String> actualStrings = toCollection(relevantResults(actual).iterator());
Assert.assertThat("wrong pipeline output", actualStrings,
http://git-wip-us.apache.org/repos/asf/beam/blob/7b2edb15/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java
index fa3dd86..def7cb3 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.nexmark.queries;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
import org.apache.beam.sdk.nexmark.NexmarkUtils;
import org.apache.beam.sdk.nexmark.model.Auction;
@@ -78,7 +79,7 @@ public class Query8 extends NexmarkQuery {
ParDo.of(new DoFn<KV<Long, CoGbkResult>, IdNameReserve>() {
@ProcessElement
public void processElement(ProcessContext c) {
- Person person = c.element().getValue().getOnly(PERSON_TAG, null);
+ @Nullable Person person = c.element().getValue().getOnly(PERSON_TAG, null);
if (person == null) {
// Person was not created in last window period.
return;
http://git-wip-us.apache.org/repos/asf/beam/blob/7b2edb15/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
index d73b8ae..bc553c9 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
@@ -30,6 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
@@ -362,7 +363,7 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct
@ProcessElement
public void processElement(ProcessContext c) {
- Auction auction =
+ @Nullable Auction auction =
c.element().getValue().getOnly(NexmarkQuery.AUCTION_TAG, null);
if (auction == null) {
// We have bids without a matching auction. Give up.