You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/07/06 17:20:26 UTC
[20/50] [abbrv] incubator-beam git commit: Static import Preconditions.checkX everywhere
Static import Preconditions.checkX everywhere
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8b8615ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8b8615ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8b8615ea
Branch: refs/heads/runners-spark2
Commit: 8b8615ea2b8f047fbd82f887fb11ad30989a279e
Parents: 067c92a
Author: Dan Halperin <dh...@google.com>
Authored: Sun Jun 26 02:05:08 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jul 6 10:18:50 2016 -0700
----------------------------------------------------------------------
.../beam/examples/complete/AutoComplete.java | 4 +-
.../complete/game/injector/InjectorUtils.java | 6 +--
.../injector/RetryHttpInitializerWrapper.java | 5 +-
.../beam/sdk/util/BatchTimerInternals.java | 7 +--
.../apache/beam/sdk/util/DoFnRunnerBase.java | 15 +++---
.../apache/beam/sdk/util/PaneInfoTracker.java | 11 +++--
.../beam/sdk/util/ReduceFnContextFactory.java | 6 +--
.../apache/beam/sdk/util/ReduceFnRunner.java | 26 +++++-----
.../org/apache/beam/sdk/util/TriggerRunner.java | 5 +-
.../org/apache/beam/sdk/util/WatermarkHold.java | 19 ++++----
.../beam/sdk/util/ReduceFnRunnerTest.java | 18 +++----
.../apache/beam/sdk/util/ReduceFnTester.java | 19 ++++----
.../beam/runners/direct/WatermarkManager.java | 5 +-
.../FlinkPipelineExecutionEnvironment.java | 6 +--
.../FlinkStreamingTranslationContext.java | 8 +--
.../functions/FlinkProcessContext.java | 25 +++++-----
.../translation/types/CoderTypeInformation.java | 6 +--
.../utils/SerializedPipelineOptions.java | 8 +--
.../streaming/FlinkAbstractParDoWrapper.java | 10 ++--
.../FlinkGroupAlsoByWindowWrapper.java | 23 ++++-----
.../streaming/FlinkParDoBoundMultiWrapper.java | 8 +--
.../streaming/io/UnboundedFlinkSink.java | 2 +-
.../streaming/io/UnboundedFlinkSource.java | 7 ++-
.../streaming/state/FlinkStateInternals.java | 7 +--
.../dataflow/DataflowPipelineTranslator.java | 15 +++---
.../beam/runners/dataflow/DataflowRunner.java | 19 ++++----
.../runners/dataflow/internal/IsmFormat.java | 8 +--
.../options/DataflowWorkerLoggingOptions.java | 14 +++---
.../dataflow/util/DataflowPathValidator.java | 14 +++---
.../beam/runners/dataflow/util/GcsStager.java | 5 +-
.../beam/runners/spark/io/CreateStream.java | 7 +--
.../apache/beam/runners/spark/io/KafkaIO.java | 23 ++++-----
.../beam/runners/spark/io/hadoop/HadoopIO.java | 38 +++++++--------
.../main/java/org/apache/beam/sdk/Pipeline.java | 7 +--
.../java/org/apache/beam/sdk/coders/Coder.java | 6 +--
.../apache/beam/sdk/coders/CoderRegistry.java | 5 +-
.../apache/beam/sdk/coders/CollectionCoder.java | 7 ++-
.../apache/beam/sdk/coders/IterableCoder.java | 7 ++-
.../beam/sdk/coders/IterableLikeCoder.java | 10 ++--
.../org/apache/beam/sdk/coders/KvCoder.java | 7 ++-
.../org/apache/beam/sdk/coders/ListCoder.java | 7 ++-
.../org/apache/beam/sdk/coders/MapCoder.java | 6 +--
.../apache/beam/sdk/coders/NullableCoder.java | 6 +--
.../org/apache/beam/sdk/coders/SetCoder.java | 7 ++-
.../java/org/apache/beam/sdk/io/AvroIO.java | 6 +--
.../java/org/apache/beam/sdk/io/AvroSource.java | 12 ++---
.../apache/beam/sdk/io/CompressedSource.java | 10 ++--
.../org/apache/beam/sdk/io/DatastoreIO.java | 7 ++-
.../org/apache/beam/sdk/io/FileBasedSink.java | 13 +++--
.../apache/beam/sdk/io/OffsetBasedSource.java | 12 ++---
.../java/org/apache/beam/sdk/io/TextIO.java | 6 +--
.../java/org/apache/beam/sdk/io/XmlSink.java | 10 ++--
.../java/org/apache/beam/sdk/io/XmlSource.java | 11 +++--
.../sdk/options/PipelineOptionsFactory.java | 28 +++++------
.../sdk/options/PipelineOptionsValidator.java | 18 ++++---
.../sdk/options/ProxyInvocationHandler.java | 9 ++--
.../apache/beam/sdk/runners/PipelineRunner.java | 6 +--
.../beam/sdk/runners/TransformHierarchy.java | 10 ++--
.../beam/sdk/runners/TransformTreeNode.java | 13 ++---
.../sdk/transforms/ApproximateQuantiles.java | 9 ++--
.../org/apache/beam/sdk/transforms/Combine.java | 8 +--
.../org/apache/beam/sdk/transforms/Create.java | 5 +-
.../transforms/IntraBundleParallelization.java | 9 ++--
.../org/apache/beam/sdk/transforms/Sample.java | 6 +--
.../org/apache/beam/sdk/transforms/Top.java | 7 ++-
.../sdk/transforms/display/DisplayData.java | 4 +-
.../beam/sdk/transforms/join/CoGbkResult.java | 6 +--
.../beam/sdk/transforms/windowing/AfterAll.java | 6 +--
.../sdk/transforms/windowing/AfterFirst.java | 9 ++--
.../beam/sdk/transforms/windowing/PaneInfo.java | 11 +++--
...AttemptAndTimeBoundedExponentialBackOff.java | 11 +++--
.../util/AttemptBoundedExponentialBackOff.java | 9 ++--
.../beam/sdk/util/BigQueryTableInserter.java | 5 +-
.../org/apache/beam/sdk/util/Credentials.java | 5 +-
.../apache/beam/sdk/util/ExecutableTrigger.java | 11 +++--
.../java/org/apache/beam/sdk/util/GcsUtil.java | 10 ++--
.../apache/beam/sdk/util/InstanceBuilder.java | 21 ++++----
.../util/IntervalBoundedExponentialBackOff.java | 10 ++--
.../beam/sdk/util/MergingActiveWindowSet.java | 35 +++++++-------
.../apache/beam/sdk/util/SerializableUtils.java | 13 +++--
.../org/apache/beam/sdk/util/StringUtils.java | 12 +++--
.../apache/beam/sdk/util/TimerInternals.java | 5 +-
.../apache/beam/sdk/util/ValueWithRecordId.java | 6 +--
.../org/apache/beam/sdk/util/WindowedValue.java | 5 +-
.../beam/sdk/util/common/ReflectHelpers.java | 9 ++--
.../org/apache/beam/sdk/util/gcsfs/GcsPath.java | 51 ++++++++++----------
.../beam/sdk/util/state/StateMerging.java | 6 +--
.../apache/beam/sdk/transforms/CombineTest.java | 17 ++++---
.../apache/beam/sdk/transforms/ParDoTest.java | 7 ++-
.../apache/beam/sdk/transforms/SampleTest.java | 6 +--
.../apache/beam/sdk/transforms/ViewTest.java | 10 ++--
.../org/apache/beam/sdk/util/TriggerTester.java | 3 +-
.../beam/sdk/extensions/joinlibrary/Join.java | 19 ++++----
.../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 23 ++++-----
94 files changed, 525 insertions(+), 509 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index 2732aa5..f278ce3 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.examples.complete;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
import org.apache.beam.examples.common.DataflowExampleUtils;
@@ -58,7 +59,6 @@ import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
import com.google.datastore.v1beta3.Entity;
import com.google.datastore.v1beta3.Key;
import com.google.datastore.v1beta3.Value;
@@ -460,7 +460,7 @@ public class AutoComplete {
PTransform<? super PBegin, PCollection<String>> readSource;
WindowFn<Object, ?> windowFn;
if (options.isStreaming()) {
- Preconditions.checkArgument(
+ checkArgument(
!options.getOutputToDatastore(), "DatastoreIO is not supported in streaming.");
dataflowUtils.setupPubsub();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/examples/java8/src/main/java/org/apache/beam/examples/complete/game/injector/InjectorUtils.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/injector/InjectorUtils.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/injector/InjectorUtils.java
index dbb5b8f..53e644d 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/injector/InjectorUtils.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/injector/InjectorUtils.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.examples.complete.game.injector;
+import static com.google.common.base.Preconditions.checkNotNull;
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
@@ -28,7 +29,6 @@ import com.google.api.client.json.JsonFactory;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.PubsubScopes;
import com.google.api.services.pubsub.model.Topic;
-import com.google.common.base.Preconditions;
import java.io.IOException;
@@ -42,8 +42,8 @@ class InjectorUtils {
public static Pubsub getClient(final HttpTransport httpTransport,
final JsonFactory jsonFactory)
throws IOException {
- Preconditions.checkNotNull(httpTransport);
- Preconditions.checkNotNull(jsonFactory);
+ checkNotNull(httpTransport);
+ checkNotNull(jsonFactory);
GoogleCredential credential =
GoogleCredential.getApplicationDefault(httpTransport, jsonFactory);
if (credential.createScopedRequired()) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/examples/java8/src/main/java/org/apache/beam/examples/complete/game/injector/RetryHttpInitializerWrapper.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/injector/RetryHttpInitializerWrapper.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/injector/RetryHttpInitializerWrapper.java
index 3183a05..45be287 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/injector/RetryHttpInitializerWrapper.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/injector/RetryHttpInitializerWrapper.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.examples.complete.game.injector;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import com.google.api.client.auth.oauth2.Credential;
import com.google.api.client.http.HttpBackOffIOExceptionHandler;
import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler;
@@ -26,7 +28,6 @@ import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpUnsuccessfulResponseHandler;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.client.util.Sleeper;
-import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.logging.Logger;
@@ -81,7 +82,7 @@ public class RetryHttpInitializerWrapper implements HttpRequestInitializer {
*/
RetryHttpInitializerWrapper(
final Credential wrappedCredential, final Sleeper sleeper) {
- this.wrappedCredential = Preconditions.checkNotNull(wrappedCredential);
+ this.wrappedCredential = checkNotNull(wrappedCredential);
this.sleeper = sleeper;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/runners/core-java/src/main/java/org/apache/beam/sdk/util/BatchTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/BatchTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/BatchTimerInternals.java
index d0c0b2f..0dd03ba 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/BatchTimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/BatchTimerInternals.java
@@ -17,10 +17,11 @@
*/
package org.apache.beam.sdk.util;
+import static com.google.common.base.Preconditions.checkState;
+
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
import org.joda.time.Instant;
@@ -105,7 +106,7 @@ public class BatchTimerInternals implements TimerInternals {
public void advanceInputWatermark(ReduceFnRunner<?, ?, ?, ?> runner, Instant newInputWatermark)
throws Exception {
- Preconditions.checkState(!newInputWatermark.isBefore(inputWatermarkTime),
+ checkState(!newInputWatermark.isBefore(inputWatermarkTime),
"Cannot move input watermark time backwards from %s to %s", inputWatermarkTime,
newInputWatermark);
inputWatermarkTime = newInputWatermark;
@@ -114,7 +115,7 @@ public class BatchTimerInternals implements TimerInternals {
public void advanceProcessingTime(ReduceFnRunner<?, ?, ?, ?> runner, Instant newProcessingTime)
throws Exception {
- Preconditions.checkState(!newProcessingTime.isBefore(processingTime),
+ checkState(!newProcessingTime.isBefore(processingTime),
"Cannot move processing time backwards from %s to %s", processingTime, newProcessingTime);
processingTime = newProcessingTime;
advance(runner, newProcessingTime, TimeDomain.PROCESSING_TIME);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
index a849eb2..3f8f8e0 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.util;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -36,7 +38,6 @@ import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -333,13 +334,13 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
@Override
public <T> void sideOutput(TupleTag<T> tag, T output) {
- Preconditions.checkNotNull(tag, "TupleTag passed to sideOutput cannot be null");
+ checkNotNull(tag, "TupleTag passed to sideOutput cannot be null");
sideOutputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING);
}
@Override
public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- Preconditions.checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be null");
+ checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be null");
sideOutputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING);
}
@@ -351,7 +352,7 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
@Override
protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
- Preconditions.checkNotNull(combiner,
+ checkNotNull(combiner,
"Combiner passed to createAggregator cannot be null");
return new CounterAggregator<>(generateInternalAggregatorName(name),
combiner, addCounterMutator);
@@ -409,7 +410,7 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
@Override
public <T> T sideInput(PCollectionView<T> view) {
- Preconditions.checkNotNull(view, "View passed to sideInput cannot be null");
+ checkNotNull(view, "View passed to sideInput cannot be null");
Iterator<? extends BoundedWindow> windowIter = windows().iterator();
BoundedWindow window;
if (!windowIter.hasNext()) {
@@ -467,13 +468,13 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
@Override
public <T> void sideOutput(TupleTag<T> tag, T output) {
- Preconditions.checkNotNull(tag, "Tag passed to sideOutput cannot be null");
+ checkNotNull(tag, "Tag passed to sideOutput cannot be null");
context.sideOutputWindowedValue(tag, windowedValue.withValue(output));
}
@Override
public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- Preconditions.checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be null");
+ checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be null");
checkTimestamp(timestamp);
context.sideOutputWindowedValue(
tag, output, timestamp, windowedValue.getWindows(), windowedValue.getPane());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java
index 5e08031..0a47feb 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.util;
+import static com.google.common.base.Preconditions.checkState;
+
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
@@ -28,7 +30,6 @@ import org.apache.beam.sdk.util.state.StateTags;
import org.apache.beam.sdk.util.state.ValueState;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import org.joda.time.Instant;
@@ -132,21 +133,21 @@ public class PaneInfoTracker {
// Timing transitions should follow EARLY* ON_TIME? LATE*
switch (previousTiming) {
case EARLY:
- Preconditions.checkState(
+ checkState(
timing == Timing.EARLY || timing == Timing.ON_TIME || timing == Timing.LATE,
"EARLY cannot transition to %s", timing);
break;
case ON_TIME:
- Preconditions.checkState(
+ checkState(
timing == Timing.LATE, "ON_TIME cannot transition to %s", timing);
break;
case LATE:
- Preconditions.checkState(timing == Timing.LATE, "LATE cannot transtion to %s", timing);
+ checkState(timing == Timing.LATE, "LATE cannot transtion to %s", timing);
break;
case UNKNOWN:
break;
}
- Preconditions.checkState(!previousPane.isLast(), "Last pane was not last after all.");
+ checkState(!previousPane.isLast(), "Last pane was not last after all.");
}
return PaneInfo.createPane(isFirst, isFinal, timing, index, nonSpeculativeIndex);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnContextFactory.java
index c90940e..2d86508 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnContextFactory.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnContextFactory.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.util;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.beam.sdk.coders.Coder;
@@ -36,7 +37,6 @@ import org.apache.beam.sdk.util.state.StateNamespaces;
import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
import org.apache.beam.sdk.util.state.StateTag;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import org.joda.time.Instant;
@@ -123,7 +123,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
private final StateNamespace namespace;
public TimersImpl(StateNamespace namespace) {
- Preconditions.checkArgument(namespace instanceof WindowNamespace);
+ checkArgument(namespace instanceof WindowNamespace);
this.namespace = namespace;
}
@@ -248,7 +248,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
namespace = namespaceFor(activeWindows.writeStateAddress(mergingWindow));
break;
}
- Preconditions.checkNotNull(namespace); // cases are exhaustive.
+ checkNotNull(namespace); // cases are exhaustive.
builder.put(mergingWindow, stateInternals.state(namespace, address, context));
}
return builder.build();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
index 2efc859..c879409 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.sdk.util;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
@@ -40,7 +43,6 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import org.joda.time.Duration;
@@ -527,7 +529,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
// Assert that holds have a proximate timer.
boolean holdInWindow = !hold.isAfter(window.maxTimestamp());
boolean timerInWindow = !timer.isAfter(window.maxTimestamp());
- Preconditions.checkState(
+ checkState(
holdInWindow == timerInWindow,
"set a hold at %s, a timer at %s, which disagree as to whether they are in window %s",
hold,
@@ -559,7 +561,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
*/
public void onTimer(TimerData timer) throws Exception {
// Which window is the timer for?
- Preconditions.checkArgument(timer.getNamespace() instanceof WindowNamespace,
+ checkArgument(timer.getNamespace() instanceof WindowNamespace,
"Expected timer to be in WindowNamespace, but was in %s", timer.getNamespace());
@SuppressWarnings("unchecked")
WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace();
@@ -607,7 +609,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
// and the watermark has passed the end of the window.
@Nullable Instant newHold =
onTrigger(directContext, renamedContext, true/* isFinished */, isEndOfWindow);
- Preconditions.checkState(newHold == null,
+ checkState(newHold == null,
"Hold placed at %s despite isFinished being true.", newHold);
}
@@ -635,7 +637,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
// cleanup event and handled by the above).
// Note we must do this even if the trigger is finished so that we are sure to cleanup
// any final trigger finished bits.
- Preconditions.checkState(
+ checkState(
windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO),
"Unexpected zero getAllowedLateness");
WindowTracing.debug(
@@ -643,7 +645,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
+ "inputWatermark:{}; outputWatermark:{}",
key, directContext.window(), cleanupTime, timerInternals.currentInputWatermarkTime(),
timerInternals.currentOutputWatermarkTime());
- Preconditions.checkState(!cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
+ checkState(!cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
"Cleanup time %s is beyond end-of-time", cleanupTime);
directContext.timers().setTimer(cleanupTime, TimeDomain.EVENT_TIME);
}
@@ -822,14 +824,14 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
if (newHold != null) {
// We can't be finished yet.
- Preconditions.checkState(
+ checkState(
!isFinished, "new hold at %s but finished %s", newHold, directContext.window());
// The hold cannot be behind the input watermark.
- Preconditions.checkState(
+ checkState(
!newHold.isBefore(inputWM), "new hold %s is before input watermark %s", newHold, inputWM);
if (newHold.isAfter(directContext.window().maxTimestamp())) {
// The hold must be for garbage collection, which can't have happened yet.
- Preconditions.checkState(
+ checkState(
newHold.isEqual(garbageCollectionTime(directContext.window())),
"new hold %s should be at garbage collection for window %s plus %s",
newHold,
@@ -837,12 +839,12 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
windowingStrategy.getAllowedLateness());
} else {
// The hold must be for the end-of-window, which can't have happened yet.
- Preconditions.checkState(
+ checkState(
newHold.isEqual(directContext.window().maxTimestamp()),
"new hold %s should be at end of window %s",
newHold,
directContext.window());
- Preconditions.checkState(
+ checkState(
!isEndOfWindow,
"new hold at %s for %s but this is the watermark trigger",
newHold,
@@ -915,7 +917,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
directContext.window(),
inputWM,
timerInternals.currentOutputWatermarkTime());
- Preconditions.checkState(!timer.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
+ checkState(!timer.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
"Timer %s is beyond end-of-time", timer);
directContext.timers().setTimer(timer, TimeDomain.EVENT_TIME);
return timer;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/runners/core-java/src/main/java/org/apache/beam/sdk/util/TriggerRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/TriggerRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/TriggerRunner.java
index f104f6a..878d1d7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/TriggerRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/TriggerRunner.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.util;
+import static com.google.common.base.Preconditions.checkState;
+
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.Trigger;
@@ -27,7 +29,6 @@ import org.apache.beam.sdk.util.state.StateTags;
import org.apache.beam.sdk.util.state.ValueState;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import org.joda.time.Instant;
@@ -66,7 +67,7 @@ public class TriggerRunner<W extends BoundedWindow> {
private final TriggerContextFactory<W> contextFactory;
public TriggerRunner(ExecutableTrigger rootTrigger, TriggerContextFactory<W> contextFactory) {
- Preconditions.checkState(rootTrigger.getTriggerIndex() == 0);
+ checkState(rootTrigger.getTriggerIndex() == 0);
this.rootTrigger = rootTrigger;
this.contextFactory = contextFactory;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/runners/core-java/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java
index 14ec082..61ab44a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.util;
+import static com.google.common.base.Preconditions.checkState;
+
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
@@ -30,7 +32,6 @@ import org.apache.beam.sdk.util.state.StateTags;
import org.apache.beam.sdk.util.state.WatermarkHoldState;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -205,10 +206,10 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
*/
private Instant shift(Instant timestamp, W window) {
Instant shifted = windowingStrategy.getOutputTimeFn().assignOutputTime(timestamp, window);
- Preconditions.checkState(!shifted.isBefore(timestamp),
+ checkState(!shifted.isBefore(timestamp),
"OutputTimeFn moved element from %s to earlier time %s for window %s",
timestamp, shifted, window);
- Preconditions.checkState(timestamp.isAfter(window.maxTimestamp())
+ checkState(timestamp.isAfter(window.maxTimestamp())
|| !shifted.isAfter(window.maxTimestamp()),
"OutputTimeFn moved element from %s to %s which is beyond end of "
+ "window %s",
@@ -254,8 +255,8 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
} else {
which = "on time";
tooLate = false;
- Preconditions.checkState(!elementHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
- "Element hold %s is beyond end-of-time", elementHold);
+ checkState(!elementHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
+ "Element hold %s is beyond end-of-time", elementHold);
context.state().access(elementHoldTag).add(elementHold);
}
WindowTracing.trace(
@@ -316,10 +317,10 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
return null;
}
- Preconditions.checkState(outputWM == null || !eowHold.isBefore(outputWM),
+ checkState(outputWM == null || !eowHold.isBefore(outputWM),
"End-of-window hold %s cannot be before output watermark %s",
eowHold, outputWM);
- Preconditions.checkState(!eowHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
+ checkState(!eowHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
"End-of-window hold %s is beyond end-of-time", eowHold);
// If paneIsEmpty then this hold is just for empty ON_TIME panes, so we want to keep
// the hold away from the combining function in elementHoldTag.
@@ -387,10 +388,10 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
return null;
}
- Preconditions.checkState(!gcHold.isBefore(inputWM),
+ checkState(!gcHold.isBefore(inputWM),
"Garbage collection hold %s cannot be before input watermark %s",
gcHold, inputWM);
- Preconditions.checkState(!gcHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
+ checkState(!gcHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
"Garbage collection hold %s is beyond end-of-time", gcHold);
// Same EXTRA_HOLD_TAG vs elementHoldTag discussion as in addEndOfWindowHold above.
context.state().access(EXTRA_HOLD_TAG).add(gcHold);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
index cd78107..dc2413a 100644
--- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.util;
import static org.apache.beam.sdk.WindowMatchers.isSingleWindowedValue;
import static org.apache.beam.sdk.WindowMatchers.isWindowedValue;
+import static com.google.common.base.Preconditions.checkArgument;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.emptyIterable;
@@ -66,7 +67,6 @@ import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import org.joda.time.Duration;
@@ -1391,26 +1391,26 @@ public class ReduceFnRunnerTest {
}
@Override
public int[] createAccumulator(Context c) {
- Preconditions.checkArgument(
+ checkArgument(
c.getPipelineOptions().as(TestOptions.class).getValue() == expectedValue);
- Preconditions.checkArgument(c.sideInput(view) == expectedValue);
+ checkArgument(c.sideInput(view) == expectedValue);
return wrap(0);
}
@Override
public int[] addInput(int[] accumulator, Integer input, Context c) {
- Preconditions.checkArgument(
+ checkArgument(
c.getPipelineOptions().as(TestOptions.class).getValue() == expectedValue);
- Preconditions.checkArgument(c.sideInput(view) == expectedValue);
+ checkArgument(c.sideInput(view) == expectedValue);
accumulator[0] += input.intValue();
return accumulator;
}
@Override
public int[] mergeAccumulators(Iterable<int[]> accumulators, Context c) {
- Preconditions.checkArgument(
+ checkArgument(
c.getPipelineOptions().as(TestOptions.class).getValue() == expectedValue);
- Preconditions.checkArgument(c.sideInput(view) == expectedValue);
+ checkArgument(c.sideInput(view) == expectedValue);
Iterator<int[]> iter = accumulators.iterator();
if (!iter.hasNext()) {
return createAccumulator(c);
@@ -1425,9 +1425,9 @@ public class ReduceFnRunnerTest {
@Override
public Integer extractOutput(int[] accumulator, Context c) {
- Preconditions.checkArgument(
+ checkArgument(
c.getPipelineOptions().as(TestOptions.class).getValue() == expectedValue);
- Preconditions.checkArgument(c.sideInput(view) == expectedValue);
+ checkArgument(c.sideInput(view) == expectedValue);
return accumulator[0];
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
index fa62583..e0ff879 100644
--- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.util;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -57,7 +59,6 @@ import org.apache.beam.sdk.values.TupleTag;
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
@@ -637,7 +638,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
data = processingTimers.peek();
break;
}
- Preconditions.checkNotNull(data); // cases exhaustive
+ checkNotNull(data); // cases exhaustive
return data == null ? null : data.getTimestamp();
}
@@ -680,7 +681,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
@Override
public Instant currentInputWatermarkTime() {
- return Preconditions.checkNotNull(inputWatermarkTime);
+ return checkNotNull(inputWatermarkTime);
}
@Override
@@ -702,8 +703,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
public void advanceInputWatermark(
ReduceFnRunner<?, ?, ?, ?> runner, Instant newInputWatermark) throws Exception {
- Preconditions.checkNotNull(newInputWatermark);
- Preconditions.checkState(
+ checkNotNull(newInputWatermark);
+ checkState(
!newInputWatermark.isBefore(inputWatermarkTime),
"Cannot move input watermark time backwards from %s to %s", inputWatermarkTime,
newInputWatermark);
@@ -724,14 +725,14 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
}
public void advanceOutputWatermark(Instant newOutputWatermark) {
- Preconditions.checkNotNull(newOutputWatermark);
+ checkNotNull(newOutputWatermark);
if (newOutputWatermark.isAfter(inputWatermarkTime)) {
WindowTracing.trace(
"TestTimerInternals.advanceOutputWatermark: clipping output watermark from {} to {}",
newOutputWatermark, inputWatermarkTime);
newOutputWatermark = inputWatermarkTime;
}
- Preconditions.checkState(
+ checkState(
outputWatermarkTime == null || !newOutputWatermark.isBefore(outputWatermarkTime),
"Cannot move output watermark time backwards from %s to %s", outputWatermarkTime,
newOutputWatermark);
@@ -742,7 +743,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
public void advanceProcessingTime(
ReduceFnRunner<?, ?, ?, ?> runner, Instant newProcessingTime) throws Exception {
- Preconditions.checkState(!newProcessingTime.isBefore(processingTime),
+ checkState(!newProcessingTime.isBefore(processingTime),
"Cannot move processing time backwards from %s to %s", processingTime, newProcessingTime);
WindowTracing.trace("TestTimerInternals.advanceProcessingTime: from {} to {}", processingTime,
newProcessingTime);
@@ -752,7 +753,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
public void advanceSynchronizedProcessingTime(
ReduceFnRunner<?, ?, ?, ?> runner, Instant newSynchronizedProcessingTime) throws Exception {
- Preconditions.checkState(!newSynchronizedProcessingTime.isBefore(synchronizedProcessingTime),
+ checkState(!newSynchronizedProcessingTime.isBefore(synchronizedProcessingTime),
"Cannot move processing time backwards from %s to %s", processingTime,
newSynchronizedProcessingTime);
WindowTracing.trace("TestTimerInternals.advanceProcessingTime: from {} to {}",
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index b8f9987..c8dfa8c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.direct;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -33,7 +35,6 @@ import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
@@ -1071,7 +1072,7 @@ public class WatermarkManager {
* Returns the input watermark of the {@link AppliedPTransform}.
*/
public Instant getInputWatermark() {
- return Preconditions.checkNotNull(inputWatermark.get());
+ return checkNotNull(inputWatermark.get());
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index 4cd8fb3..f4d4ea6 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.flink;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator;
import org.apache.beam.runners.flink.translation.FlinkPipelineTranslator;
import org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator;
@@ -24,8 +26,6 @@ import org.apache.beam.runners.flink.translation.PipelineTranslationOptimizer;
import org.apache.beam.runners.flink.translation.TranslationMode;
import org.apache.beam.sdk.Pipeline;
-import com.google.common.base.Preconditions;
-
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.CollectionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -74,7 +74,7 @@ public class FlinkPipelineExecutionEnvironment {
* @param options the user-defined pipeline options.
* */
FlinkPipelineExecutionEnvironment(FlinkPipelineOptions options) {
- this.options = Preconditions.checkNotNull(options);
+ this.options = checkNotNull(options);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java
index 0cb80ba..a75ef03 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.flink.translation;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -28,8 +30,6 @@ import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
-import com.google.common.base.Preconditions;
-
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -56,8 +56,8 @@ public class FlinkStreamingTranslationContext {
private AppliedPTransform<?, ?, ?> currentTransform;
public FlinkStreamingTranslationContext(StreamExecutionEnvironment env, PipelineOptions options) {
- this.env = Preconditions.checkNotNull(env);
- this.options = Preconditions.checkNotNull(options);
+ this.env = checkNotNull(env);
+ this.options = checkNotNull(options);
this.dataStreams = new HashMap<>();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
index 0f1885c..0ee8198 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.flink.translation.functions;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -33,7 +35,6 @@ import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import org.apache.flink.api.common.functions.RuntimeContext;
@@ -71,10 +72,10 @@ class FlinkProcessContext<InputT, OutputT>
Collector<WindowedValue<OutputT>> collector,
Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs) {
doFn.super();
- Preconditions.checkNotNull(pipelineOptions);
- Preconditions.checkNotNull(runtimeContext);
- Preconditions.checkNotNull(doFn);
- Preconditions.checkNotNull(collector);
+ checkNotNull(pipelineOptions);
+ checkNotNull(runtimeContext);
+ checkNotNull(doFn);
+ checkNotNull(collector);
this.pipelineOptions = pipelineOptions;
this.runtimeContext = runtimeContext;
@@ -93,9 +94,9 @@ class FlinkProcessContext<InputT, OutputT>
WindowingStrategy<?, ?> windowingStrategy,
Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs) {
doFn.super();
- Preconditions.checkNotNull(pipelineOptions);
- Preconditions.checkNotNull(runtimeContext);
- Preconditions.checkNotNull(doFn);
+ checkNotNull(pipelineOptions);
+ checkNotNull(runtimeContext);
+ checkNotNull(doFn);
this.pipelineOptions = pipelineOptions;
this.runtimeContext = runtimeContext;
@@ -196,8 +197,8 @@ class FlinkProcessContext<InputT, OutputT>
PCollectionView<ViewT> view,
BoundedWindow mainInputWindow) {
- Preconditions.checkNotNull(view, "View passed to sideInput cannot be null");
- Preconditions.checkNotNull(
+ checkNotNull(view, "View passed to sideInput cannot be null");
+ checkNotNull(
sideInputs.get(view),
"Side input for " + view + " not available.");
@@ -222,8 +223,8 @@ class FlinkProcessContext<InputT, OutputT>
@Override
public <ViewT> ViewT sideInput(PCollectionView<ViewT> view) {
- Preconditions.checkNotNull(view, "View passed to sideInput cannot be null");
- Preconditions.checkNotNull(sideInputs.get(view), "Side input for " + view + " not available.");
+ checkNotNull(view, "View passed to sideInput cannot be null");
+ checkNotNull(sideInputs.get(view), "Side input for " + view + " not available.");
Iterator<? extends BoundedWindow> windowIter = windowedValue.getWindows().iterator();
BoundedWindow window;
if (!windowIter.hasNext()) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
index 0e85486..71cc6b7 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
@@ -17,9 +17,9 @@
*/
package org.apache.beam.runners.flink.translation.types;
-import org.apache.beam.sdk.coders.Coder;
+import static com.google.common.base.Preconditions.checkNotNull;
-import com.google.common.base.Preconditions;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.AtomicType;
@@ -36,7 +36,7 @@ public class CoderTypeInformation<T> extends TypeInformation<T> implements Atomi
private final Coder<T> coder;
public CoderTypeInformation(Coder<T> coder) {
- Preconditions.checkNotNull(coder);
+ checkNotNull(coder);
this.coder = coder;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
index 2b35c31..44af0ea 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
@@ -18,10 +18,12 @@
package org.apache.beam.runners.flink.translation.utils;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import org.apache.beam.sdk.options.PipelineOptions;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
@@ -37,7 +39,7 @@ public class SerializedPipelineOptions implements Serializable {
private transient PipelineOptions pipelineOptions;
public SerializedPipelineOptions(PipelineOptions options) {
- Preconditions.checkNotNull(options, "PipelineOptions must not be null.");
+ checkNotNull(options, "PipelineOptions must not be null.");
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
new ObjectMapper().writeValue(baos, options);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
index f68a519..d8222b6 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -33,8 +35,6 @@ import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
-import com.google.common.base.Preconditions;
-
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
@@ -59,9 +59,9 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl
private DoFnProcessContext context;
public FlinkAbstractParDoWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUTDF> doFn) {
- Preconditions.checkNotNull(options);
- Preconditions.checkNotNull(windowingStrategy);
- Preconditions.checkNotNull(doFn);
+ checkNotNull(options);
+ checkNotNull(windowingStrategy);
+ checkNotNull(doFn);
this.doFn = doFn;
this.serializedPipelineOptions = new SerializedPipelineOptions(options);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
index 9d2cad8..3f845cf 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
@@ -50,7 +52,6 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
-import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
@@ -152,7 +153,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
KeyedStream<WindowedValue<KV<K, VIN>>, K> groupedStreamByKey,
Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner,
KvCoder<K, VOUT> outputKvCoder) {
- Preconditions.checkNotNull(options);
+ checkNotNull(options);
KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) input.getCoder();
FlinkGroupAlsoByWindowWrapper windower = new FlinkGroupAlsoByWindowWrapper<>(options,
@@ -190,7 +191,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
PipelineOptions options,
PCollection input,
KeyedStream<WindowedValue<KV<K, VIN>>, K> groupedStreamByKey) {
- Preconditions.checkNotNull(options);
+ checkNotNull(options);
KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) input.getCoder();
Coder<K> keyCoder = inputKvCoder.getKeyCoder();
@@ -224,7 +225,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy,
KvCoder<K, VIN> inputCoder,
Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner) {
- Preconditions.checkNotNull(options);
+ checkNotNull(options);
return new FlinkGroupAlsoByWindowWrapper(options, registry, windowingStrategy, inputCoder, combiner);
}
@@ -234,12 +235,12 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy,
KvCoder<K, VIN> inputCoder,
Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner) {
- Preconditions.checkNotNull(options);
+ checkNotNull(options);
- this.serializedOptions = new SerializedPipelineOptions(Preconditions.checkNotNull(options));
- this.coderRegistry = Preconditions.checkNotNull(registry);
- this.inputKvCoder = Preconditions.checkNotNull(inputCoder);//(KvCoder<K, VIN>) input.getCoder();
- this.windowingStrategy = Preconditions.checkNotNull(windowingStrategy);//input.getWindowingStrategy();
+ this.serializedOptions = new SerializedPipelineOptions(checkNotNull(options));
+ this.coderRegistry = checkNotNull(registry);
+ this.inputKvCoder = checkNotNull(inputCoder);//(KvCoder<K, VIN>) input.getCoder();
+ this.windowingStrategy = checkNotNull(windowingStrategy);//input.getWindowingStrategy();
this.combineFn = combiner;
this.operator = createGroupAlsoByWindowOperator();
this.chainingStrategy = ChainingStrategy.ALWAYS;
@@ -447,8 +448,8 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
function.super();
super.setupDelegateAggregators();
- this.collector = Preconditions.checkNotNull(outCollector);
- this.timerInternals = Preconditions.checkNotNull(timerInternals);
+ this.collector = checkNotNull(outCollector);
+ this.timerInternals = checkNotNull(timerInternals);
}
public void setElement(KeyedWorkItem<K, VIN> element,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
index a30a544..619b887 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
@@ -25,8 +27,6 @@ import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.TupleTag;
-import com.google.common.base.Preconditions;
-
import org.apache.flink.util.Collector;
import org.joda.time.Instant;
@@ -42,8 +42,8 @@ public class FlinkParDoBoundMultiWrapper<IN, OUT> extends FlinkAbstractParDoWrap
public FlinkParDoBoundMultiWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn, TupleTag<?> mainTag, Map<TupleTag<?>, Integer> tagsToLabels) {
super(options, windowingStrategy, doFn);
- this.mainTag = Preconditions.checkNotNull(mainTag);
- this.outputLabels = Preconditions.checkNotNull(tagsToLabels);
+ this.mainTag = checkNotNull(mainTag);
+ this.outputLabels = checkNotNull(tagsToLabels);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
index 77c195a..098473d 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
-import com.google.common.base.Preconditions;
+
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.io.Sink;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
index b636036..94b73ce 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
@@ -17,13 +17,12 @@
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
-import com.google.common.base.Preconditions;
-
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.List;
@@ -42,7 +41,7 @@ public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource.
private Coder<T> coder;
public UnboundedFlinkSource(SourceFunction<T> source) {
- flinkSource = Preconditions.checkNotNull(source);
+ flinkSource = checkNotNull(source);
}
public SourceFunction<T> getFlinkSource() {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index 18d4c3c..e6a43dc 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine;
@@ -38,7 +40,6 @@ import org.apache.beam.sdk.util.state.ValueState;
import org.apache.beam.sdk.util.state.WatermarkHoldState;
import org.apache.beam.sdk.values.PCollectionView;
-import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import org.apache.flink.util.InstantiationUtil;
@@ -534,8 +535,8 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn,
Coder<AccumT> accumCoder,
final StateContext<?> stateContext) {
- Preconditions.checkNotNull(combineFn);
- Preconditions.checkNotNull(accumCoder);
+ checkNotNull(combineFn);
+ checkNotNull(accumCoder);
this.stateKey = stateKey;
this.combineFn = combineFn;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index e15b9d2..7fd203f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -29,6 +29,9 @@ import static org.apache.beam.sdk.util.Structs.addString;
import static org.apache.beam.sdk.util.Structs.getString;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Strings.isNullOrEmpty;
import org.apache.beam.runners.dataflow.DataflowRunner.GroupByKeyAndSortValuesOnly;
import org.apache.beam.runners.dataflow.internal.ReadTranslator;
@@ -76,8 +79,6 @@ import com.google.api.services.dataflow.model.Environment;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.Step;
import com.google.api.services.dataflow.model.WorkerPool;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -435,13 +436,13 @@ public class DataflowPipelineTranslator {
disk.setDiskType(options.getWorkerDiskType());
workerPool.setDataDisks(Collections.singletonList(disk));
}
- if (!Strings.isNullOrEmpty(options.getZone())) {
+ if (!isNullOrEmpty(options.getZone())) {
workerPool.setZone(options.getZone());
}
- if (!Strings.isNullOrEmpty(options.getNetwork())) {
+ if (!isNullOrEmpty(options.getNetwork())) {
workerPool.setNetwork(options.getNetwork());
}
- if (!Strings.isNullOrEmpty(options.getSubnetwork())) {
+ if (!isNullOrEmpty(options.getSubnetwork())) {
workerPool.setSubnetwork(options.getSubnetwork());
}
if (options.getDiskSizeGb() > 0) {
@@ -669,11 +670,11 @@ public class DataflowPipelineTranslator {
PValue inputValue,
PValue outputValue) {
Coder<?> inputValueCoder =
- Preconditions.checkNotNull(outputCoders.get(inputValue));
+ checkNotNull(outputCoders.get(inputValue));
// The inputValueCoder for the input PCollection should be some
// WindowedValueCoder of the input PCollection's element
// coder.
- Preconditions.checkState(
+ checkState(
inputValueCoder instanceof WindowedValue.WindowedValueCoder);
// The outputValueCoder for the output should be an
// IterableCoder of the inputValueCoder. This is a property
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 7ff247a..33f97e6 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -23,6 +23,7 @@ import static org.apache.beam.sdk.util.WindowedValue.valueInEmptyWindows;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Strings.isNullOrEmpty;
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator;
@@ -135,8 +136,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
import com.google.common.base.Utf8;
import com.google.common.collect.ForwardingMap;
import com.google.common.collect.HashMultimap;
@@ -261,8 +260,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
PathValidator validator = dataflowOptions.getPathValidator();
- Preconditions.checkArgument(!(Strings.isNullOrEmpty(dataflowOptions.getTempLocation())
- && Strings.isNullOrEmpty(dataflowOptions.getStagingLocation())),
+ checkArgument(!(isNullOrEmpty(dataflowOptions.getTempLocation())
+ && isNullOrEmpty(dataflowOptions.getStagingLocation())),
"Missing required value: at least one of tempLocation or stagingLocation must be set.");
if (dataflowOptions.getStagingLocation() != null) {
@@ -271,9 +270,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
if (dataflowOptions.getTempLocation() != null) {
validator.validateOutputFilePrefixSupported(dataflowOptions.getTempLocation());
}
- if (Strings.isNullOrEmpty(dataflowOptions.getTempLocation())) {
+ if (isNullOrEmpty(dataflowOptions.getTempLocation())) {
dataflowOptions.setTempLocation(dataflowOptions.getStagingLocation());
- } else if (Strings.isNullOrEmpty(dataflowOptions.getStagingLocation())) {
+ } else if (isNullOrEmpty(dataflowOptions.getStagingLocation())) {
try {
dataflowOptions.setStagingLocation(
IOChannelUtils.resolve(dataflowOptions.getTempLocation(), "staging"));
@@ -545,7 +544,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
newJob.getEnvironment().setUserAgent(ReleaseInfo.getReleaseInfo());
// The Dataflow Service may write to the temporary directory directly, so
// must be verified.
- if (!Strings.isNullOrEmpty(options.getTempLocation())) {
+ if (!isNullOrEmpty(options.getTempLocation())) {
newJob.getEnvironment().setTempStoragePrefix(
dataflowOptions.getPathValidator().verifyPath(options.getTempLocation()));
}
@@ -578,7 +577,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
hooks.modifyEnvironmentBeforeSubmission(newJob.getEnvironment());
}
- if (!Strings.isNullOrEmpty(options.getDataflowJobFile())) {
+ if (!isNullOrEmpty(options.getDataflowJobFile())) {
try (PrintWriter printWriter = new PrintWriter(
new File(options.getDataflowJobFile()))) {
String workSpecJson = DataflowPipelineTranslator.jobToString(newJob);
@@ -2203,7 +2202,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
break; // supported by server
case "":
// Empty shard template allowed - forces single output.
- Preconditions.checkArgument(originalTransform.getNumShards() <= 1,
+ checkArgument(originalTransform.getNumShards() <= 1,
"Num shards must be <= 1 when using an empty sharding template");
break;
default:
@@ -2308,7 +2307,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
break; // supported by server
case "":
// Empty shard template allowed - forces single output.
- Preconditions.checkArgument(originalTransform.getNumShards() <= 1,
+ checkArgument(originalTransform.getNumShards() <= 1,
"Num shards must be <= 1 when using an empty sharding template");
break;
default:
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
index 1969cfb..d8bfe42 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
@@ -39,7 +39,6 @@ import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.sdk.values.PCollection;
import com.google.auto.value.AutoValue;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
@@ -54,6 +53,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
+
import javax.annotation.Nullable;
/**
@@ -215,7 +215,7 @@ public class IsmFormat {
@JsonProperty(PropertyNames.NUM_SHARD_CODERS) int numberOfShardCoders,
@JsonProperty(PropertyNames.NUM_METADATA_SHARD_CODERS) int numberOfMetadataShardCoders,
@JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
- Preconditions.checkArgument(components.size() >= 2,
+ checkArgument(components.size() >= 2,
"Expecting at least 2 components, got " + components.size());
return of(
numberOfShardCoders,
@@ -497,7 +497,7 @@ public class IsmFormat {
@JsonCreator
public static MetadataKeyCoder<?> of(
@JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
- Preconditions.checkArgument(components.size() == 1,
+ checkArgument(components.size() == 1,
"Expecting one component, got " + components.size());
return of(components.get(0));
}
@@ -725,7 +725,7 @@ public class IsmFormat {
@Override
public long getEncodedElementByteSize(KeyPrefix value, Coder.Context context)
throws Exception {
- Preconditions.checkNotNull(value);
+ checkNotNull(value);
return VarInt.getLength(value.getSharedKeySize())
+ VarInt.getLength(value.getUnsharedKeySize());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java
index dc7748a..91ac62a 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java
@@ -17,12 +17,12 @@
*/
package org.apache.beam.runners.dataflow.options;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
-import com.google.common.base.Preconditions;
-
import com.fasterxml.jackson.annotation.JsonCreator;
import java.util.Arrays;
@@ -144,7 +144,7 @@ public interface DataflowWorkerLoggingOptions extends PipelineOptions {
* and passing in the {@link Class#getName() class name}.
*/
public WorkerLogLevelOverrides addOverrideForClass(Class<?> klass, Level level) {
- Preconditions.checkNotNull(klass, "Expected class to be not null.");
+ checkNotNull(klass, "Expected class to be not null.");
addOverrideForName(klass.getName(), level);
return this;
}
@@ -157,7 +157,7 @@ public interface DataflowWorkerLoggingOptions extends PipelineOptions {
* and passing in the {@link Package#getName() package name}.
*/
public WorkerLogLevelOverrides addOverrideForPackage(Package pkg, Level level) {
- Preconditions.checkNotNull(pkg, "Expected package to be not null.");
+ checkNotNull(pkg, "Expected package to be not null.");
addOverrideForName(pkg.getName(), level);
return this;
}
@@ -170,8 +170,8 @@ public interface DataflowWorkerLoggingOptions extends PipelineOptions {
* a parent logger that has the passed in name.
*/
public WorkerLogLevelOverrides addOverrideForName(String name, Level level) {
- Preconditions.checkNotNull(name, "Expected name to be not null.");
- Preconditions.checkNotNull(level,
+ checkNotNull(name, "Expected name to be not null.");
+ checkNotNull(level,
"Expected level to be one of %s.", Arrays.toString(Level.values()));
put(name, level);
return this;
@@ -186,7 +186,7 @@ public interface DataflowWorkerLoggingOptions extends PipelineOptions {
*/
@JsonCreator
public static WorkerLogLevelOverrides from(Map<String, String> values) {
- Preconditions.checkNotNull(values, "Expected values to be not null.");
+ checkNotNull(values, "Expected values to be not null.");
WorkerLogLevelOverrides overrides = new WorkerLogLevelOverrides();
for (Map.Entry<String, String> entry : values.entrySet()) {
try {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowPathValidator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowPathValidator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowPathValidator.java
index ddc5d6f..ec10b28 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowPathValidator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowPathValidator.java
@@ -17,13 +17,13 @@
*/
package org.apache.beam.runners.dataflow.util;
+import static com.google.common.base.Preconditions.checkArgument;
+
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.PathValidator;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import com.google.common.base.Preconditions;
-
import java.io.IOException;
/**
@@ -48,8 +48,7 @@ public class DataflowPathValidator implements PathValidator {
@Override
public String validateInputFilePatternSupported(String filepattern) {
GcsPath gcsPath = getGcsPath(filepattern);
- Preconditions.checkArgument(
- dataflowOptions.getGcsUtil().isGcsPatternSupported(gcsPath.getObject()));
+ checkArgument(dataflowOptions.getGcsUtil().isGcsPatternSupported(gcsPath.getObject()));
String returnValue = verifyPath(filepattern);
verifyPathIsAccessible(filepattern, "Could not find file %s");
return returnValue;
@@ -69,9 +68,8 @@ public class DataflowPathValidator implements PathValidator {
@Override
public String verifyPath(String path) {
GcsPath gcsPath = getGcsPath(path);
- Preconditions.checkArgument(gcsPath.isAbsolute(),
- "Must provide absolute paths for Dataflow");
- Preconditions.checkArgument(!gcsPath.getObject().contains("//"),
+ checkArgument(gcsPath.isAbsolute(), "Must provide absolute paths for Dataflow");
+ checkArgument(!gcsPath.getObject().contains("//"),
"Dataflow Service does not allow objects with consecutive slashes");
return gcsPath.toResourceName();
}
@@ -79,7 +77,7 @@ public class DataflowPathValidator implements PathValidator {
private void verifyPathIsAccessible(String path, String errorMessage) {
GcsPath gcsPath = getGcsPath(path);
try {
- Preconditions.checkArgument(dataflowOptions.getGcsUtil().bucketExists(gcsPath),
+ checkArgument(dataflowOptions.getGcsUtil().bucketExists(gcsPath),
errorMessage, path);
} catch (IOException e) {
throw new RuntimeException(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
index 8e7cbbe..bf25ce4 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
@@ -17,12 +17,13 @@
*/
package org.apache.beam.runners.dataflow.util;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import com.google.api.services.dataflow.model.DataflowPackage;
-import com.google.common.base.Preconditions;
import java.util.List;
@@ -42,7 +43,7 @@ public class GcsStager implements Stager {
@Override
public List<DataflowPackage> stageFiles() {
- Preconditions.checkNotNull(options.getStagingLocation());
+ checkNotNull(options.getStagingLocation());
List<String> filesToStage = options.getFilesToStage();
String windmillBinary =
options.as(DataflowPipelineDebugOptions.class).getOverrideWindmillBinary();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
index e7a9971..b3beae6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
@@ -17,12 +17,13 @@
*/
package org.apache.beam.runners.spark.io;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
-import com.google.common.base.Preconditions;
/**
* Create an input stream from Queue.
@@ -53,8 +54,8 @@ public final class CreateStream<T> {
private final Iterable<Iterable<T>> queuedValues;
QueuedValues(Iterable<Iterable<T>> queuedValues) {
- Preconditions.checkNotNull(queuedValues,
- "need to set the queuedValues of an Create.QueuedValues transform");
+ checkNotNull(
+ queuedValues, "need to set the queuedValues of an Create.QueuedValues transform");
this.queuedValues = queuedValues;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b8615ea/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java
index a97d86e..13171f3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java
@@ -17,14 +17,14 @@
*/
package org.apache.beam.runners.spark.io;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
-import com.google.common.base.Preconditions;
-
import java.util.Map;
import java.util.Set;
@@ -82,18 +82,13 @@ public final class KafkaIO {
Unbound(Class<? extends Decoder<K>> keyDecoder,
Class<? extends Decoder<V>> valueDecoder, Class<K> key,
Class<V> value, Set<String> topics, Map<String, String> kafkaParams) {
- Preconditions.checkNotNull(keyDecoder,
- "need to set the key decoder class of a KafkaIO.Read transform");
- Preconditions.checkNotNull(valueDecoder,
- "need to set the value decoder class of a KafkaIO.Read transform");
- Preconditions.checkNotNull(key,
- "need to set the key class of aKafkaIO.Read transform");
- Preconditions.checkNotNull(value,
- "need to set the value class of a KafkaIO.Read transform");
- Preconditions.checkNotNull(topics,
- "need to set the topics of a KafkaIO.Read transform");
- Preconditions.checkNotNull(kafkaParams,
- "need to set the kafkaParams of a KafkaIO.Read transform");
+ checkNotNull(keyDecoder, "need to set the key decoder class of a KafkaIO.Read transform");
+ checkNotNull(
+ valueDecoder, "need to set the value decoder class of a KafkaIO.Read transform");
+ checkNotNull(key, "need to set the key class of a KafkaIO.Read transform");
+ checkNotNull(value, "need to set the value class of a KafkaIO.Read transform");
+ checkNotNull(topics, "need to set the topics of a KafkaIO.Read transform");
+ checkNotNull(kafkaParams, "need to set the kafkaParams of a KafkaIO.Read transform");
this.keyDecoderClass = keyDecoder;
this.valueDecoderClass = valueDecoder;
this.keyClass = key;