You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/11/10 01:15:21 UTC

[10/11] beam git commit: NonNull by default in runners/core

NonNull by default in runners/core


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/87b07d83
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/87b07d83
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/87b07d83

Branch: refs/heads/master
Commit: 87b07d83171f8718b556fa15ec27ce824a108505
Parents: da2a606
Author: Kenneth Knowles <kl...@google.com>
Authored: Sun Oct 29 21:09:08 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Thu Nov 9 15:05:50 2017 -0800

----------------------------------------------------------------------
 .../runners/apex/translation/utils/NoOpStepContext.java |  4 ++--
 .../beam/runners/core/InMemoryStateInternals.java       | 10 +++++-----
 ...utAndTimeBoundedSplittableProcessElementInvoker.java |  8 ++++----
 .../org/apache/beam/runners/core/PeekingReiterator.java |  3 ++-
 .../org/apache/beam/runners/core/SimpleDoFnRunner.java  |  2 +-
 .../runners/core/SimplePushbackSideInputDoFnRunner.java |  4 +++-
 .../runners/core/SplittableParDoViaKeyedWorkItems.java  | 12 +++++++-----
 .../runners/core/SplittableProcessElementInvoker.java   |  6 +++---
 .../java/org/apache/beam/runners/core/package-info.java |  4 ++++
 .../translation/functions/FlinkNoOpStepContext.java     |  4 ++--
 .../streaming/state/FlinkBroadcastStateInternals.java   |  2 ++
 .../streaming/state/FlinkSplitStateInternals.java       |  2 ++
 .../runners/spark/translation/SparkProcessContext.java  |  4 ++--
 13 files changed, 39 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/87b07d83/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
index b49e4da..9268393 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
@@ -30,11 +30,11 @@ public class NoOpStepContext implements StepContext, Serializable {
 
   @Override
   public StateInternals stateInternals() {
-    return null;
+    throw new UnsupportedOperationException("stateInternals is not supported");
   }
 
   @Override
   public TimerInternals timerInternals() {
-    return null;
+    throw new UnsupportedOperationException("timerInternals is not supported");
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/87b07d83/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
index 075e264..9193a74 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
@@ -59,18 +59,18 @@ import org.joda.time.Instant;
 @Experimental(Kind.STATE)
 public class InMemoryStateInternals<K> implements StateInternals {
 
-  public static <K> InMemoryStateInternals<K> forKey(K key) {
+  public static <K> InMemoryStateInternals<K> forKey(@Nullable K key) {
     return new InMemoryStateInternals<>(key);
   }
 
-  private final K key;
+  private final @Nullable K key;
 
-  protected InMemoryStateInternals(K key) {
+  protected InMemoryStateInternals(@Nullable K key) {
     this.key = key;
   }
 
   @Override
-  public K getKey() {
+  public @Nullable K getKey() {
     return key;
   }
 
@@ -179,7 +179,7 @@ public class InMemoryStateInternals<K> implements StateInternals {
   public static final class InMemoryValue<T>
       implements ValueState<T>, InMemoryState<InMemoryValue<T>> {
     private boolean isCleared = true;
-    private T value = null;
+    private @Nullable T value = null;
 
     @Override
     public void clear() {

http://git-wip-us.apache.org/repos/asf/beam/blob/87b07d83/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
index d830db5..c43ca47 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
@@ -161,7 +161,7 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
     // Currently we can't verify this because there are no hooks into tryClaim().
     // See https://issues.apache.org/jira/browse/BEAM-2607
     processContext.cancelScheduledCheckpoint();
-    KV<RestrictionT, Instant> residual = processContext.getTakenCheckpoint();
+    @Nullable KV<RestrictionT, Instant> residual = processContext.getTakenCheckpoint();
     if (cont.shouldResume()) {
       if (residual == null) {
         // No checkpoint had been taken by the runner while the ProcessElement call ran, however
@@ -207,13 +207,13 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
     // even if these events happen almost at the same time.
     // This is either the result of the sole tracker.checkpoint() call, or null if
     // the call completed before reaching the given number of outputs or duration.
-    private RestrictionT checkpoint;
+    private @Nullable RestrictionT checkpoint;
     // Watermark captured at the moment before checkpoint was taken, describing a lower bound
     // on the output from "checkpoint".
-    private Instant residualWatermark;
+    private @Nullable Instant residualWatermark;
     // A handle on the scheduled action to take a checkpoint.
     private Future<?> scheduledCheckpoint;
-    private Instant lastReportedWatermark;
+    private @Nullable Instant lastReportedWatermark;
 
     public ProcessContext(WindowedValue<InputT> element, TrackerT tracker) {
       fn.super();

http://git-wip-us.apache.org/repos/asf/beam/blob/87b07d83/runners/core-java/src/main/java/org/apache/beam/runners/core/PeekingReiterator.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PeekingReiterator.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PeekingReiterator.java
index fcdff3b..123a8d5 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PeekingReiterator.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PeekingReiterator.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.util.common.Reiterator;
 
 /**
@@ -29,7 +30,7 @@ import org.apache.beam.sdk.util.common.Reiterator;
  * @param <T> the type of elements returned by this iterator
  */
 public final class PeekingReiterator<T> implements Reiterator<T> {
-  private T nextElement;
+  private @Nullable T nextElement;
   private boolean nextElementComputed;
   private final Reiterator<T> iterator;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/87b07d83/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index c3bfef6..6ae6754 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -546,7 +546,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     private final TimeDomain timeDomain;
 
     /** Lazily initialized; should only be accessed via {@link #getNamespace()}. */
-    private StateNamespace namespace;
+    private @Nullable StateNamespace namespace;
 
     /**
      * The state namespace for this context.

http://git-wip-us.apache.org/repos/asf/beam/blob/87b07d83/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
index 3f77f7d..591a6a2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -39,7 +40,8 @@ public class SimplePushbackSideInputDoFnRunner<InputT, OutputT>
   private final Collection<PCollectionView<?>> views;
   private final ReadyCheckingSideInputReader sideInputReader;
 
-  private Set<BoundedWindow> notReadyWindows;
+  // Initialized in startBundle()
+  private @Nullable Set<BoundedWindow> notReadyWindows;
 
   public static <InputT, OutputT> SimplePushbackSideInputDoFnRunner<InputT, OutputT> create(
       DoFnRunner<InputT, OutputT> underlying,

http://git-wip-us.apache.org/repos/asf/beam/blob/87b07d83/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index 400df19..6c7f671 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
@@ -244,12 +245,13 @@ public class SplittableParDoViaKeyedWorkItems {
     private final Coder<RestrictionT> restrictionCoder;
     private final WindowingStrategy<InputT, ?> inputWindowingStrategy;
 
-    private transient StateInternalsFactory<String> stateInternalsFactory;
-    private transient TimerInternalsFactory<String> timerInternalsFactory;
-    private transient SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, TrackerT>
+    private transient @Nullable StateInternalsFactory<String> stateInternalsFactory;
+    private transient @Nullable TimerInternalsFactory<String> timerInternalsFactory;
+    private transient @Nullable SplittableProcessElementInvoker<
+            InputT, OutputT, RestrictionT, TrackerT>
         processElementInvoker;
 
-    private transient DoFnInvoker<InputT, OutputT> invoker;
+    private transient @Nullable DoFnInvoker<InputT, OutputT> invoker;
 
     public ProcessFn(
         DoFn<InputT, OutputT> fn,
@@ -376,7 +378,7 @@ public class SplittableParDoViaKeyedWorkItems {
         return;
       }
       restrictionState.write(result.getResidualRestriction());
-      Instant futureOutputWatermark = result.getFutureOutputWatermark();
+      @Nullable Instant futureOutputWatermark = result.getFutureOutputWatermark();
       if (futureOutputWatermark == null) {
         futureOutputWatermark = elementAndRestriction.getKey().getTimestamp();
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/87b07d83/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
index 7732df3..5b9cbf2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
@@ -37,12 +37,12 @@ public abstract class SplittableProcessElementInvoker<
     @Nullable
     private final RestrictionT residualRestriction;
     private final DoFn.ProcessContinuation continuation;
-    private final Instant futureOutputWatermark;
+    private final @Nullable Instant futureOutputWatermark;
 
     public Result(
         @Nullable RestrictionT residualRestriction,
         DoFn.ProcessContinuation continuation,
-        Instant futureOutputWatermark) {
+        @Nullable Instant futureOutputWatermark) {
       this.continuation = checkNotNull(continuation);
       if (continuation.shouldResume()) {
         checkNotNull(residualRestriction);
@@ -65,7 +65,7 @@ public abstract class SplittableProcessElementInvoker<
       return continuation;
     }
 
-    public Instant getFutureOutputWatermark() {
+    public @Nullable Instant getFutureOutputWatermark() {
       return futureOutputWatermark;
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/87b07d83/runners/core-java/src/main/java/org/apache/beam/runners/core/package-info.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/package-info.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/package-info.java
index d250a6a..47e96dc 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/package-info.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/package-info.java
@@ -19,4 +19,8 @@
 /**
  * Provides utilities for Beam runner authors.
  */
+@DefaultAnnotation(NonNull.class)
 package org.apache.beam.runners.core;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;

http://git-wip-us.apache.org/repos/asf/beam/blob/87b07d83/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
index 9c7b636..fd45f32 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
@@ -28,12 +28,12 @@ public class FlinkNoOpStepContext implements StepContext {
 
   @Override
   public StateInternals stateInternals() {
-    return null;
+    throw new UnsupportedOperationException("stateInternals is not supported");
   }
 
   @Override
   public TimerInternals timerInternals() {
-    return null;
+    throw new UnsupportedOperationException("timerInternals is not supported");
   }
 }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/87b07d83/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
index 6cc2429..a6da211 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateTag;
@@ -77,6 +78,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals {
   }
 
   @Override
+  @Nullable
   public K getKey() {
     return null;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/87b07d83/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
index 09e59fd..48b9613 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
 
 import com.google.common.collect.Iterators;
 import java.util.Collections;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateTag;
@@ -61,6 +62,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals {
   }
 
   @Override
+  @Nullable
   public K getKey() {
     return null;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/87b07d83/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index 729eb1c..8b85155 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -106,12 +106,12 @@ class SparkProcessContext<FnInputT, FnOutputT, OutputT> {
 
     @Override
     public StateInternals stateInternals() {
-      return null;
+      throw new UnsupportedOperationException("stateInternals not supported");
     }
 
     @Override
     public TimerInternals timerInternals() {
-      return null;
+      throw new UnsupportedOperationException("timerInternals not supported");
     }
   }