You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/10/28 16:18:33 UTC
[1/7] beam git commit: NonNull by default in sdk/transforms/windowing
Repository: beam
Updated Branches:
refs/heads/master e686286f1 -> 5fb30ec82
NonNull by default in sdk/transforms/windowing
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d8353b70
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d8353b70
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d8353b70
Branch: refs/heads/master
Commit: d8353b70143be4eb31298c120c9faf4a372beb37
Parents: 90be820
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 23 19:17:00 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat Oct 28 08:42:29 2017 -0700
----------------------------------------------------------------------
.../windowing/IncompatibleWindowException.java | 2 +-
.../MergeOverlappingIntervalWindows.java | 4 +++-
.../beam/sdk/transforms/windowing/Trigger.java | 20 +++++++++++---------
.../beam/sdk/transforms/windowing/Window.java | 1 +
.../sdk/transforms/windowing/package-info.java | 4 ++++
5 files changed, 20 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d8353b70/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IncompatibleWindowException.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IncompatibleWindowException.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IncompatibleWindowException.java
index b7b96ad..20746af 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IncompatibleWindowException.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IncompatibleWindowException.java
@@ -32,7 +32,7 @@ public class IncompatibleWindowException extends Exception {
@Override
public String getMessage() {
- String windowFn = givenWindowFn == null ? "null" : givenWindowFn.getClass().getSimpleName();
+ String windowFn = givenWindowFn.getClass().getSimpleName();
return String.format("The given WindowFn is %s. %s", windowFn, reason);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8353b70/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/MergeOverlappingIntervalWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/MergeOverlappingIntervalWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/MergeOverlappingIntervalWindows.java
index 0a68021..0421868 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/MergeOverlappingIntervalWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/MergeOverlappingIntervalWindows.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
/**
@@ -61,9 +62,10 @@ public class MergeOverlappingIntervalWindows {
}
private static class MergeCandidate {
- private IntervalWindow union;
+ @Nullable private IntervalWindow union;
private final List<IntervalWindow> parts;
public MergeCandidate() {
+ union = null;
parts = new ArrayList<>();
}
public MergeCandidate(IntervalWindow window) {
http://git-wip-us.apache.org/repos/asf/beam/blob/d8353b70/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
index 519ab67..6985565 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.transforms.GroupByKey;
@@ -71,9 +72,9 @@ import org.joda.time.Instant;
@Experimental(Experimental.Kind.TRIGGER)
public abstract class Trigger implements Serializable {
- protected final List<Trigger> subTriggers;
+ @Nullable protected final List<Trigger> subTriggers;
- protected Trigger(List<Trigger> subTriggers) {
+ protected Trigger(@Nullable List<Trigger> subTriggers) {
this.subTriggers = subTriggers;
}
@@ -107,15 +108,16 @@ public abstract class Trigger implements Serializable {
}
/**
- * Subclasses should override this to return the {@link #getContinuationTrigger} of this
- * {@link Trigger}. For convenience, this is provided the continuation trigger of each of the
+ * Subclasses should override this to return the {@link #getContinuationTrigger} of this {@link
+ * Trigger}. For convenience, this is provided the continuation trigger of each of the
* sub-triggers in the same order as {@link #subTriggers}.
*
- * @param continuationTriggers null if {@link #subTriggers} is null, otherwise contains the
- * result of {@link #getContinuationTrigger()} on each of the
- * subTriggers in the same order.
+ * @param continuationTriggers {@code null} if {@link #subTriggers} is {@code null}, otherwise
+ * contains the result of {@link #getContinuationTrigger()} on each of the subTriggers in the
+ * same order.
*/
- protected abstract Trigger getContinuationTrigger(List<Trigger> continuationTriggers);
+ @Nullable
+ protected abstract Trigger getContinuationTrigger(@Nullable List<Trigger> continuationTriggers);
/**
* <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
@@ -224,7 +226,7 @@ public abstract class Trigger implements Serializable {
*/
@Internal
public abstract static class OnceTrigger extends Trigger {
- protected OnceTrigger(List<Trigger> subTriggers) {
+ protected OnceTrigger(@Nullable List<Trigger> subTriggers) {
super(subTriggers);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8353b70/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index 2337798..3ec8136 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -486,6 +486,7 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
original.populateDisplayData(builder);
}
+ @Nullable
public WindowFn<T, ?> getWindowFn() {
return updatedStrategy.getWindowFn();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8353b70/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/package-info.java
index 406e279..332a7b0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/package-info.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/package-info.java
@@ -47,4 +47,8 @@
* {@link org.apache.beam.sdk.transforms.windowing.AfterWatermark} for details on the
* watermark.
*/
+@DefaultAnnotation(NonNull.class)
package org.apache.beam.sdk.transforms.windowing;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
[5/7] beam git commit: NonNull by default in sdk/transforms/reflect
Posted by ke...@apache.org.
NonNull by default in sdk/transforms/reflect
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f9e73dbb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f9e73dbb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f9e73dbb
Branch: refs/heads/master
Commit: f9e73dbb36f6632faea4c4bc445c47e2146abfba
Parents: d8353b7
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 23 19:23:41 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat Oct 28 08:42:30 2017 -0700
----------------------------------------------------------------------
.../reflect/ByteBuddyDoFnInvokerFactory.java | 3 +-
.../sdk/transforms/reflect/DoFnInvoker.java | 50 ++++++++++++++++----
.../sdk/transforms/reflect/DoFnSignatures.java | 1 +
.../sdk/transforms/reflect/package-info.java | 3 ++
.../transforms/reflect/DoFnInvokersTest.java | 12 ++++-
5 files changed, 57 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e73dbb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index cf96c9b..8ce3348 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -394,7 +394,8 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
/** Whether the target method returns non-void. */
private final boolean targetHasReturn;
- protected FieldDescription delegateField;
+ /** Starts {@code null}, initialized by {@link #prepare(InstrumentedType)}. */
+ @Nullable protected FieldDescription delegateField;
private final TypeDescription doFnType;
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e73dbb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index 8b41fee..ec2bf34 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -133,51 +133,81 @@ public interface DoFnInvoker<InputT, OutputT> {
Timer timer(String timerId);
}
- /** For testing only, this {@link ArgumentProvider} returns {@code null} for all parameters. */
+ /**
+ * For testing only, this {@link ArgumentProvider} throws {@link UnsupportedOperationException}
+ * for all parameters.
+ */
class FakeArgumentProvider<InputT, OutputT> implements ArgumentProvider<InputT, OutputT> {
@Override
public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
- return null;
+ throw new UnsupportedOperationException(
+ String.format(
+ "Should never call non-overridden methods of %s",
+ FakeArgumentProvider.class.getSimpleName()));
}
@Override
public BoundedWindow window() {
- return null;
+ throw new UnsupportedOperationException(
+ String.format(
+ "Should never call non-overridden methods of %s",
+ FakeArgumentProvider.class.getSimpleName()));
}
@Override
public PipelineOptions pipelineOptions() {
- return null;
+ throw new UnsupportedOperationException(
+ String.format(
+ "Should never call non-overridden methods of %s",
+ FakeArgumentProvider.class.getSimpleName()));
}
@Override
public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
- return null;
+ throw new UnsupportedOperationException(
+ String.format(
+ "Should never call non-overridden methods of %s",
+ FakeArgumentProvider.class.getSimpleName()));
}
@Override
public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(
DoFn<InputT, OutputT> doFn) {
- return null;
+ throw new UnsupportedOperationException(
+ String.format(
+ "Should never call non-overridden methods of %s",
+ FakeArgumentProvider.class.getSimpleName()));
}
@Override
public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
- return null;
+ throw new UnsupportedOperationException(
+ String.format(
+ "Should never call non-overridden methods of %s",
+ FakeArgumentProvider.class.getSimpleName()));
}
@Override
public State state(String stateId) {
- return null;
+ throw new UnsupportedOperationException(
+ String.format(
+ "Should never call non-overridden methods of %s",
+ FakeArgumentProvider.class.getSimpleName()));
}
@Override
public Timer timer(String timerId) {
- return null;
+ throw new UnsupportedOperationException(
+ String.format(
+ "Should never call non-overridden methods of %s",
+ FakeArgumentProvider.class.getSimpleName()));
}
public RestrictionTracker<?> restrictionTracker() {
- return null;
+ throw new UnsupportedOperationException(
+ String.format(
+ "Should never call non-overridden methods of %s",
+ FakeArgumentProvider.class.getSimpleName()));
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e73dbb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index de57c3b..c54c44f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -1296,6 +1296,7 @@ public class DoFnSignatures {
return ImmutableMap.copyOf(declarations);
}
+ @Nullable
private static Method findAnnotatedMethod(
ErrorReporter errors, Class<? extends Annotation> anno, Class<?> fnClazz, boolean required) {
Collection<Method> matches = declaredMethodsWithAnnotation(anno, fnClazz, DoFn.class);
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e73dbb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java
index fe2f6b1..48b128c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java
@@ -22,5 +22,8 @@
* and creating {@link org.apache.beam.sdk.transforms.reflect.DoFnSignature}'s and
* {@link org.apache.beam.sdk.transforms.reflect.DoFnInvoker}'s from them.
*/
+@DefaultAnnotation(NonNull.class)
package org.apache.beam.sdk.transforms.reflect;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e73dbb/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 2098c66..72883ff 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -655,7 +655,17 @@ public class DoFnInvokersTest {
return null;
}
})
- .invokeProcessElement(new FakeArgumentProvider<Integer, Integer>());
+ .invokeProcessElement(new FakeArgumentProvider<Integer, Integer>() {
+ @Override
+ public DoFn.ProcessContext processContext(DoFn<Integer, Integer> doFn) {
+ return null; // will not be touched
+ }
+
+ @Override
+ public RestrictionTracker<?> restrictionTracker() {
+ return null; // will not be touched
+ }
+ });
}
@Test
[6/7] beam git commit: NonNull by default in sdk/transforms/splittabledofn
Posted by ke...@apache.org.
NonNull by default in sdk/transforms/splittabledofn
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/82fc7208
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/82fc7208
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/82fc7208
Branch: refs/heads/master
Commit: 82fc720808f75e84f8502f681b25560735dfa7f1
Parents: 7b2edb1
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 26 06:57:55 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat Oct 28 08:42:31 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java | 5 +++--
.../apache/beam/sdk/transforms/splittabledofn/package-info.java | 4 ++++
2 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/82fc7208/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
index 4987409..8ec2c6b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.base.MoreObjects;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
@@ -31,8 +32,8 @@ import org.apache.beam.sdk.transforms.DoFn;
*/
public class OffsetRangeTracker implements RestrictionTracker<OffsetRange> {
private OffsetRange range;
- private Long lastClaimedOffset = null;
- private Long lastAttemptedOffset = null;
+ @Nullable private Long lastClaimedOffset = null;
+ @Nullable private Long lastAttemptedOffset = null;
public OffsetRangeTracker(OffsetRange range) {
this.range = checkNotNull(range);
http://git-wip-us.apache.org/repos/asf/beam/blob/82fc7208/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/package-info.java
index 4523032..82538ea 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/package-info.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/package-info.java
@@ -19,4 +19,8 @@
* Defines utilities related to <a href="https://s.apache.org/splittable-do-fn">splittable</a>
* {@link org.apache.beam.sdk.transforms.DoFn}.
*/
+@DefaultAnnotation(NonNull.class)
package org.apache.beam.sdk.transforms.splittabledofn;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
[2/7] beam git commit: NonNull by default in sdk/transforms
Posted by ke...@apache.org.
NonNull by default in sdk/transforms
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/90be8209
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/90be8209
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/90be8209
Branch: refs/heads/master
Commit: 90be8209bb7321645e674eef63f7e5f02ee93c31
Parents: e686286
Author: Kenneth Knowles <kl...@google.com>
Authored: Sat Oct 21 19:42:33 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat Oct 28 08:42:29 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/testing/PAssert.java | 5 +++
.../beam/sdk/transforms/ApproximateUnique.java | 4 +-
.../org/apache/beam/sdk/transforms/Combine.java | 10 +++--
.../apache/beam/sdk/transforms/CombineFns.java | 2 +
.../org/apache/beam/sdk/transforms/Create.java | 2 +-
.../apache/beam/sdk/transforms/Distinct.java | 2 +
.../apache/beam/sdk/transforms/DoFnTester.java | 27 ++++++++----
.../beam/sdk/transforms/FlatMapElements.java | 14 ++++--
.../apache/beam/sdk/transforms/MapElements.java | 14 ++++--
.../org/apache/beam/sdk/transforms/Max.java | 5 ++-
.../org/apache/beam/sdk/transforms/Min.java | 8 ++--
.../apache/beam/sdk/transforms/PTransform.java | 7 +--
.../org/apache/beam/sdk/transforms/Top.java | 5 ++-
.../org/apache/beam/sdk/transforms/View.java | 7 +--
.../org/apache/beam/sdk/transforms/Watch.java | 4 +-
.../apache/beam/sdk/transforms/WithKeys.java | 6 ++-
.../beam/sdk/transforms/package-info.java | 4 ++
sdks/java/extensions/sql/pom.xml | 5 +++
.../extensions/sql/example/BeamSqlExample.java | 45 +++++++++++---------
.../impl/transform/BeamBuiltinAggregations.java | 10 ++++-
20 files changed, 126 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index ef45491..aed38dc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -31,6 +31,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.PipelineRunner;
@@ -605,6 +606,7 @@ public class PAssert {
}
@Override
+ @Nullable
public Void apply(T actual) {
assertThat(actual, matcher);
return null;
@@ -1269,6 +1271,7 @@ public class PAssert {
}
@Override
+ @Nullable
public Void apply(T actual) {
assertThat(actual, equalTo(expected));
return null;
@@ -1287,6 +1290,7 @@ public class PAssert {
}
@Override
+ @Nullable
public Void apply(T actual) {
assertThat(actual, not(equalTo(expected)));
return null;
@@ -1316,6 +1320,7 @@ public class PAssert {
}
@Override
+ @Nullable
public Void apply(Iterable<T> actual) {
assertThat(actual, containsInAnyOrder(expected));
return null;
http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
index 5d38206..98c971d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
@@ -26,7 +26,7 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
-import org.apache.avro.reflect.Nullable;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.coders.CoderException;
@@ -455,7 +455,7 @@ public class ApproximateUnique {
}
private static void populateDisplayData(
- DisplayData.Builder builder, long sampleSize, Double maxEstimationError) {
+ DisplayData.Builder builder, long sampleSize, @Nullable Double maxEstimationError) {
builder
.add(DisplayData.item("sampleSize", sampleSize)
.withLabel("Sample Size"))
http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index fab98f8..3c5b55b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -31,6 +31,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
@@ -440,6 +441,7 @@ public class Combine {
/**
* Returns the value that should be used for the combine of the empty set.
*/
+ @Nullable
public V identity() {
return null;
}
@@ -506,7 +508,7 @@ public class Combine {
* <p>Used only as a private accumulator class.
*/
public static class Holder<V> {
- private V value;
+ @Nullable private V value;
private boolean present;
private Holder() { }
private Holder(V value) {
@@ -1945,10 +1947,10 @@ public class Combine {
* the hot and cold key paths.
*/
private static class InputOrAccum<InputT, AccumT> {
- public final InputT input;
- public final AccumT accum;
+ @Nullable public final InputT input;
+ @Nullable public final AccumT accum;
- private InputOrAccum(InputT input, AccumT aggr) {
+ private InputOrAccum(@Nullable InputT input, @Nullable AccumT aggr) {
this.input = input;
this.accum = aggr;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
index c619783..02cb884 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
@@ -31,6 +31,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
@@ -176,6 +177,7 @@ public class CombineFns {
* <p>It is an error to request a non-exist tuple tag from the {@link CoCombineResult}.
*/
@SuppressWarnings("unchecked")
+ @Nullable
public <V> V get(TupleTag<V> tag) {
checkArgument(
valuesMap.keySet().contains(tag), "TupleTag " + tag + " is not in the CoCombineResult");
http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index 972675d..7f5920c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -137,7 +137,7 @@ public class Create<T> {
* Otherwise, use {@link Create.Values#withCoder} to set the coder explicitly.
*/
@SafeVarargs
- public static <T> Values<T> of(T elem, T... elems) {
+ public static <T> Values<T> of(@Nullable T elem, @Nullable T... elems) {
// This can't be an ImmutableList, as it may accept nulls
List<T> input = new ArrayList<>(elems.length + 1);
input.add(elem);
http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
index d751dbe..a0ddd14 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.transforms;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.values.KV;
@@ -115,6 +116,7 @@ public class Distinct<T> extends PTransform<PCollection<T>,
Combine.<T, Void>perKey(
new SerializableFunction<Iterable<Void>, Void>() {
@Override
+ @Nullable
public Void apply(Iterable<Void> iter) {
return null; // ignore input
}
http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index b2377dd..6168710 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -30,6 +30,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.annotation.CheckForNull;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -514,17 +516,17 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
private <T> List<ValueInSingleWindow<T>> getImmutableOutput(TupleTag<T> tag) {
@SuppressWarnings({"unchecked", "rawtypes"})
- List<ValueInSingleWindow<T>> elems = (List) outputs.get(tag);
+ List<ValueInSingleWindow<T>> elems = (List) getOutputs().get(tag);
return ImmutableList.copyOf(
MoreObjects.firstNonNull(elems, Collections.<ValueInSingleWindow<T>>emptyList()));
}
@SuppressWarnings({"unchecked", "rawtypes"})
public <T> List<ValueInSingleWindow<T>> getMutableOutput(TupleTag<T> tag) {
- List<ValueInSingleWindow<T>> outputList = (List) outputs.get(tag);
+ List<ValueInSingleWindow<T>> outputList = (List) getOutputs().get(tag);
if (outputList == null) {
outputList = new ArrayList<>();
- outputs.put(tag, (List) outputList);
+ getOutputs().put(tag, (List) outputList);
}
return outputList;
}
@@ -688,11 +690,12 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
private TupleTag<OutputT> mainOutputTag = new TupleTag<>();
/** The original DoFn under test, if started. */
- private DoFn<InputT, OutputT> fn;
- private DoFnInvoker<InputT, OutputT> fnInvoker;
+ @Nullable private DoFn<InputT, OutputT> fn;
- /** The outputs from the {@link DoFn} under test. */
- private Map<TupleTag<?>, List<ValueInSingleWindow<?>>> outputs;
+ @Nullable private DoFnInvoker<InputT, OutputT> fnInvoker;
+
+ /** The outputs from the {@link DoFn} under test. Access via {@link #getOutputs()}. */
+ @CheckForNull private Map<TupleTag<?>, List<ValueInSingleWindow<?>>> outputs;
/** The state of processing of the {@link DoFn} under test. */
private State state = State.UNINITIALIZED;
@@ -704,12 +707,14 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
param.match(
new DoFnSignature.Parameter.Cases.WithDefault<Void>() {
@Override
+ @Nullable
public Void dispatch(DoFnSignature.Parameter.ProcessContextParameter p) {
// ProcessContext parameter is obviously supported.
return null;
}
@Override
+ @Nullable
public Void dispatch(DoFnSignature.Parameter.WindowParameter p) {
// We also support the BoundedWindow parameter.
return null;
@@ -738,6 +743,12 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
}
fnInvoker = DoFnInvokers.invokerFor(fn);
fnInvoker.invokeSetup();
- outputs = new HashMap<>();
+ }
+
+ private Map getOutputs() {
+ if (outputs == null) {
+ outputs = new HashMap<>();
+ }
+ return outputs;
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
index 97e1dfb..193bb6e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.transforms;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
@@ -34,15 +35,15 @@ import org.apache.beam.sdk.values.TypeDescriptors;
*/
public class FlatMapElements<InputT, OutputT>
extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
- private final transient TypeDescriptor<InputT> inputType;
- private final transient TypeDescriptor<OutputT> outputType;
+ @Nullable private final transient TypeDescriptor<InputT> inputType;
+ @Nullable private final transient TypeDescriptor<OutputT> outputType;
@Nullable private final transient Object originalFnForDisplayData;
@Nullable private final Contextful<Fn<InputT, Iterable<OutputT>>> fn;
private FlatMapElements(
@Nullable Contextful<Fn<InputT, Iterable<OutputT>>> fn,
@Nullable Object originalFnForDisplayData,
- TypeDescriptor<InputT> inputType,
+ @Nullable TypeDescriptor<InputT> inputType,
TypeDescriptor<OutputT> outputType) {
this.fn = fn;
this.originalFnForDisplayData = originalFnForDisplayData;
@@ -146,6 +147,13 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
@Override
public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
+ checkState(
+ outputType != null,
+ "%s output type descriptor was null; "
+ + "this probably means that getOutputTypeDescriptor() was called after "
+ + "serialization/deserialization, but it is only available prior to "
+ + "serialization, for constructing a pipeline and inferring coders",
+ FlatMapElements.class.getSimpleName());
return outputType;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
index 1d259ac..e1d6c11 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.transforms;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
@@ -34,15 +35,15 @@ import org.apache.beam.sdk.values.TypeDescriptors;
*/
public class MapElements<InputT, OutputT>
extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
- private final transient TypeDescriptor<InputT> inputType;
- private final transient TypeDescriptor<OutputT> outputType;
+ @Nullable private final transient TypeDescriptor<InputT> inputType;
+ @Nullable private final transient TypeDescriptor<OutputT> outputType;
@Nullable private final transient Object originalFnForDisplayData;
@Nullable private final Contextful<Fn<InputT, OutputT>> fn;
private MapElements(
@Nullable Contextful<Fn<InputT, OutputT>> fn,
@Nullable Object originalFnForDisplayData,
- TypeDescriptor<InputT> inputType,
+ @Nullable TypeDescriptor<InputT> inputType,
TypeDescriptor<OutputT> outputType) {
this.fn = fn;
this.originalFnForDisplayData = originalFnForDisplayData;
@@ -140,6 +141,13 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
@Override
public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
+ checkState(
+ outputType != null,
+ "%s output type descriptor was null; "
+ + "this probably means that getOutputTypeDescriptor() was called after "
+ + "serialization/deserialization, but it is only available prior to "
+ + "serialization, for constructing a pipeline and inferring coders",
+ MapElements.class.getSimpleName());
return outputType;
}
}).withSideInputs(fn.getRequirements().getSideInputs()));
http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
index 710fe77..384404a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms;
import java.io.Serializable;
import java.util.Comparator;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -214,11 +215,11 @@ public class Max {
private static class MaxFn<T> extends BinaryCombineFn<T> {
- private final T identity;
+ @Nullable private final T identity;
private final Comparator<? super T> comparator;
private <ComparatorT extends Comparator<? super T> & Serializable> MaxFn(
- T identity, ComparatorT comparator) {
+ @Nullable T identity, ComparatorT comparator) {
this.identity = identity;
this.comparator = comparator;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
index c566fb3..65b3e6e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms;
import java.io.Serializable;
import java.util.Comparator;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -214,11 +215,11 @@ public class Min {
private static class MinFn<T> extends BinaryCombineFn<T> {
- private final T identity;
+ @Nullable private final T identity;
private final Comparator<? super T> comparator;
private <ComparatorT extends Comparator<? super T> & Serializable> MinFn(
- T identity, ComparatorT comparator) {
+ @Nullable T identity, ComparatorT comparator) {
this.identity = identity;
this.comparator = comparator;
}
@@ -236,8 +237,7 @@ public class Min {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- builder.add(DisplayData.item("comparer", comparator.getClass())
- .withLabel("Record Comparer"));
+ builder.add(DisplayData.item("comparer", comparator.getClass()).withLabel("Record Comparer"));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
index f5e7830..139d82a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
@@ -22,6 +22,7 @@ import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Collections;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
@@ -197,7 +198,7 @@ public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
*
* <p>By default, does nothing.
*/
- public void validate(PipelineOptions options) {}
+ public void validate(@Nullable PipelineOptions options) {}
/**
* Returns all {@link PValue PValues} that are consumed as inputs to this {@link PTransform} that
@@ -227,13 +228,13 @@ public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
* The base name of this {@code PTransform}, e.g., from defaults, or
* {@code null} if not yet assigned.
*/
- protected final transient String name;
+ @Nullable protected final transient String name;
protected PTransform() {
this.name = null;
}
- protected PTransform(String name) {
+ protected PTransform(@Nullable String name) {
this.name = name;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
index 319f779..35d6703 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
@@ -29,6 +29,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CoderRegistry;
@@ -453,14 +454,14 @@ public class Top {
*
* <p>Only one of asList and asQueue may be non-null.
*/
- private PriorityQueue<T> asQueue;
+ @Nullable private PriorityQueue<T> asQueue;
/**
* A list in with largest first, the form of extractOutput().
*
* <p>Only one of asList and asQueue may be non-null.
*/
- private List<T> asList;
+ @Nullable private List<T> asList;
/** The user-provided Comparator. */
private final ComparatorT compareFn;
http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index e463d46..eaa7925 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.Coder;
@@ -300,7 +301,7 @@ public class View {
*/
@Internal
public static class AsSingleton<T> extends PTransform<PCollection<T>, PCollectionView<T>> {
- private final T defaultValue;
+ @Nullable private final T defaultValue;
private final boolean hasDefault;
private AsSingleton() {
@@ -353,8 +354,8 @@ public class View {
private static class SingletonCombineFn<T> extends Combine.BinaryCombineFn<T> {
private final boolean hasDefault;
- private final Coder<T> valueCoder;
- private final byte[] defaultValue;
+ @Nullable private final Coder<T> valueCoder;
+ @Nullable private final byte[] defaultValue;
private SingletonCombineFn(boolean hasDefault, Coder<T> coder, T defaultValue) {
this.hasDefault = hasDefault;
http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
index a3c906c..75c2fe4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
@@ -247,7 +247,7 @@ public class Watch {
* Called by the {@link Watch} transform to create a new independent termination state for a
* newly arrived {@code InputT}.
*/
- StateT forNewInput(Instant now, InputT input);
+ StateT forNewInput(Instant now, @Nullable InputT input);
/**
* Called by the {@link Watch} transform to compute a new termination state, in case after
@@ -799,7 +799,7 @@ public class Watch {
// Outputs that have been claimed in the current ProcessElement call. A prefix of "pending".
private List<TimestampedValue<OutputT>> claimed = Lists.newArrayList();
private boolean isOutputComplete;
- private TerminationStateT terminationState;
+ @Nullable private TerminationStateT terminationState;
@Nullable private Instant pollWatermark;
private boolean shouldStop = false;
http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
index 79cac99..23696e5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
@@ -19,6 +19,8 @@ package org.apache.beam.sdk.transforms;
import static com.google.common.base.Preconditions.checkNotNull;
+import javax.annotation.CheckForNull;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
@@ -78,7 +80,7 @@ public class WithKeys<K, V> extends PTransform<PCollection<V>,
* given key.
*/
@SuppressWarnings("unchecked")
- public static <K, V> WithKeys<K, V> of(final K key) {
+ public static <K, V> WithKeys<K, V> of(@Nullable final K key) {
return new WithKeys<>(
new SerializableFunction<V, K>() {
@Override
@@ -93,7 +95,7 @@ public class WithKeys<K, V> extends PTransform<PCollection<V>,
/////////////////////////////////////////////////////////////////////////////
private SerializableFunction<V, K> fn;
- private transient Class<K> keyClass;
+ @CheckForNull private transient Class<K> keyClass;
private WithKeys(SerializableFunction<V, K> fn, Class<K> keyClass) {
this.fn = fn;
http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/package-info.java
index 892dee9..634786b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/package-info.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/package-info.java
@@ -40,4 +40,8 @@
* for their own application-specific logic.
*
*/
+@DefaultAnnotation(NonNull.class)
package org.apache.beam.sdk.transforms;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/extensions/sql/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/pom.xml b/sdks/java/extensions/sql/pom.xml
index cf3b7dc..742d3b6 100644
--- a/sdks/java/extensions/sql/pom.xml
+++ b/sdks/java/extensions/sql/pom.xml
@@ -206,6 +206,11 @@
</dependency>
<dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
index 350bb7b..0154e1e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.extensions.sql.example;
import java.sql.Types;
import java.util.Arrays;
import java.util.List;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.BeamSql;
@@ -66,16 +67,18 @@ class BeamSqlExample {
BeamSql.query("select c1, c2, c3 from PCOLLECTION where c1 > 1"));
//print the output record of case 1;
- outputStream.apply("log_result",
- MapElements.<BeamRecord, Void>via(new SimpleFunction<BeamRecord, Void>() {
- public Void apply(BeamRecord input) {
- //expect output:
- // PCOLLECTION: [3, row, 3.0]
- // PCOLLECTION: [2, row, 2.0]
- System.out.println("PCOLLECTION: " + input.getDataValues());
- return null;
- }
- }));
+ outputStream.apply(
+ "log_result",
+ MapElements.<BeamRecord, Void>via(
+ new SimpleFunction<BeamRecord, Void>() {
+ public @Nullable Void apply(BeamRecord input) {
+ //expect output:
+ // PCOLLECTION: [3, row, 3.0]
+ // PCOLLECTION: [2, row, 2.0]
+ System.out.println("PCOLLECTION: " + input.getDataValues());
+ return null;
+ }
+ }));
//Case 2. run the query with BeamSql.query over result PCollection of case 1.
PCollection<BeamRecord> outputStream2 =
@@ -83,16 +86,18 @@ class BeamSqlExample {
.apply(BeamSql.queryMulti("select c2, sum(c3) from CASE1_RESULT group by c2"));
//print the output record of case 2;
- outputStream2.apply("log_result",
- MapElements.<BeamRecord, Void>via(new SimpleFunction<BeamRecord, Void>() {
- @Override
- public Void apply(BeamRecord input) {
- //expect output:
- // CASE1_RESULT: [row, 5.0]
- System.out.println("CASE1_RESULT: " + input.getDataValues());
- return null;
- }
- }));
+ outputStream2.apply(
+ "log_result",
+ MapElements.<BeamRecord, Void>via(
+ new SimpleFunction<BeamRecord, Void>() {
+ @Override
+ public @Nullable Void apply(BeamRecord input) {
+ //expect output:
+ // CASE1_RESULT: [row, 5.0]
+ System.out.println("CASE1_RESULT: " + input.getDataValues());
+ return null;
+ }
+ }));
p.run().waitUntilFinish();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/90be8209/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
index b5a5266..ad15f98 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
@@ -23,6 +23,7 @@ import java.math.MathContext;
import java.math.RoundingMode;
import java.util.Date;
import java.util.Iterator;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.BigDecimalCoder;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
@@ -253,6 +254,7 @@ class BeamBuiltinAggregations {
}
static class IntegerAvg extends Avg<Integer>{
+ @Nullable
public Integer extractOutput(KV<Integer, BigDecimal> accumulator) {
return accumulator.getKey() == 0 ? null : prepareOutput(accumulator).intValue();
}
@@ -263,6 +265,7 @@ class BeamBuiltinAggregations {
}
static class LongAvg extends Avg<Long>{
+ @Nullable
public Long extractOutput(KV<Integer, BigDecimal> accumulator) {
return accumulator.getKey() == 0 ? null : prepareOutput(accumulator).longValue();
}
@@ -273,6 +276,7 @@ class BeamBuiltinAggregations {
}
static class ShortAvg extends Avg<Short>{
+ @Nullable
public Short extractOutput(KV<Integer, BigDecimal> accumulator) {
return accumulator.getKey() == 0 ? null : prepareOutput(accumulator).shortValue();
}
@@ -282,7 +286,8 @@ class BeamBuiltinAggregations {
}
}
- static class ByteAvg extends Avg<Byte>{
+ static class ByteAvg extends Avg<Byte> {
+ @Nullable
public Byte extractOutput(KV<Integer, BigDecimal> accumulator) {
return accumulator.getKey() == 0 ? null : prepareOutput(accumulator).byteValue();
}
@@ -293,6 +298,7 @@ class BeamBuiltinAggregations {
}
static class FloatAvg extends Avg<Float>{
+ @Nullable
public Float extractOutput(KV<Integer, BigDecimal> accumulator) {
return accumulator.getKey() == 0 ? null : prepareOutput(accumulator).floatValue();
}
@@ -303,6 +309,7 @@ class BeamBuiltinAggregations {
}
static class DoubleAvg extends Avg<Double>{
+ @Nullable
public Double extractOutput(KV<Integer, BigDecimal> accumulator) {
return accumulator.getKey() == 0 ? null : prepareOutput(accumulator).doubleValue();
}
@@ -313,6 +320,7 @@ class BeamBuiltinAggregations {
}
static class BigDecimalAvg extends Avg<BigDecimal>{
+ @Nullable
public BigDecimal extractOutput(KV<Integer, BigDecimal> accumulator) {
return accumulator.getKey() == 0 ? null : prepareOutput(accumulator);
}
[4/7] beam git commit: NonNull by default in sdk/transforms/display
Posted by ke...@apache.org.
NonNull by default in sdk/transforms/display
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e25aba8e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e25aba8e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e25aba8e
Branch: refs/heads/master
Commit: e25aba8e776ee85ecec3ebaa49d6c13240911b12
Parents: f9e73db
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 23 19:32:02 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat Oct 28 08:42:30 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/transforms/display/DisplayData.java | 12 ++++++------
.../beam/sdk/transforms/display/package-info.java | 4 ++++
2 files changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e25aba8e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
index 917c070..1b4b48f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
@@ -710,14 +710,14 @@ public class DisplayData implements Serializable {
*/
private static final FormattedItemValue NULL_VALUES = new FormattedItemValue(null);
- private final Object shortValue;
- private final Object longValue;
+ @Nullable private final Object shortValue;
+ @Nullable private final Object longValue;
- private FormattedItemValue(Object longValue) {
+ private FormattedItemValue(@Nullable Object longValue) {
this(longValue, null);
}
- private FormattedItemValue(Object longValue, Object shortValue) {
+ private FormattedItemValue(@Nullable Object longValue, @Nullable Object shortValue) {
this.longValue = longValue;
this.shortValue = shortValue;
}
@@ -735,8 +735,8 @@ public class DisplayData implements Serializable {
private final Set<HasDisplayData> visitedComponents;
private final Map<Path, HasDisplayData> visitedPathMap;
- private Path latestPath;
- private Class<?> latestNs;
+ @Nullable private Path latestPath;
+ @Nullable private Class<?> latestNs;
private InternalBuilder() {
this.entries = Maps.newHashMap();
http://git-wip-us.apache.org/repos/asf/beam/blob/e25aba8e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/package-info.java
index 4af3327..e4fff40 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/package-info.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/package-info.java
@@ -23,4 +23,8 @@
*
* @see org.apache.beam.sdk.transforms.display.HasDisplayData
*/
+@DefaultAnnotation(NonNull.class)
package org.apache.beam.sdk.transforms.display;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
[3/7] beam git commit: NonNull by default in sdk/transforms/join
Posted by ke...@apache.org.
NonNull by default in sdk/transforms/join
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7b2edb15
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7b2edb15
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7b2edb15
Branch: refs/heads/master
Commit: 7b2edb153967ec5d3cfa1e2e9fe538159f953b30
Parents: e25aba8
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 23 19:35:08 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat Oct 28 08:42:30 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/transforms/join/CoGbkResult.java | 7 +++++--
.../beam/sdk/transforms/join/KeyedPCollectionTuple.java | 10 ++++------
.../apache/beam/sdk/transforms/join/RawUnionValue.java | 8 +++++---
.../org/apache/beam/sdk/transforms/join/package-info.java | 4 ++++
.../beam/sdk/nexmark/queries/NexmarkQueryModel.java | 5 ++---
.../java/org/apache/beam/sdk/nexmark/queries/Query8.java | 3 ++-
.../org/apache/beam/sdk/nexmark/queries/WinningBids.java | 3 ++-
7 files changed, 24 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7b2edb15/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
index 877bb07..16a0bae 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
@@ -197,7 +198,8 @@ public class CoGbkResult {
* <p>If tag was not part of the original {@link CoGroupByKey},
* throws an IllegalArgumentException.
*/
- public <V> V getOnly(TupleTag<V> tag, V defaultValue) {
+ @Nullable
+ public <V> V getOnly(TupleTag<V> tag, @Nullable V defaultValue) {
return innerGetOnly(tag, defaultValue, true);
}
@@ -356,9 +358,10 @@ public class CoGbkResult {
this.valueMap = valueMap;
}
+ @Nullable
private <V> V innerGetOnly(
TupleTag<V> tag,
- V defaultValue,
+ @Nullable V defaultValue,
boolean useDefault) {
int index = schema.getIndex(tag);
if (index < 0) {
http://git-wip-us.apache.org/repos/asf/beam/blob/7b2edb15/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
index 2e7dd01..a9d1873 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
@@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
@@ -207,24 +208,21 @@ public class KeyedPCollectionTuple<K> implements PInput {
*/
private final List<TaggedKeyedPCollection<K, ?>> keyedCollections;
- private Coder<K> keyCoder;
+ @Nullable private Coder<K> keyCoder;
private final CoGbkResultSchema schema;
private final Pipeline pipeline;
KeyedPCollectionTuple(Pipeline pipeline) {
- this(pipeline,
- new ArrayList<TaggedKeyedPCollection<K, ?>>(),
- TupleTagList.empty(),
- null);
+ this(pipeline, new ArrayList<TaggedKeyedPCollection<K, ?>>(), TupleTagList.empty(), null);
}
KeyedPCollectionTuple(
Pipeline pipeline,
List<TaggedKeyedPCollection<K, ?>> keyedCollections,
TupleTagList tupleTagList,
- Coder<K> keyCoder) {
+ @Nullable Coder<K> keyCoder) {
this.pipeline = pipeline;
this.keyedCollections = keyedCollections;
this.schema = new CoGbkResultSchema(tupleTagList);
http://git-wip-us.apache.org/repos/asf/beam/blob/7b2edb15/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/RawUnionValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/RawUnionValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/RawUnionValue.java
index 07bfe69..7ac1faf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/RawUnionValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/RawUnionValue.java
@@ -20,18 +20,20 @@ package org.apache.beam.sdk.transforms.join;
// TODO: Think about making this a complete dynamic union by adding
// a schema. Type would then be defined by the corresponding schema entry.
+import javax.annotation.Nullable;
+
/**
* This corresponds to an integer union tag and value. The mapping of
* union tag to type must come from elsewhere.
*/
public class RawUnionValue {
private final int unionTag;
- private final Object value;
+ @Nullable private final Object value;
/**
* Constructs a partial union from the given union tag and value.
*/
- public RawUnionValue(int unionTag, Object value) {
+ public RawUnionValue(int unionTag, @Nullable Object value) {
this.unionTag = unionTag;
this.value = value;
}
@@ -40,7 +42,7 @@ public class RawUnionValue {
return unionTag;
}
- public Object getValue() {
+ @Nullable public Object getValue() {
return value;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7b2edb15/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/package-info.java
index f4b315e..7aab329 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/package-info.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/package-info.java
@@ -19,4 +19,8 @@
* Defines the {@link org.apache.beam.sdk.transforms.join.CoGroupByKey} transform
* for joining multiple PCollections.
*/
+@DefaultAnnotation(NonNull.class)
package org.apache.beam.sdk.transforms.join;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
http://git-wip-us.apache.org/repos/asf/beam/blob/7b2edb15/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
index 1f093a0..2efab3e 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
@@ -24,13 +24,11 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
-
+import javax.annotation.Nullable;
import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
import org.apache.beam.sdk.nexmark.model.KnownSize;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TimestampedValue;
-
-
import org.hamcrest.core.IsEqual;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -107,6 +105,7 @@ public abstract class NexmarkQueryModel implements Serializable {
return new SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void>() {
@Override
+ @Nullable
public Void apply(Iterable<TimestampedValue<KnownSize>> actual) {
Collection<String> actualStrings = toCollection(relevantResults(actual).iterator());
Assert.assertThat("wrong pipeline output", actualStrings,
http://git-wip-us.apache.org/repos/asf/beam/blob/7b2edb15/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java
index fa3dd86..def7cb3 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.nexmark.queries;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
import org.apache.beam.sdk.nexmark.NexmarkUtils;
import org.apache.beam.sdk.nexmark.model.Auction;
@@ -78,7 +79,7 @@ public class Query8 extends NexmarkQuery {
ParDo.of(new DoFn<KV<Long, CoGbkResult>, IdNameReserve>() {
@ProcessElement
public void processElement(ProcessContext c) {
- Person person = c.element().getValue().getOnly(PERSON_TAG, null);
+ @Nullable Person person = c.element().getValue().getOnly(PERSON_TAG, null);
if (person == null) {
// Person was not created in last window period.
return;
http://git-wip-us.apache.org/repos/asf/beam/blob/7b2edb15/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
index d73b8ae..bc553c9 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
@@ -30,6 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
@@ -362,7 +363,7 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct
@ProcessElement
public void processElement(ProcessContext c) {
- Auction auction =
+ @Nullable Auction auction =
c.element().getValue().getOnly(NexmarkQuery.AUCTION_TAG, null);
if (auction == null) {
// We have bids without a matching auction. Give up.
[7/7] beam git commit: This closes #4049: [BEAM-3081] NonNull by default for sdk/transforms and below
Posted by ke...@apache.org.
This closes #4049: [BEAM-3081] NonNull by default for sdk/transforms and below
NonNull by default in sdk/transforms/splittabledofn
NonNull by default in sdk/transforms/join
NonNull by default in sdk/transforms/display
NonNull by default in sdk/transforms/reflect
NonNull by default in sdk/transforms/windowing
NonNull by default in sdk/transforms
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5fb30ec8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5fb30ec8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5fb30ec8
Branch: refs/heads/master
Commit: 5fb30ec8265c841cd8c4e6ae16b43be1f171eabb
Parents: e686286 82fc720
Author: Kenneth Knowles <kl...@google.com>
Authored: Sat Oct 28 08:43:06 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat Oct 28 08:43:06 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/testing/PAssert.java | 5 ++
.../beam/sdk/transforms/ApproximateUnique.java | 4 +-
.../org/apache/beam/sdk/transforms/Combine.java | 10 ++--
.../apache/beam/sdk/transforms/CombineFns.java | 2 +
.../org/apache/beam/sdk/transforms/Create.java | 2 +-
.../apache/beam/sdk/transforms/Distinct.java | 2 +
.../apache/beam/sdk/transforms/DoFnTester.java | 27 +++++++----
.../beam/sdk/transforms/FlatMapElements.java | 14 ++++--
.../apache/beam/sdk/transforms/MapElements.java | 14 ++++--
.../org/apache/beam/sdk/transforms/Max.java | 5 +-
.../org/apache/beam/sdk/transforms/Min.java | 8 ++--
.../apache/beam/sdk/transforms/PTransform.java | 7 +--
.../org/apache/beam/sdk/transforms/Top.java | 5 +-
.../org/apache/beam/sdk/transforms/View.java | 7 +--
.../org/apache/beam/sdk/transforms/Watch.java | 4 +-
.../apache/beam/sdk/transforms/WithKeys.java | 6 ++-
.../sdk/transforms/display/DisplayData.java | 12 ++---
.../sdk/transforms/display/package-info.java | 4 ++
.../beam/sdk/transforms/join/CoGbkResult.java | 7 ++-
.../transforms/join/KeyedPCollectionTuple.java | 10 ++--
.../beam/sdk/transforms/join/RawUnionValue.java | 8 ++--
.../beam/sdk/transforms/join/package-info.java | 4 ++
.../beam/sdk/transforms/package-info.java | 4 ++
.../reflect/ByteBuddyDoFnInvokerFactory.java | 3 +-
.../sdk/transforms/reflect/DoFnInvoker.java | 50 ++++++++++++++++----
.../sdk/transforms/reflect/DoFnSignatures.java | 1 +
.../sdk/transforms/reflect/package-info.java | 3 ++
.../splittabledofn/OffsetRangeTracker.java | 5 +-
.../transforms/splittabledofn/package-info.java | 4 ++
.../windowing/IncompatibleWindowException.java | 2 +-
.../MergeOverlappingIntervalWindows.java | 4 +-
.../beam/sdk/transforms/windowing/Trigger.java | 20 ++++----
.../beam/sdk/transforms/windowing/Window.java | 1 +
.../sdk/transforms/windowing/package-info.java | 4 ++
.../transforms/reflect/DoFnInvokersTest.java | 12 ++++-
sdks/java/extensions/sql/pom.xml | 5 ++
.../extensions/sql/example/BeamSqlExample.java | 45 ++++++++++--------
.../impl/transform/BeamBuiltinAggregations.java | 10 +++-
.../sdk/nexmark/queries/NexmarkQueryModel.java | 5 +-
.../apache/beam/sdk/nexmark/queries/Query8.java | 3 +-
.../beam/sdk/nexmark/queries/WinningBids.java | 3 +-
41 files changed, 244 insertions(+), 107 deletions(-)
----------------------------------------------------------------------