You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/10/28 16:18:33 UTC

[1/7] beam git commit: NonNull by default in sdk/transforms/windowing

Repository: beam
Updated Branches:
  refs/heads/master e686286f1 -> 5fb30ec82


NonNull by default in sdk/transforms/windowing


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d8353b70
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d8353b70
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d8353b70

Branch: refs/heads/master
Commit: d8353b70143be4eb31298c120c9faf4a372beb37
Parents: 90be820
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 23 19:17:00 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat Oct 28 08:42:29 2017 -0700

----------------------------------------------------------------------
 .../windowing/IncompatibleWindowException.java  |  2 +-
 .../MergeOverlappingIntervalWindows.java        |  4 +++-
 .../beam/sdk/transforms/windowing/Trigger.java  | 20 +++++++++++---------
 .../beam/sdk/transforms/windowing/Window.java   |  1 +
 .../sdk/transforms/windowing/package-info.java  |  4 ++++
 5 files changed, 20 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d8353b70/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IncompatibleWindowException.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IncompatibleWindowException.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IncompatibleWindowException.java
index b7b96ad..20746af 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IncompatibleWindowException.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IncompatibleWindowException.java
@@ -32,7 +32,7 @@ public class IncompatibleWindowException extends Exception {
 
   @Override
   public String getMessage() {
-    String windowFn = givenWindowFn == null ? "null" : givenWindowFn.getClass().getSimpleName();
+    String windowFn = givenWindowFn.getClass().getSimpleName();
     return String.format("The given WindowFn is %s. %s", windowFn, reason);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/d8353b70/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/MergeOverlappingIntervalWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/MergeOverlappingIntervalWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/MergeOverlappingIntervalWindows.java
index 0a68021..0421868 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/MergeOverlappingIntervalWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/MergeOverlappingIntervalWindows.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Internal;
 
 /**
@@ -61,9 +62,10 @@ public class MergeOverlappingIntervalWindows {
   }
 
   private static class MergeCandidate {
-    private IntervalWindow union;
+    @Nullable private IntervalWindow union;
     private final List<IntervalWindow> parts;
     public MergeCandidate() {
+      union = null;
       parts = new ArrayList<>();
     }
     public MergeCandidate(IntervalWindow window) {

http://git-wip-us.apache.org/repos/asf/beam/blob/d8353b70/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
index 519ab67..6985565 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -71,9 +72,9 @@ import org.joda.time.Instant;
 @Experimental(Experimental.Kind.TRIGGER)
 public abstract class Trigger implements Serializable {
 
-  protected final List<Trigger> subTriggers;
+  @Nullable protected final List<Trigger> subTriggers;
 
-  protected Trigger(List<Trigger> subTriggers) {
+  protected Trigger(@Nullable List<Trigger> subTriggers) {
     this.subTriggers = subTriggers;
   }
 
@@ -107,15 +108,16 @@ public abstract class Trigger implements Serializable {
   }
 
   /**
-   * Subclasses should override this to return the {@link #getContinuationTrigger} of this
-   * {@link Trigger}. For convenience, this is provided the continuation trigger of each of the
+   * Subclasses should override this to return the {@link #getContinuationTrigger} of this {@link
+   * Trigger}. For convenience, this is provided the continuation trigger of each of the
    * sub-triggers in the same order as {@link #subTriggers}.
    *
-   * @param continuationTriggers null if {@link #subTriggers} is null, otherwise contains the
-   *                             result of {@link #getContinuationTrigger()} on each of the
-   *                             subTriggers in the same order.
+   * @param continuationTriggers {@code null} if {@link #subTriggers} is {@code null}, otherwise
+   *     contains the result of {@link #getContinuationTrigger()} on each of the subTriggers in the
+   *     same order.
    */
-  protected abstract Trigger getContinuationTrigger(List<Trigger> continuationTriggers);
+  @Nullable
+  protected abstract Trigger getContinuationTrigger(@Nullable List<Trigger> continuationTriggers);
 
   /**
    * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
@@ -224,7 +226,7 @@ public abstract class Trigger implements Serializable {
    */
   @Internal
   public abstract static class OnceTrigger extends Trigger {
-    protected OnceTrigger(List<Trigger> subTriggers) {
+    protected OnceTrigger(@Nullable List<Trigger> subTriggers) {
       super(subTriggers);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/d8353b70/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index 2337798..3ec8136 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -486,6 +486,7 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
       original.populateDisplayData(builder);
     }
 
+    @Nullable
     public WindowFn<T, ?> getWindowFn() {
       return updatedStrategy.getWindowFn();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/d8353b70/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/package-info.java
index 406e279..332a7b0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/package-info.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/package-info.java
@@ -47,4 +47,8 @@
  * {@link org.apache.beam.sdk.transforms.windowing.AfterWatermark} for details on the
  * watermark.
  */
+@DefaultAnnotation(NonNull.class)
 package org.apache.beam.sdk.transforms.windowing;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;


[5/7] beam git commit: NonNull by default in sdk/transforms/reflect

Posted by ke...@apache.org.
NonNull by default in sdk/transforms/reflect


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f9e73dbb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f9e73dbb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f9e73dbb

Branch: refs/heads/master
Commit: f9e73dbb36f6632faea4c4bc445c47e2146abfba
Parents: d8353b7
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 23 19:23:41 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat Oct 28 08:42:30 2017 -0700

----------------------------------------------------------------------
 .../reflect/ByteBuddyDoFnInvokerFactory.java    |  3 +-
 .../sdk/transforms/reflect/DoFnInvoker.java     | 50 ++++++++++++++++----
 .../sdk/transforms/reflect/DoFnSignatures.java  |  1 +
 .../sdk/transforms/reflect/package-info.java    |  3 ++
 .../transforms/reflect/DoFnInvokersTest.java    | 12 ++++-
 5 files changed, 57 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f9e73dbb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index cf96c9b..8ce3348 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -394,7 +394,8 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
     /** Whether the target method returns non-void. */
     private final boolean targetHasReturn;
 
-    protected FieldDescription delegateField;
+    /** Starts {@code null}, initialized by {@link #prepare(InstrumentedType)}. */
+    @Nullable protected FieldDescription delegateField;
 
     private final TypeDescription doFnType;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e73dbb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index 8b41fee..ec2bf34 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -133,51 +133,81 @@ public interface DoFnInvoker<InputT, OutputT> {
     Timer timer(String timerId);
   }
 
-  /** For testing only, this {@link ArgumentProvider} returns {@code null} for all parameters. */
+  /**
+   * For testing only, this {@link ArgumentProvider} throws {@link UnsupportedOperationException}
+   * for all parameters.
+   */
   class FakeArgumentProvider<InputT, OutputT> implements ArgumentProvider<InputT, OutputT> {
     @Override
     public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
-      return null;
+      throw new UnsupportedOperationException(
+          String.format(
+              "Should never call non-overridden methods of %s",
+              FakeArgumentProvider.class.getSimpleName()));
     }
 
     @Override
     public BoundedWindow window() {
-      return null;
+      throw new UnsupportedOperationException(
+          String.format(
+              "Should never call non-overridden methods of %s",
+              FakeArgumentProvider.class.getSimpleName()));
     }
 
     @Override
     public PipelineOptions pipelineOptions() {
-      return null;
+      throw new UnsupportedOperationException(
+          String.format(
+              "Should never call non-overridden methods of %s",
+              FakeArgumentProvider.class.getSimpleName()));
     }
 
     @Override
     public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
-      return null;
+      throw new UnsupportedOperationException(
+          String.format(
+              "Should never call non-overridden methods of %s",
+              FakeArgumentProvider.class.getSimpleName()));
     }
 
     @Override
     public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(
         DoFn<InputT, OutputT> doFn) {
-      return null;
+      throw new UnsupportedOperationException(
+          String.format(
+              "Should never call non-overridden methods of %s",
+              FakeArgumentProvider.class.getSimpleName()));
     }
 
     @Override
     public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
-      return null;
+      throw new UnsupportedOperationException(
+          String.format(
+              "Should never call non-overridden methods of %s",
+              FakeArgumentProvider.class.getSimpleName()));
     }
 
     @Override
     public State state(String stateId) {
-      return null;
+      throw new UnsupportedOperationException(
+          String.format(
+              "Should never call non-overridden methods of %s",
+              FakeArgumentProvider.class.getSimpleName()));
     }
 
     @Override
     public Timer timer(String timerId) {
-      return null;
+      throw new UnsupportedOperationException(
+          String.format(
+              "Should never call non-overridden methods of %s",
+              FakeArgumentProvider.class.getSimpleName()));
     }
 
     public RestrictionTracker<?> restrictionTracker() {
-      return null;
+      throw new UnsupportedOperationException(
+          String.format(
+              "Should never call non-overridden methods of %s",
+              FakeArgumentProvider.class.getSimpleName()));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e73dbb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index de57c3b..c54c44f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -1296,6 +1296,7 @@ public class DoFnSignatures {
     return  ImmutableMap.copyOf(declarations);
   }
 
+  @Nullable
   private static Method findAnnotatedMethod(
       ErrorReporter errors, Class<? extends Annotation> anno, Class<?> fnClazz, boolean required) {
     Collection<Method> matches = declaredMethodsWithAnnotation(anno, fnClazz, DoFn.class);

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e73dbb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java
index fe2f6b1..48b128c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java
@@ -22,5 +22,8 @@
  * and creating {@link org.apache.beam.sdk.transforms.reflect.DoFnSignature}'s and
  * {@link org.apache.beam.sdk.transforms.reflect.DoFnInvoker}'s from them.
  */
+@DefaultAnnotation(NonNull.class)
 package org.apache.beam.sdk.transforms.reflect;
 
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e73dbb/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 2098c66..72883ff 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -655,7 +655,17 @@ public class DoFnInvokersTest {
                 return null;
               }
             })
-        .invokeProcessElement(new FakeArgumentProvider<Integer, Integer>());
+        .invokeProcessElement(new FakeArgumentProvider<Integer, Integer>() {
+          @Override
+          public DoFn.ProcessContext processContext(DoFn<Integer, Integer> doFn) {
+            return null; // will not be touched
+          }
+
+          @Override
+          public RestrictionTracker<?> restrictionTracker() {
+            return null; // will not be touched
+          }
+        });
   }
 
   @Test


[6/7] beam git commit: NonNull by default in sdk/transforms/splittabledofn

Posted by ke...@apache.org.
NonNull by default in sdk/transforms/splittabledofn


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/82fc7208
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/82fc7208
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/82fc7208

Branch: refs/heads/master
Commit: 82fc720808f75e84f8502f681b25560735dfa7f1
Parents: 7b2edb1
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 26 06:57:55 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat Oct 28 08:42:31 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java  | 5 +++--
 .../apache/beam/sdk/transforms/splittabledofn/package-info.java | 4 ++++
 2 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/82fc7208/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
index 4987409..8ec2c6b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.base.MoreObjects;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.transforms.DoFn;
 
@@ -31,8 +32,8 @@ import org.apache.beam.sdk.transforms.DoFn;
  */
 public class OffsetRangeTracker implements RestrictionTracker<OffsetRange> {
   private OffsetRange range;
-  private Long lastClaimedOffset = null;
-  private Long lastAttemptedOffset = null;
+  @Nullable private Long lastClaimedOffset = null;
+  @Nullable private Long lastAttemptedOffset = null;
 
   public OffsetRangeTracker(OffsetRange range) {
     this.range = checkNotNull(range);

http://git-wip-us.apache.org/repos/asf/beam/blob/82fc7208/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/package-info.java
index 4523032..82538ea 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/package-info.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/package-info.java
@@ -19,4 +19,8 @@
  * Defines utilities related to <a href="https://s.apache.org/splittable-do-fn">splittable</a>
  * {@link org.apache.beam.sdk.transforms.DoFn}.
  */
+@DefaultAnnotation(NonNull.class)
 package org.apache.beam.sdk.transforms.splittabledofn;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;


[2/7] beam git commit: NonNull by default in sdk/transforms

Posted by ke...@apache.org.
NonNull by default in sdk/transforms


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/90be8209
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/90be8209
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/90be8209

Branch: refs/heads/master
Commit: 90be8209bb7321645e674eef63f7e5f02ee93c31
Parents: e686286
Author: Kenneth Knowles <kl...@google.com>
Authored: Sat Oct 21 19:42:33 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat Oct 28 08:42:29 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/testing/PAssert.java    |  5 +++
 .../beam/sdk/transforms/ApproximateUnique.java  |  4 +-
 .../org/apache/beam/sdk/transforms/Combine.java | 10 +++--
 .../apache/beam/sdk/transforms/CombineFns.java  |  2 +
 .../org/apache/beam/sdk/transforms/Create.java  |  2 +-
 .../apache/beam/sdk/transforms/Distinct.java    |  2 +
 .../apache/beam/sdk/transforms/DoFnTester.java  | 27 ++++++++----
 .../beam/sdk/transforms/FlatMapElements.java    | 14 ++++--
 .../apache/beam/sdk/transforms/MapElements.java | 14 ++++--
 .../org/apache/beam/sdk/transforms/Max.java     |  5 ++-
 .../org/apache/beam/sdk/transforms/Min.java     |  8 ++--
 .../apache/beam/sdk/transforms/PTransform.java  |  7 +--
 .../org/apache/beam/sdk/transforms/Top.java     |  5 ++-
 .../org/apache/beam/sdk/transforms/View.java    |  7 +--
 .../org/apache/beam/sdk/transforms/Watch.java   |  4 +-
 .../apache/beam/sdk/transforms/WithKeys.java    |  6 ++-
 .../beam/sdk/transforms/package-info.java       |  4 ++
 sdks/java/extensions/sql/pom.xml                |  5 +++
 .../extensions/sql/example/BeamSqlExample.java  | 45 +++++++++++---------
 .../impl/transform/BeamBuiltinAggregations.java | 10 ++++-
 20 files changed, 126 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index ef45491..aed38dc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -31,6 +31,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.PipelineRunner;
@@ -605,6 +606,7 @@ public class PAssert {
       }
 
       @Override
+      @Nullable
       public Void apply(T actual) {
         assertThat(actual, matcher);
         return null;
@@ -1269,6 +1271,7 @@ public class PAssert {
     }
 
     @Override
+    @Nullable
     public Void apply(T actual) {
       assertThat(actual, equalTo(expected));
       return null;
@@ -1287,6 +1290,7 @@ public class PAssert {
     }
 
     @Override
+    @Nullable
     public Void apply(T actual) {
       assertThat(actual, not(equalTo(expected)));
       return null;
@@ -1316,6 +1320,7 @@ public class PAssert {
     }
 
     @Override
+    @Nullable
     public Void apply(Iterable<T> actual) {
       assertThat(actual, containsInAnyOrder(expected));
       return null;

http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
index 5d38206..98c971d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
@@ -26,7 +26,7 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.PriorityQueue;
-import org.apache.avro.reflect.Nullable;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.CoderException;
@@ -455,7 +455,7 @@ public class ApproximateUnique {
   }
 
   private static void populateDisplayData(
-      DisplayData.Builder builder, long sampleSize, Double maxEstimationError) {
+      DisplayData.Builder builder, long sampleSize, @Nullable Double maxEstimationError) {
     builder
         .add(DisplayData.item("sampleSize", sampleSize)
           .withLabel("Sample Size"))

http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index fab98f8..3c5b55b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -31,6 +31,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
@@ -440,6 +441,7 @@ public class Combine {
     /**
      * Returns the value that should be used for the combine of the empty set.
      */
+    @Nullable
     public V identity() {
       return null;
     }
@@ -506,7 +508,7 @@ public class Combine {
    * <p>Used only as a private accumulator class.
    */
   public static class Holder<V> {
-    private V value;
+    @Nullable private V value;
     private boolean present;
     private Holder() { }
     private Holder(V value) {
@@ -1945,10 +1947,10 @@ public class Combine {
      * the hot and cold key paths.
      */
     private static class InputOrAccum<InputT, AccumT> {
-      public final InputT input;
-      public final AccumT accum;
+      @Nullable public final InputT input;
+      @Nullable public final AccumT accum;
 
-      private InputOrAccum(InputT input, AccumT aggr) {
+      private InputOrAccum(@Nullable InputT input, @Nullable AccumT aggr) {
         this.input = input;
         this.accum = aggr;
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
index c619783..02cb884 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
@@ -31,6 +31,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
@@ -176,6 +177,7 @@ public class CombineFns {
      * <p>It is an error to request a non-exist tuple tag from the {@link CoCombineResult}.
      */
     @SuppressWarnings("unchecked")
+    @Nullable
     public <V> V get(TupleTag<V> tag) {
       checkArgument(
           valuesMap.keySet().contains(tag), "TupleTag " + tag + " is not in the CoCombineResult");

http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index 972675d..7f5920c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -137,7 +137,7 @@ public class Create<T> {
    * Otherwise, use {@link Create.Values#withCoder} to set the coder explicitly.
    */
   @SafeVarargs
-  public static <T> Values<T> of(T elem, T... elems) {
+  public static <T> Values<T> of(@Nullable T elem, @Nullable T... elems) {
     // This can't be an ImmutableList, as it may accept nulls
     List<T> input = new ArrayList<>(elems.length + 1);
     input.add(elem);

http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
index d751dbe..a0ddd14 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.values.KV;
@@ -115,6 +116,7 @@ public class Distinct<T> extends PTransform<PCollection<T>,
                 Combine.<T, Void>perKey(
                     new SerializableFunction<Iterable<Void>, Void>() {
                       @Override
+                      @Nullable
                       public Void apply(Iterable<Void> iter) {
                         return null; // ignore input
                       }

http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index b2377dd..6168710 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -30,6 +30,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.CheckForNull;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -514,17 +516,17 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
 
   private <T> List<ValueInSingleWindow<T>> getImmutableOutput(TupleTag<T> tag) {
     @SuppressWarnings({"unchecked", "rawtypes"})
-    List<ValueInSingleWindow<T>> elems = (List) outputs.get(tag);
+    List<ValueInSingleWindow<T>> elems = (List) getOutputs().get(tag);
     return ImmutableList.copyOf(
         MoreObjects.firstNonNull(elems, Collections.<ValueInSingleWindow<T>>emptyList()));
   }
 
   @SuppressWarnings({"unchecked", "rawtypes"})
   public <T> List<ValueInSingleWindow<T>> getMutableOutput(TupleTag<T> tag) {
-    List<ValueInSingleWindow<T>> outputList = (List) outputs.get(tag);
+    List<ValueInSingleWindow<T>> outputList = (List) getOutputs().get(tag);
     if (outputList == null) {
       outputList = new ArrayList<>();
-      outputs.put(tag, (List) outputList);
+      getOutputs().put(tag, (List) outputList);
     }
     return outputList;
   }
@@ -688,11 +690,12 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
   private TupleTag<OutputT> mainOutputTag = new TupleTag<>();
 
   /** The original DoFn under test, if started. */
-  private DoFn<InputT, OutputT> fn;
-  private DoFnInvoker<InputT, OutputT> fnInvoker;
+  @Nullable private DoFn<InputT, OutputT> fn;
 
-  /** The outputs from the {@link DoFn} under test. */
-  private Map<TupleTag<?>, List<ValueInSingleWindow<?>>> outputs;
+  @Nullable private DoFnInvoker<InputT, OutputT> fnInvoker;
+
+  /** The outputs from the {@link DoFn} under test. Access via {@link #getOutputs()}. */
+  @CheckForNull private Map<TupleTag<?>, List<ValueInSingleWindow<?>>> outputs;
 
   /** The state of processing of the {@link DoFn} under test. */
   private State state = State.UNINITIALIZED;
@@ -704,12 +707,14 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
       param.match(
           new DoFnSignature.Parameter.Cases.WithDefault<Void>() {
             @Override
+            @Nullable
             public Void dispatch(DoFnSignature.Parameter.ProcessContextParameter p) {
               // ProcessContext parameter is obviously supported.
               return null;
             }
 
             @Override
+            @Nullable
             public Void dispatch(DoFnSignature.Parameter.WindowParameter p) {
               // We also support the BoundedWindow parameter.
               return null;
@@ -738,6 +743,12 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     }
     fnInvoker = DoFnInvokers.invokerFor(fn);
     fnInvoker.invokeSetup();
-    outputs = new HashMap<>();
+  }
+
+  private Map getOutputs() {
+    if (outputs == null) {
+      outputs = new HashMap<>();
+    }
+    return outputs;
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
index 97e1dfb..193bb6e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.transforms;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
 
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -34,15 +35,15 @@ import org.apache.beam.sdk.values.TypeDescriptors;
  */
 public class FlatMapElements<InputT, OutputT>
 extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
-  private final transient TypeDescriptor<InputT> inputType;
-  private final transient TypeDescriptor<OutputT> outputType;
+  @Nullable private final transient TypeDescriptor<InputT> inputType;
+  @Nullable private final transient TypeDescriptor<OutputT> outputType;
   @Nullable private final transient Object originalFnForDisplayData;
   @Nullable private final Contextful<Fn<InputT, Iterable<OutputT>>> fn;
 
   private FlatMapElements(
       @Nullable Contextful<Fn<InputT, Iterable<OutputT>>> fn,
       @Nullable Object originalFnForDisplayData,
-      TypeDescriptor<InputT> inputType,
+      @Nullable TypeDescriptor<InputT> inputType,
       TypeDescriptor<OutputT> outputType) {
     this.fn = fn;
     this.originalFnForDisplayData = originalFnForDisplayData;
@@ -146,6 +147,13 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
 
                   @Override
                   public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
+                    checkState(
+                        outputType != null,
+                        "%s output type descriptor was null; "
+                            + "this probably means that getOutputTypeDescriptor() was called after "
+                            + "serialization/deserialization, but it is only available prior to "
+                            + "serialization, for constructing a pipeline and inferring coders",
+                        FlatMapElements.class.getSimpleName());
                     return outputType;
                   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
index 1d259ac..e1d6c11 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.transforms;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
 
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -34,15 +35,15 @@ import org.apache.beam.sdk.values.TypeDescriptors;
  */
 public class MapElements<InputT, OutputT>
 extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
-  private final transient TypeDescriptor<InputT> inputType;
-  private final transient TypeDescriptor<OutputT> outputType;
+  @Nullable private final transient TypeDescriptor<InputT> inputType;
+  @Nullable private final transient TypeDescriptor<OutputT> outputType;
   @Nullable private final transient Object originalFnForDisplayData;
   @Nullable private final Contextful<Fn<InputT, OutputT>> fn;
 
   private MapElements(
       @Nullable Contextful<Fn<InputT, OutputT>> fn,
       @Nullable Object originalFnForDisplayData,
-      TypeDescriptor<InputT> inputType,
+      @Nullable TypeDescriptor<InputT> inputType,
       TypeDescriptor<OutputT> outputType) {
     this.fn = fn;
     this.originalFnForDisplayData = originalFnForDisplayData;
@@ -140,6 +141,13 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
 
               @Override
               public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
+                checkState(
+                    outputType != null,
+                    "%s output type descriptor was null; "
+                        + "this probably means that getOutputTypeDescriptor() was called after "
+                        + "serialization/deserialization, but it is only available prior to "
+                        + "serialization, for constructing a pipeline and inferring coders",
+                    MapElements.class.getSimpleName());
                 return outputType;
               }
             }).withSideInputs(fn.getRequirements().getSideInputs()));

http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
index 710fe77..384404a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms;
 
 import java.io.Serializable;
 import java.util.Comparator;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 
@@ -214,11 +215,11 @@ public class Max {
 
   private static class MaxFn<T> extends BinaryCombineFn<T> {
 
-    private final T identity;
+    @Nullable private final T identity;
     private final Comparator<? super T> comparator;
 
     private <ComparatorT extends Comparator<? super T> & Serializable> MaxFn(
-        T identity, ComparatorT comparator) {
+        @Nullable T identity, ComparatorT comparator) {
       this.identity = identity;
       this.comparator = comparator;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
index c566fb3..65b3e6e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms;
 
 import java.io.Serializable;
 import java.util.Comparator;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 
@@ -214,11 +215,11 @@ public class Min {
 
   private static class MinFn<T> extends BinaryCombineFn<T> {
 
-    private final T identity;
+    @Nullable private final T identity;
     private final Comparator<? super T> comparator;
 
     private <ComparatorT extends Comparator<? super T> & Serializable> MinFn(
-        T identity, ComparatorT comparator) {
+        @Nullable T identity, ComparatorT comparator) {
       this.identity = identity;
       this.comparator = comparator;
     }
@@ -236,8 +237,7 @@ public class Min {
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-      builder.add(DisplayData.item("comparer", comparator.getClass())
-        .withLabel("Record Comparer"));
+      builder.add(DisplayData.item("comparer", comparator.getClass()).withLabel("Record Comparer"));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
index f5e7830..139d82a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
@@ -22,6 +22,7 @@ import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
@@ -197,7 +198,7 @@ public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
    *
    * <p>By default, does nothing.
    */
-  public void validate(PipelineOptions options) {}
+  public void validate(@Nullable PipelineOptions options) {}
 
   /**
    * Returns all {@link PValue PValues} that are consumed as inputs to this {@link PTransform} that
@@ -227,13 +228,13 @@ public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
    * The base name of this {@code PTransform}, e.g., from defaults, or
    * {@code null} if not yet assigned.
    */
-  protected final transient String name;
+  @Nullable protected final transient String name;
 
   protected PTransform() {
     this.name = null;
   }
 
-  protected PTransform(String name) {
+  protected PTransform(@Nullable String name) {
     this.name = name;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
index 319f779..35d6703 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
@@ -29,6 +29,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
 import java.util.PriorityQueue;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
@@ -453,14 +454,14 @@ public class Top {
      *
      * <p>Only one of asList and asQueue may be non-null.
      */
-    private PriorityQueue<T> asQueue;
+    @Nullable private PriorityQueue<T> asQueue;
 
     /**
      * A list in with largest first, the form of extractOutput().
      *
      * <p>Only one of asList and asQueue may be non-null.
      */
-    private List<T> asList;
+    @Nullable private List<T> asList;
 
     /** The user-provided Comparator. */
     private final ComparatorT compareFn;

http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index e463d46..eaa7925 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms;
 
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.Coder;
@@ -300,7 +301,7 @@ public class View {
    */
   @Internal
   public static class AsSingleton<T> extends PTransform<PCollection<T>, PCollectionView<T>> {
-    private final T defaultValue;
+    @Nullable private final T defaultValue;
     private final boolean hasDefault;
 
     private AsSingleton() {
@@ -353,8 +354,8 @@ public class View {
 
   private static class SingletonCombineFn<T> extends Combine.BinaryCombineFn<T> {
     private final boolean hasDefault;
-    private final Coder<T> valueCoder;
-    private final byte[] defaultValue;
+    @Nullable private final Coder<T> valueCoder;
+    @Nullable private final byte[] defaultValue;
 
     private SingletonCombineFn(boolean hasDefault, Coder<T> coder, T defaultValue) {
       this.hasDefault = hasDefault;

http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
index a3c906c..75c2fe4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
@@ -247,7 +247,7 @@ public class Watch {
        * Called by the {@link Watch} transform to create a new independent termination state for a
        * newly arrived {@code InputT}.
        */
-      StateT forNewInput(Instant now, InputT input);
+      StateT forNewInput(Instant now, @Nullable InputT input);
 
       /**
        * Called by the {@link Watch} transform to compute a new termination state, in case after
@@ -799,7 +799,7 @@ public class Watch {
     // Outputs that have been claimed in the current ProcessElement call. A prefix of "pending".
     private List<TimestampedValue<OutputT>> claimed = Lists.newArrayList();
     private boolean isOutputComplete;
-    private TerminationStateT terminationState;
+    @Nullable private TerminationStateT terminationState;
     @Nullable private Instant pollWatermark;
     private boolean shouldStop = false;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
index 79cac99..23696e5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
@@ -19,6 +19,8 @@ package org.apache.beam.sdk.transforms;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import javax.annotation.CheckForNull;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
@@ -78,7 +80,7 @@ public class WithKeys<K, V> extends PTransform<PCollection<V>,
    * given key.
    */
   @SuppressWarnings("unchecked")
-  public static <K, V> WithKeys<K, V> of(final K key) {
+  public static <K, V> WithKeys<K, V> of(@Nullable final K key) {
     return new WithKeys<>(
         new SerializableFunction<V, K>() {
           @Override
@@ -93,7 +95,7 @@ public class WithKeys<K, V> extends PTransform<PCollection<V>,
   /////////////////////////////////////////////////////////////////////////////
 
   private SerializableFunction<V, K> fn;
-  private transient Class<K> keyClass;
+  @CheckForNull private transient Class<K> keyClass;
 
   private WithKeys(SerializableFunction<V, K> fn, Class<K> keyClass) {
     this.fn = fn;

http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/package-info.java
index 892dee9..634786b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/package-info.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/package-info.java
@@ -40,4 +40,8 @@
  * for their own application-specific logic.
  *
  */
+@DefaultAnnotation(NonNull.class)
 package org.apache.beam.sdk.transforms;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;

http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/extensions/sql/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/pom.xml b/sdks/java/extensions/sql/pom.xml
index cf3b7dc..742d3b6 100644
--- a/sdks/java/extensions/sql/pom.xml
+++ b/sdks/java/extensions/sql/pom.xml
@@ -206,6 +206,11 @@
     </dependency>
 
     <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
index 350bb7b..0154e1e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.extensions.sql.example;
 import java.sql.Types;
 import java.util.Arrays;
 import java.util.List;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
 import org.apache.beam.sdk.extensions.sql.BeamSql;
@@ -66,16 +67,18 @@ class BeamSqlExample {
         BeamSql.query("select c1, c2, c3 from PCOLLECTION where c1 > 1"));
 
     //print the output record of case 1;
-    outputStream.apply("log_result",
-        MapElements.<BeamRecord, Void>via(new SimpleFunction<BeamRecord, Void>() {
-      public Void apply(BeamRecord input) {
-        //expect output:
-        //  PCOLLECTION: [3, row, 3.0]
-        //  PCOLLECTION: [2, row, 2.0]
-        System.out.println("PCOLLECTION: " + input.getDataValues());
-        return null;
-      }
-    }));
+    outputStream.apply(
+        "log_result",
+        MapElements.<BeamRecord, Void>via(
+            new SimpleFunction<BeamRecord, Void>() {
+              public @Nullable Void apply(BeamRecord input) {
+                //expect output:
+                //  PCOLLECTION: [3, row, 3.0]
+                //  PCOLLECTION: [2, row, 2.0]
+                System.out.println("PCOLLECTION: " + input.getDataValues());
+                return null;
+              }
+            }));
 
     //Case 2. run the query with BeamSql.query over result PCollection of case 1.
     PCollection<BeamRecord> outputStream2 =
@@ -83,16 +86,18 @@ class BeamSqlExample {
         .apply(BeamSql.queryMulti("select c2, sum(c3) from CASE1_RESULT group by c2"));
 
     //print the output record of case 2;
-    outputStream2.apply("log_result",
-        MapElements.<BeamRecord, Void>via(new SimpleFunction<BeamRecord, Void>() {
-      @Override
-      public Void apply(BeamRecord input) {
-        //expect output:
-        //  CASE1_RESULT: [row, 5.0]
-        System.out.println("CASE1_RESULT: " + input.getDataValues());
-        return null;
-      }
-    }));
+    outputStream2.apply(
+        "log_result",
+        MapElements.<BeamRecord, Void>via(
+            new SimpleFunction<BeamRecord, Void>() {
+              @Override
+              public @Nullable Void apply(BeamRecord input) {
+                //expect output:
+                //  CASE1_RESULT: [row, 5.0]
+                System.out.println("CASE1_RESULT: " + input.getDataValues());
+                return null;
+              }
+            }));
 
     p.run().waitUntilFinish();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
index b5a5266..ad15f98 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
@@ -23,6 +23,7 @@ import java.math.MathContext;
 import java.math.RoundingMode;
 import java.util.Date;
 import java.util.Iterator;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.BigDecimalCoder;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
@@ -253,6 +254,7 @@ class BeamBuiltinAggregations {
   }
 
   static class IntegerAvg extends Avg<Integer>{
+    @Nullable
     public Integer extractOutput(KV<Integer, BigDecimal> accumulator) {
       return accumulator.getKey() == 0 ? null : prepareOutput(accumulator).intValue();
     }
@@ -263,6 +265,7 @@ class BeamBuiltinAggregations {
   }
 
   static class LongAvg extends Avg<Long>{
+    @Nullable
     public Long extractOutput(KV<Integer, BigDecimal> accumulator) {
       return accumulator.getKey() == 0 ? null : prepareOutput(accumulator).longValue();
     }
@@ -273,6 +276,7 @@ class BeamBuiltinAggregations {
   }
 
   static class ShortAvg extends Avg<Short>{
+    @Nullable
     public Short extractOutput(KV<Integer, BigDecimal> accumulator) {
       return accumulator.getKey() == 0 ? null : prepareOutput(accumulator).shortValue();
     }
@@ -282,7 +286,8 @@ class BeamBuiltinAggregations {
     }
   }
 
-  static class ByteAvg extends Avg<Byte>{
+  static class ByteAvg extends Avg<Byte> {
+    @Nullable
     public Byte extractOutput(KV<Integer, BigDecimal> accumulator) {
       return accumulator.getKey() == 0 ? null : prepareOutput(accumulator).byteValue();
     }
@@ -293,6 +298,7 @@ class BeamBuiltinAggregations {
   }
 
   static class FloatAvg extends Avg<Float>{
+    @Nullable
     public Float extractOutput(KV<Integer, BigDecimal> accumulator) {
       return accumulator.getKey() == 0 ? null : prepareOutput(accumulator).floatValue();
     }
@@ -303,6 +309,7 @@ class BeamBuiltinAggregations {
   }
 
   static class DoubleAvg extends Avg<Double>{
+    @Nullable
     public Double extractOutput(KV<Integer, BigDecimal> accumulator) {
       return accumulator.getKey() == 0 ? null : prepareOutput(accumulator).doubleValue();
     }
@@ -313,6 +320,7 @@ class BeamBuiltinAggregations {
   }
 
   static class BigDecimalAvg extends Avg<BigDecimal>{
+    @Nullable
     public BigDecimal extractOutput(KV<Integer, BigDecimal> accumulator) {
       return accumulator.getKey() == 0 ? null : prepareOutput(accumulator);
     }


[4/7] beam git commit: NonNull by default in sdk/transforms/display

Posted by ke...@apache.org.
NonNull by default in sdk/transforms/display


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e25aba8e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e25aba8e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e25aba8e

Branch: refs/heads/master
Commit: e25aba8e776ee85ecec3ebaa49d6c13240911b12
Parents: f9e73db
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 23 19:32:02 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat Oct 28 08:42:30 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/display/DisplayData.java | 12 ++++++------
 .../beam/sdk/transforms/display/package-info.java       |  4 ++++
 2 files changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e25aba8e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
index 917c070..1b4b48f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
@@ -710,14 +710,14 @@ public class DisplayData implements Serializable {
      */
     private static final FormattedItemValue NULL_VALUES = new FormattedItemValue(null);
 
-    private final Object shortValue;
-    private final Object longValue;
+    @Nullable private final Object shortValue;
+    @Nullable private final Object longValue;
 
-    private FormattedItemValue(Object longValue) {
+    private FormattedItemValue(@Nullable Object longValue) {
       this(longValue, null);
     }
 
-    private FormattedItemValue(Object longValue, Object shortValue) {
+    private FormattedItemValue(@Nullable Object longValue, @Nullable Object shortValue) {
       this.longValue = longValue;
       this.shortValue = shortValue;
     }
@@ -735,8 +735,8 @@ public class DisplayData implements Serializable {
     private final Set<HasDisplayData> visitedComponents;
     private final Map<Path, HasDisplayData> visitedPathMap;
 
-    private Path latestPath;
-    private Class<?> latestNs;
+    @Nullable private Path latestPath;
+    @Nullable private Class<?> latestNs;
 
     private InternalBuilder() {
       this.entries = Maps.newHashMap();

http://git-wip-us.apache.org/repos/asf/beam/blob/e25aba8e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/package-info.java
index 4af3327..e4fff40 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/package-info.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/package-info.java
@@ -23,4 +23,8 @@
  *
  * @see org.apache.beam.sdk.transforms.display.HasDisplayData
  */
+@DefaultAnnotation(NonNull.class)
 package org.apache.beam.sdk.transforms.display;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;


[3/7] beam git commit: NonNull by default in sdk/transforms/join

Posted by ke...@apache.org.
NonNull by default in sdk/transforms/join


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7b2edb15
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7b2edb15
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7b2edb15

Branch: refs/heads/master
Commit: 7b2edb153967ec5d3cfa1e2e9fe538159f953b30
Parents: e25aba8
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 23 19:35:08 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat Oct 28 08:42:30 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/join/CoGbkResult.java  |  7 +++++--
 .../beam/sdk/transforms/join/KeyedPCollectionTuple.java   | 10 ++++------
 .../apache/beam/sdk/transforms/join/RawUnionValue.java    |  8 +++++---
 .../org/apache/beam/sdk/transforms/join/package-info.java |  4 ++++
 .../beam/sdk/nexmark/queries/NexmarkQueryModel.java       |  5 ++---
 .../java/org/apache/beam/sdk/nexmark/queries/Query8.java  |  3 ++-
 .../org/apache/beam/sdk/nexmark/queries/WinningBids.java  |  3 ++-
 7 files changed, 24 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7b2edb15/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
index 877bb07..16a0bae 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
@@ -197,7 +198,8 @@ public class CoGbkResult {
    * <p>If tag was not part of the original {@link CoGroupByKey},
    * throws an IllegalArgumentException.
    */
-  public <V> V getOnly(TupleTag<V> tag, V defaultValue) {
+  @Nullable
+  public <V> V getOnly(TupleTag<V> tag, @Nullable V defaultValue) {
     return innerGetOnly(tag, defaultValue, true);
   }
 
@@ -356,9 +358,10 @@ public class CoGbkResult {
     this.valueMap = valueMap;
   }
 
+  @Nullable
   private <V> V innerGetOnly(
       TupleTag<V> tag,
-      V defaultValue,
+      @Nullable V defaultValue,
       boolean useDefault) {
     int index = schema.getIndex(tag);
     if (index < 0) {

http://git-wip-us.apache.org/repos/asf/beam/blob/7b2edb15/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
index 2e7dd01..a9d1873 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
@@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -207,24 +208,21 @@ public class KeyedPCollectionTuple<K> implements PInput {
    */
   private final List<TaggedKeyedPCollection<K, ?>> keyedCollections;
 
-  private Coder<K> keyCoder;
+  @Nullable private Coder<K> keyCoder;
 
   private final CoGbkResultSchema schema;
 
   private final Pipeline pipeline;
 
   KeyedPCollectionTuple(Pipeline pipeline) {
-    this(pipeline,
-         new ArrayList<TaggedKeyedPCollection<K, ?>>(),
-         TupleTagList.empty(),
-         null);
+    this(pipeline, new ArrayList<TaggedKeyedPCollection<K, ?>>(), TupleTagList.empty(), null);
   }
 
   KeyedPCollectionTuple(
       Pipeline pipeline,
       List<TaggedKeyedPCollection<K, ?>> keyedCollections,
       TupleTagList tupleTagList,
-      Coder<K> keyCoder) {
+      @Nullable Coder<K> keyCoder) {
     this.pipeline = pipeline;
     this.keyedCollections = keyedCollections;
     this.schema = new CoGbkResultSchema(tupleTagList);

http://git-wip-us.apache.org/repos/asf/beam/blob/7b2edb15/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/RawUnionValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/RawUnionValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/RawUnionValue.java
index 07bfe69..7ac1faf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/RawUnionValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/RawUnionValue.java
@@ -20,18 +20,20 @@ package org.apache.beam.sdk.transforms.join;
 // TODO: Think about making this a complete dynamic union by adding
 // a schema.  Type would then be defined by the corresponding schema entry.
 
+import javax.annotation.Nullable;
+
 /**
  * This corresponds to an integer union tag and value.  The mapping of
  * union tag to type must come from elsewhere.
  */
 public class RawUnionValue {
   private final int unionTag;
-  private final Object value;
+  @Nullable private final Object value;
 
   /**
    * Constructs a partial union from the given union tag and value.
    */
-  public RawUnionValue(int unionTag, Object value) {
+  public RawUnionValue(int unionTag, @Nullable Object value) {
     this.unionTag = unionTag;
     this.value = value;
   }
@@ -40,7 +42,7 @@ public class RawUnionValue {
     return unionTag;
   }
 
-  public Object getValue() {
+  @Nullable public Object getValue() {
     return value;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7b2edb15/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/package-info.java
index f4b315e..7aab329 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/package-info.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/package-info.java
@@ -19,4 +19,8 @@
  * Defines the {@link org.apache.beam.sdk.transforms.join.CoGroupByKey} transform
  * for joining multiple PCollections.
  */
+@DefaultAnnotation(NonNull.class)
 package org.apache.beam.sdk.transforms.join;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;

http://git-wip-us.apache.org/repos/asf/beam/blob/7b2edb15/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
index 1f093a0..2efab3e 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
@@ -24,13 +24,11 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
-
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
 import org.apache.beam.sdk.nexmark.model.KnownSize;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.TimestampedValue;
-
-
 import org.hamcrest.core.IsEqual;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -107,6 +105,7 @@ public abstract class NexmarkQueryModel implements Serializable {
 
     return new SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void>() {
       @Override
+      @Nullable
       public Void apply(Iterable<TimestampedValue<KnownSize>> actual) {
       Collection<String> actualStrings = toCollection(relevantResults(actual).iterator());
         Assert.assertThat("wrong pipeline output", actualStrings,

http://git-wip-us.apache.org/repos/asf/beam/blob/7b2edb15/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java
index fa3dd86..def7cb3 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.nexmark.queries;
 
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
 import org.apache.beam.sdk.nexmark.NexmarkUtils;
 import org.apache.beam.sdk.nexmark.model.Auction;
@@ -78,7 +79,7 @@ public class Query8 extends NexmarkQuery {
             ParDo.of(new DoFn<KV<Long, CoGbkResult>, IdNameReserve>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
-                    Person person = c.element().getValue().getOnly(PERSON_TAG, null);
+                    @Nullable Person person = c.element().getValue().getOnly(PERSON_TAG, null);
                     if (person == null) {
                       // Person was not created in last window period.
                       return;

http://git-wip-us.apache.org/repos/asf/beam/blob/7b2edb15/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
index d73b8ae..bc553c9 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.TreeMap;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
@@ -362,7 +363,7 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct
 
             @ProcessElement
             public void processElement(ProcessContext c) {
-              Auction auction =
+              @Nullable Auction auction =
                   c.element().getValue().getOnly(NexmarkQuery.AUCTION_TAG, null);
               if (auction == null) {
                 // We have bids without a matching auction. Give up.


[7/7] beam git commit: This closes #4049: [BEAM-3081] NonNull by default for sdk/transforms and below

Posted by ke...@apache.org.
This closes #4049: [BEAM-3081] NonNull by default for sdk/transforms and below

  NonNull by default in sdk/transforms/splittabledofn
  NonNull by default in sdk/transforms/join
  NonNull by default in sdk/transforms/display
  NonNull by default in sdk/transforms/reflect
  NonNull by default in sdk/transforms/windowing
  NonNull by default in sdk/transforms


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5fb30ec8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5fb30ec8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5fb30ec8

Branch: refs/heads/master
Commit: 5fb30ec8265c841cd8c4e6ae16b43be1f171eabb
Parents: e686286 82fc720
Author: Kenneth Knowles <kl...@google.com>
Authored: Sat Oct 28 08:43:06 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat Oct 28 08:43:06 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/testing/PAssert.java    |  5 ++
 .../beam/sdk/transforms/ApproximateUnique.java  |  4 +-
 .../org/apache/beam/sdk/transforms/Combine.java | 10 ++--
 .../apache/beam/sdk/transforms/CombineFns.java  |  2 +
 .../org/apache/beam/sdk/transforms/Create.java  |  2 +-
 .../apache/beam/sdk/transforms/Distinct.java    |  2 +
 .../apache/beam/sdk/transforms/DoFnTester.java  | 27 +++++++----
 .../beam/sdk/transforms/FlatMapElements.java    | 14 ++++--
 .../apache/beam/sdk/transforms/MapElements.java | 14 ++++--
 .../org/apache/beam/sdk/transforms/Max.java     |  5 +-
 .../org/apache/beam/sdk/transforms/Min.java     |  8 ++--
 .../apache/beam/sdk/transforms/PTransform.java  |  7 +--
 .../org/apache/beam/sdk/transforms/Top.java     |  5 +-
 .../org/apache/beam/sdk/transforms/View.java    |  7 +--
 .../org/apache/beam/sdk/transforms/Watch.java   |  4 +-
 .../apache/beam/sdk/transforms/WithKeys.java    |  6 ++-
 .../sdk/transforms/display/DisplayData.java     | 12 ++---
 .../sdk/transforms/display/package-info.java    |  4 ++
 .../beam/sdk/transforms/join/CoGbkResult.java   |  7 ++-
 .../transforms/join/KeyedPCollectionTuple.java  | 10 ++--
 .../beam/sdk/transforms/join/RawUnionValue.java |  8 ++--
 .../beam/sdk/transforms/join/package-info.java  |  4 ++
 .../beam/sdk/transforms/package-info.java       |  4 ++
 .../reflect/ByteBuddyDoFnInvokerFactory.java    |  3 +-
 .../sdk/transforms/reflect/DoFnInvoker.java     | 50 ++++++++++++++++----
 .../sdk/transforms/reflect/DoFnSignatures.java  |  1 +
 .../sdk/transforms/reflect/package-info.java    |  3 ++
 .../splittabledofn/OffsetRangeTracker.java      |  5 +-
 .../transforms/splittabledofn/package-info.java |  4 ++
 .../windowing/IncompatibleWindowException.java  |  2 +-
 .../MergeOverlappingIntervalWindows.java        |  4 +-
 .../beam/sdk/transforms/windowing/Trigger.java  | 20 ++++----
 .../beam/sdk/transforms/windowing/Window.java   |  1 +
 .../sdk/transforms/windowing/package-info.java  |  4 ++
 .../transforms/reflect/DoFnInvokersTest.java    | 12 ++++-
 sdks/java/extensions/sql/pom.xml                |  5 ++
 .../extensions/sql/example/BeamSqlExample.java  | 45 ++++++++++--------
 .../impl/transform/BeamBuiltinAggregations.java | 10 +++-
 .../sdk/nexmark/queries/NexmarkQueryModel.java  |  5 +-
 .../apache/beam/sdk/nexmark/queries/Query8.java |  3 +-
 .../beam/sdk/nexmark/queries/WinningBids.java   |  3 +-
 41 files changed, 244 insertions(+), 107 deletions(-)
----------------------------------------------------------------------