You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/04/15 19:39:48 UTC
[03/12] flink git commit: [FLINK-3700] [core] Remove Guava dependency
from flink-core
[FLINK-3700] [core] Remove Guava dependency from flink-core
Almost all Guava functionality used within flink-core has corresponding
utils in Flink's codebase, or the JDK library.
This replaces the Guava code as follows:
- Preconditions calls by Flink's Preconditions class
- Collection utils by simple Java Collection calls
- Iterators by Flink's UnionIterator
- Files by simple util methods around java.nio.Files
- InetAddresses IPv6 encoding code has been adapted into Flink's NetUtils (with attribution comments)
Some util classes were moved from flink-runtime to flink-core.
This closes #1854
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/760a0d9e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/760a0d9e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/760a0d9e
Branch: refs/heads/master
Commit: 760a0d9e7fd9fa88e9f7408b137d78df384d764f
Parents: 1b93b32
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Apr 5 15:18:32 2016 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Apr 15 19:38:14 2016 +0200
----------------------------------------------------------------------
.../storm/tests/StormFieldsGroupingITCase.java | 2 +-
flink-core/pom.xml | 6 -
.../java/org/apache/flink/api/common/Plan.java | 5 +-
.../org/apache/flink/api/common/TaskInfo.java | 4 +-
.../util/AbstractRuntimeUDFContext.java | 9 +-
.../api/common/io/DelimitedInputFormat.java | 17 +-
.../flink/api/common/io/FileInputFormat.java | 28 +--
.../api/common/io/GenericCsvInputFormat.java | 43 ++--
.../common/operators/GenericDataSinkBase.java | 21 +-
.../apache/flink/api/common/operators/Keys.java | 19 +-
.../base/GroupCombineOperatorBase.java | 6 +-
.../operators/base/GroupReduceOperatorBase.java | 6 +-
.../api/common/operators/util/FieldList.java | 6 +-
.../operators/util/UserCodeObjectWrapper.java | 7 +-
.../api/common/typeinfo/BasicArrayTypeInfo.java | 10 +-
.../api/common/typeinfo/BasicTypeInfo.java | 8 +-
.../api/common/typeinfo/FractionalTypeInfo.java | 19 +-
.../api/common/typeinfo/IntegerTypeInfo.java | 25 ++-
.../api/common/typeinfo/NumericTypeInfo.java | 29 +--
.../common/typeinfo/PrimitiveArrayTypeInfo.java | 22 +-
.../api/common/typeutils/CompositeType.java | 4 +-
.../common/typeutils/base/EnumSerializer.java | 5 +-
.../typeutils/base/GenericArraySerializer.java | 7 +-
.../flink/api/java/typeutils/EnumTypeInfo.java | 7 +-
.../api/java/typeutils/GenericTypeInfo.java | 4 +-
.../api/java/typeutils/ObjectArrayTypeInfo.java | 15 +-
.../flink/api/java/typeutils/PojoField.java | 7 +-
.../flink/api/java/typeutils/PojoTypeInfo.java | 40 ++--
.../flink/api/java/typeutils/TupleTypeInfo.java | 32 ++-
.../api/java/typeutils/TupleTypeInfoBase.java | 6 +-
.../flink/api/java/typeutils/TypeExtractor.java | 8 +-
.../flink/api/java/typeutils/ValueTypeInfo.java | 12 +-
.../api/java/typeutils/WritableTypeInfo.java | 11 +-
.../java/typeutils/runtime/AvroSerializer.java | 7 +-
.../runtime/CopyableValueSerializer.java | 4 +-
.../java/typeutils/runtime/PojoSerializer.java | 10 +-
.../typeutils/runtime/TupleSerializerBase.java | 6 +-
.../java/typeutils/runtime/ValueSerializer.java | 5 +-
.../typeutils/runtime/kryo/KryoSerializer.java | 8 +-
.../org/apache/flink/types/StringValue.java | 14 +-
.../java/org/apache/flink/util/FileUtils.java | 91 ++++++++
.../java/org/apache/flink/util/IOUtils.java | 223 +++++++++++++++++++
.../java/org/apache/flink/util/MathUtils.java | 175 +++++++++++++++
.../java/org/apache/flink/util/NetUtils.java | 86 ++++++-
.../org/apache/flink/util/Preconditions.java | 57 ++++-
.../org/apache/flink/util/UnionIterator.java | 102 +++++++++
.../org/apache/flink/util/XORShiftRandom.java | 4 +-
.../base/OuterJoinOperatorBaseTest.java | 4 +-
.../flink/api/java/tuple/TupleGenerator.java | 7 +-
.../java/typeutils/PojoTypeExtractionTest.java | 13 +-
.../typeutils/runtime/PojoSerializerTest.java | 8 +-
.../runtime/PojoSubclassSerializerTest.java | 5 +-
.../SubclassFromInterfaceSerializerTest.java | 5 +-
.../apache/flink/testutils/junit/RetryRule.java | 5 +-
.../org/apache/flink/util/MathUtilTest.java | 102 +++++++++
.../org/apache/flink/util/NetUtilsTest.java | 4 -
.../apache/flink/util/UnionIteratorTest.java | 143 ++++++++++++
.../apache/flink/runtime/blob/BlobUtils.java | 2 +-
.../flink/runtime/blob/FileSystemBlobStore.java | 2 +-
.../checkpoint/FileSystemStateStore.java | 2 +-
.../flink/runtime/filecache/FileCache.java | 2 +-
.../runtime/io/disk/FileChannelInputView.java | 2 +-
.../runtime/io/disk/RandomAccessInputView.java | 2 +-
.../runtime/io/disk/RandomAccessOutputView.java | 2 +-
.../io/disk/SeekableFileChannelInputView.java | 2 +-
.../io/disk/SimpleCollectingOutputView.java | 2 +-
.../flink/runtime/memory/MemoryManager.java | 2 +-
.../operators/hash/CompactingHashTable.java | 2 +-
.../runtime/operators/hash/HashPartition.java | 2 +-
.../operators/hash/MutableHashTable.java | 2 +-
.../operators/shipping/OutputEmitter.java | 2 +-
.../apache/flink/runtime/util/FileUtils.java | 61 -----
.../org/apache/flink/runtime/util/IOUtils.java | 213 ------------------
.../apache/flink/runtime/util/MathUtils.java | 176 ---------------
.../flink/runtime/util/UnionIterator.java | 100 ---------
.../FileSystemStateStorageHelper.java | 2 +-
.../flink/runtime/taskmanager/TaskManager.scala | 2 +-
.../runtime/testutils/CommonTestUtils.java | 2 +-
.../apache/flink/runtime/util/MathUtilTest.java | 101 ---------
.../flink/runtime/util/UnionIteratorTest.java | 142 ------------
.../source/SocketTextStreamFunction.java | 2 +-
...ractAlignedProcessingTimeWindowOperator.java | 2 +-
.../windowing/AccumulatingKeyedTimePanes.java | 2 +-
.../runtime/operators/windowing/KeyMap.java | 2 +-
.../runtime/partitioner/HashPartitioner.java | 2 +-
.../apache/flink/streaming/api/IterateTest.java | 2 +-
.../streaming/api/StreamingOperatorsITCase.java | 2 +-
.../api/scala/StreamingOperatorsITCase.scala | 2 +-
88 files changed, 1294 insertions(+), 1087 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
index 1875ecb..b43b24d 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
@@ -21,7 +21,7 @@ import backtype.storm.Config;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
-import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.MathUtils;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.storm.tests.operators.FiniteRandomSpout;
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 59b620b..eb55bdd 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -70,12 +70,6 @@ under the License.
<artifactId>${shading-artifact.name}</artifactId>
<version>${project.version}</version>
</dependency>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${guava.version}</version>
- </dependency>
<!-- test depedencies -->
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
index 30f2c2f..0235af0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
@@ -18,9 +18,6 @@
package org.apache.flink.api.common;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
@@ -43,6 +40,8 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Visitable;
import org.apache.flink.util.Visitor;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
/**
* This class represents Flink programs, in the form of dataflow plans.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
index 6482cde..ac87e74 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
@@ -20,8 +20,8 @@ package org.apache.flink.api.common;
import org.apache.flink.annotation.Internal;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
/**
* Encapsulates task-specific information: name, index of subtask, parallelism and attempt number.
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index afe9f77..74b78df 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -23,7 +23,6 @@ import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Future;
-import com.google.common.base.Preconditions;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
@@ -45,6 +44,8 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.fs.Path;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* A standalone implementation of the {@link RuntimeContext}, created by runtime UDF operators.
*/
@@ -66,11 +67,11 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
ExecutionConfig executionConfig,
Map<String, Accumulator<?,?>> accumulators,
Map<String, Future<Path>> cpTasks) {
- this.taskInfo = Preconditions.checkNotNull(taskInfo);
+ this.taskInfo = checkNotNull(taskInfo);
this.userCodeClassLoader = userCodeClassLoader;
this.executionConfig = executionConfig;
- this.distributedCache = new DistributedCache(Preconditions.checkNotNull(cpTasks));
- this.accumulators = Preconditions.checkNotNull(accumulators);
+ this.distributedCache = new DistributedCache(checkNotNull(cpTasks));
+ this.accumulators = checkNotNull(accumulators);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index f6b6d49..243e2a4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -18,12 +18,7 @@
package org.apache.flink.api.common.io;
-import java.io.IOException;
-import java.util.ArrayList;
-
import org.apache.flink.annotation.Public;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
@@ -33,7 +28,12 @@ import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import com.google.common.base.Charsets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
/**
* Base implementation for input formats that split the input at a delimiter into records.
@@ -53,6 +53,9 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
* The log.
*/
private static final Logger LOG = LoggerFactory.getLogger(DelimitedInputFormat.class);
+
+ /** The default charset to convert strings to bytes */
+ private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
/**
* The default read buffer size = 1MB.
@@ -185,7 +188,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
}
public void setDelimiter(String delimiter) {
- this.delimiter = delimiter.getBytes(Charsets.UTF_8);
+ this.delimiter = delimiter.getBytes(UTF_8_CHARSET);
}
public int getLineLengthLimit() {
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index c4cd2b3..fd69cc3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -18,22 +18,10 @@
package org.apache.flink.api.common.io;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.base.Preconditions;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.io.compression.DeflateInflaterInputStreamFactory;
import org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory;
import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
@@ -45,6 +33,20 @@ import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* The base class for {@link RichInputFormat}s that read from files. For specific input types the
* {@link #nextRecord(Object)} and {@link #reachedEnd()} methods need to be implemented.
@@ -143,7 +145,7 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS
* @return the extension of the file name or {@code null} if there is no extension.
*/
protected static String extractFileExtension(String fileName) {
- Preconditions.checkNotNull(fileName);
+ checkNotNull(fileName);
int lastPeriodIndex = fileName.lastIndexOf('.');
if (lastPeriodIndex < 0){
return null;
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index cb20f81..31d42ff 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -16,13 +16,8 @@
* limitations under the License.
*/
-
package org.apache.flink.api.common.io;
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import com.google.common.primitives.Ints;
-
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
@@ -30,6 +25,7 @@ import org.apache.flink.types.parser.FieldParser;
import org.apache.flink.types.parser.StringParser;
import org.apache.flink.types.parser.StringValueParser;
import org.apache.flink.util.InstantiationUtil;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,13 +37,20 @@ import java.util.ArrayList;
import java.util.Map;
import java.util.TreeMap;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
@Internal
public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> {
- private static final Logger LOG = LoggerFactory.getLogger(GenericCsvInputFormat.class);
-
private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(GenericCsvInputFormat.class);
+
+ /** The default charset to convert strings to bytes */
+ private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
+
private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
private static final boolean[] EMPTY_INCLUDED = new boolean[0];
@@ -127,7 +130,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
}
public void setCommentPrefix(String commentPrefix) {
- setCommentPrefix(commentPrefix, Charsets.UTF_8);
+ setCommentPrefix(commentPrefix, UTF_8_CHARSET);
}
public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
@@ -171,7 +174,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
}
public void setFieldDelimiter(String delimiter) {
- this.fieldDelim = delimiter.getBytes(Charsets.UTF_8);
+ this.fieldDelim = delimiter.getBytes(UTF_8_CHARSET);
}
public boolean isLenient() {
@@ -247,9 +250,9 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
}
protected void setFieldsGeneric(int[] sourceFieldIndices, Class<?>[] fieldTypes) {
- Preconditions.checkNotNull(sourceFieldIndices);
- Preconditions.checkNotNull(fieldTypes);
- Preconditions.checkArgument(sourceFieldIndices.length == fieldTypes.length,
+ checkNotNull(sourceFieldIndices);
+ checkNotNull(fieldTypes);
+ checkArgument(sourceFieldIndices.length == fieldTypes.length,
"Number of field indices and field types must match.");
for (int i : sourceFieldIndices) {
@@ -258,7 +261,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
}
}
- int largestFieldIndex = Ints.max(sourceFieldIndices);
+ int largestFieldIndex = max(sourceFieldIndices);
this.fieldIncluded = new boolean[largestFieldIndex + 1];
ArrayList<Class<?>> types = new ArrayList<Class<?>>();
@@ -280,8 +283,8 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
}
protected void setFieldsGeneric(boolean[] includedMask, Class<?>[] fieldTypes) {
- Preconditions.checkNotNull(includedMask);
- Preconditions.checkNotNull(fieldTypes);
+ checkNotNull(includedMask);
+ checkNotNull(fieldTypes);
ArrayList<Class<?>> types = new ArrayList<Class<?>>();
@@ -530,4 +533,14 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
lastPos = positions[i];
}
}
+
+ private static int max(int[] ints) {
+ checkArgument(ints.length > 0);
+
+ int max = ints[0];
+ for (int i = 1 ; i < ints.length; i++) {
+ max = Math.max(max, ints[i]);
+ }
+ return max;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
index 33f11f3..1f3875d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.api.common.operators;
import java.util.List;
@@ -39,7 +38,7 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.types.Nothing;
import org.apache.flink.util.Visitor;
-import com.google.common.base.Preconditions;
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Operator for nodes that act as data sinks, storing the data they receive.
@@ -66,7 +65,7 @@ public class GenericDataSinkBase<IN> extends Operator<Nothing> {
public GenericDataSinkBase(OutputFormat<IN> f, UnaryOperatorInformation<IN, Nothing> operatorInfo, String name) {
super(operatorInfo, name);
- Preconditions.checkNotNull(f, "The OutputFormat may not be null.");
+ checkNotNull(f, "The OutputFormat may not be null.");
this.formatWrapper = new UserCodeObjectWrapper<OutputFormat<IN>>(f);
}
@@ -79,8 +78,7 @@ public class GenericDataSinkBase<IN> extends Operator<Nothing> {
*/
public GenericDataSinkBase(UserCodeWrapper<? extends OutputFormat<IN>> f, UnaryOperatorInformation<IN, Nothing> operatorInfo, String name) {
super(operatorInfo, name);
- Preconditions.checkNotNull(f, "The OutputFormat class may not be null.");
- this.formatWrapper = f;
+ this.formatWrapper = checkNotNull(f, "The OutputFormat class may not be null.");
}
// --------------------------------------------------------------------------------------------
@@ -100,8 +98,7 @@ public class GenericDataSinkBase<IN> extends Operator<Nothing> {
* @param input The operator to use as the input.
*/
public void setInput(Operator<IN> input) {
- Preconditions.checkNotNull(input, "The input may not be null.");
- this.input = input;
+ this.input = checkNotNull(input, "The input may not be null.");
}
/**
@@ -112,7 +109,7 @@ public class GenericDataSinkBase<IN> extends Operator<Nothing> {
*/
@Deprecated
public void setInputs(Operator<IN>... inputs) {
- Preconditions.checkNotNull(inputs, "The inputs may not be null.");
+ checkNotNull(inputs, "The inputs may not be null.");
this.input = Operator.createUnionCascade(inputs);
}
@@ -124,7 +121,7 @@ public class GenericDataSinkBase<IN> extends Operator<Nothing> {
*/
@Deprecated
public void setInputs(List<Operator<IN>> inputs) {
- Preconditions.checkNotNull(inputs, "The inputs may not be null.");
+ checkNotNull(inputs, "The inputs may not be null.");
this.input = Operator.createUnionCascade(inputs);
}
@@ -136,7 +133,7 @@ public class GenericDataSinkBase<IN> extends Operator<Nothing> {
*/
@Deprecated
public void addInput(Operator<IN>... inputs) {
- Preconditions.checkNotNull(inputs, "The input may not be null.");
+ checkNotNull(inputs, "The input may not be null.");
this.input = Operator.createUnionCascade(this.input, inputs);
}
@@ -149,7 +146,7 @@ public class GenericDataSinkBase<IN> extends Operator<Nothing> {
@SuppressWarnings("unchecked")
@Deprecated
public void addInputs(List<? extends Operator<IN>> inputs) {
- Preconditions.checkNotNull(inputs, "The inputs may not be null.");
+ checkNotNull(inputs, "The inputs may not be null.");
this.input = createUnionCascade(this.input, (Operator<IN>[]) inputs.toArray(new Operator[inputs.size()]));
}
@@ -259,7 +256,7 @@ public class GenericDataSinkBase<IN> extends Operator<Nothing> {
format.configure(this.parameters);
if(format instanceof RichOutputFormat){
- ((RichOutputFormat) format).setRuntimeContext(ctx);
+ ((RichOutputFormat<?>) format).setRuntimeContext(ctx);
}
format.open(0, 1);
for (IN element : inputData) {
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java
index abe41af..0b771e8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java
@@ -18,10 +18,7 @@
package org.apache.flink.api.common.operators;
-import java.util.ArrayList;
-import java.util.List;
-
-import com.google.common.base.Joiner;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.InvalidProgramException;
@@ -33,7 +30,11 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
@Internal
public abstract class Keys<T> {
@@ -232,7 +233,8 @@ public abstract class Keys<T> {
} else {
rangeCheckFields(keyPositions, type.getArity() - 1);
}
- Preconditions.checkArgument(keyPositions.length > 0, "Grouping fields can not be empty at this point");
+
+ checkArgument(keyPositions.length > 0, "Grouping fields can not be empty at this point");
// extract key field types
CompositeType<T> cType = (CompositeType<T>)type;
@@ -266,7 +268,7 @@ public abstract class Keys<T> {
* Create String-based (nested) field expression keys on a composite type.
*/
public ExpressionKeys(String[] keyExpressions, TypeInformation<T> type) {
- Preconditions.checkNotNull(keyExpressions, "Field expression cannot be null.");
+ checkNotNull(keyExpressions, "Field expression cannot be null.");
this.keyFields = new ArrayList<>(keyExpressions.length);
@@ -375,8 +377,7 @@ public abstract class Keys<T> {
@Override
public String toString() {
- Joiner join = Joiner.on('.');
- return "ExpressionKeys: " + join.join(keyFields);
+ return "ExpressionKeys: " + StringUtils.join(keyFields, '.');
}
public static boolean isSortKey(int fieldPos, TypeInformation<?> type) {
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
index b660506..15a0f5a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
@@ -18,8 +18,8 @@
package org.apache.flink.api.common.operators.base;
-import com.google.common.base.Preconditions;
import org.apache.commons.lang3.ArrayUtils;
+
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
@@ -43,6 +43,8 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
/**
* Base operator for the combineGroup transformation. It receives the UDF GroupCombineFunction as an input.
* This class is later processed by the compiler to generate the plan.
@@ -109,7 +111,7 @@ public class GroupCombineOperatorBase<IN, OUT, FT extends GroupCombineFunction<I
}
if(sortColumns.length == 0) { // => all reduce. No comparator
- Preconditions.checkArgument(sortOrderings.length == 0);
+ checkArgument(sortOrderings.length == 0);
} else {
final TypeComparator<IN> sortComparator = getTypeComparator(inputType, sortColumns, sortOrderings, executionConfig);
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
index 0cc0209..0794a77 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
@@ -41,13 +41,13 @@ import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import com.google.common.base.Preconditions;
-
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
/**
* @see org.apache.flink.api.common.functions.GroupReduceFunction
*/
@@ -181,7 +181,7 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN,
}
if(sortColumns.length == 0) { // => all reduce. No comparator
- Preconditions.checkArgument(sortOrderings.length == 0);
+ checkArgument(sortOrderings.length == 0);
} else {
final TypeComparator<IN> sortComparator = getTypeComparator(inputType, sortColumns, sortOrderings, executionConfig);
Collections.sort(inputData, new Comparator<IN>() {
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
index c69c875..15a993c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
@@ -16,16 +16,16 @@
* limitations under the License.
*/
-
package org.apache.flink.api.common.operators.util;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import com.google.common.base.Preconditions;
import org.apache.flink.annotation.Internal;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* Immutable ordered list of fields IDs.
*/
@@ -44,7 +44,7 @@ public class FieldList extends FieldSet {
}
public FieldList(Integer fieldId) {
- super(Collections.singletonList(Preconditions.checkNotNull(fieldId, "The fields ID must not be null.")));
+ super(Collections.singletonList(checkNotNull(fieldId, "The fields ID must not be null.")));
}
public FieldList(int... columnIndexes) {
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeObjectWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeObjectWrapper.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeObjectWrapper.java
index 890caac..5d75c95 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeObjectWrapper.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeObjectWrapper.java
@@ -27,7 +27,8 @@ import java.lang.reflect.Modifier;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.NonSerializableUserCodeException;
-import com.google.common.base.Preconditions;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* This holds an actual object containing user defined code.
@@ -39,8 +40,8 @@ public class UserCodeObjectWrapper<T> implements UserCodeWrapper<T> {
private final T userCodeObject;
public UserCodeObjectWrapper(T userCodeObject) {
- Preconditions.checkNotNull(userCodeObject, "The user code object may not be null.");
- Preconditions.checkArgument(userCodeObject instanceof Serializable, "User code object is not serializable: " + userCodeObject.getClass().getName());
+ checkNotNull(userCodeObject, "The user code object may not be null.");
+ checkArgument(userCodeObject instanceof Serializable, "User code object is not serializable: " + userCodeObject.getClass().getName());
this.userCodeObject = userCodeObject;
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
index 2c61fb2..25b2850 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
@@ -22,7 +22,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
-import com.google.common.base.Preconditions;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
@@ -31,6 +30,8 @@ import org.apache.flink.api.common.typeutils.base.array.StringArraySerializer;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
@Public
public final class BasicArrayTypeInfo<T, C> extends TypeInformation<T> {
@@ -51,11 +52,10 @@ public final class BasicArrayTypeInfo<T, C> extends TypeInformation<T> {
private final Class<T> arrayClass;
private final TypeInformation<C> componentInfo;
-
- @SuppressWarnings("unchecked")
+
private BasicArrayTypeInfo(Class<T> arrayClass, BasicTypeInfo<C> componentInfo) {
- this.arrayClass = Preconditions.checkNotNull(arrayClass);
- this.componentInfo = Preconditions.checkNotNull(componentInfo);
+ this.arrayClass = checkNotNull(arrayClass);
+ this.componentInfo = checkNotNull(componentInfo);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
index 4eb70c1..c1d5605 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
-import com.google.common.base.Preconditions;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
@@ -54,6 +53,7 @@ import org.apache.flink.api.common.typeutils.base.StringComparator;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Type information for primitive types (int, long, double, byte, ...), String, Date, and Void.
@@ -87,9 +87,9 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
protected BasicTypeInfo(Class<T> clazz, Class<?>[] possibleCastTargetTypes, TypeSerializer<T> serializer, Class<? extends TypeComparator<T>> comparatorClass) {
- this.clazz = Preconditions.checkNotNull(clazz);
- this.possibleCastTargetTypes = Preconditions.checkNotNull(possibleCastTargetTypes);
- this.serializer = Preconditions.checkNotNull(serializer);
+ this.clazz = checkNotNull(clazz);
+ this.possibleCastTargetTypes = checkNotNull(possibleCastTargetTypes);
+ this.serializer = checkNotNull(serializer);
// comparator can be null as in VOID_TYPE_INFO
this.comparatorClass = comparatorClass;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/FractionalTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/FractionalTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/FractionalTypeInfo.java
index aa22ac6..b4e53d4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/FractionalTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/FractionalTypeInfo.java
@@ -18,13 +18,14 @@
package org.apache.flink.api.common.typeinfo;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import java.util.Set;
+import java.util.Arrays;
+import java.util.HashSet;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
/**
* Type information for numeric fractional primitive types (double, float).
@@ -34,15 +35,15 @@ public class FractionalTypeInfo<T> extends NumericTypeInfo<T> {
private static final long serialVersionUID = 554334260950199994L;
- private static final Set<Class<?>> fractionalTypes = Sets.<Class<?>>newHashSet(
- Double.class,
- Float.class
- );
+ private static final HashSet<Class<?>> fractionalTypes = new HashSet<Class<?>>(
+ Arrays.asList(
+ Double.class,
+ Float.class));
protected FractionalTypeInfo(Class<T> clazz, Class<?>[] possibleCastTargetTypes, TypeSerializer<T> serializer, Class<? extends TypeComparator<T>> comparatorClass) {
super(clazz, possibleCastTargetTypes, serializer, comparatorClass);
- Preconditions.checkArgument(fractionalTypes.contains(clazz), "The given class " +
- clazz.getSimpleName() + " is not a fractional type.");
+ checkArgument(fractionalTypes.contains(clazz),
+ "The given class %s is not a fractional type.", clazz.getSimpleName());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/IntegerTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/IntegerTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/IntegerTypeInfo.java
index bff3ab7..02b416a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/IntegerTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/IntegerTypeInfo.java
@@ -18,13 +18,14 @@
package org.apache.flink.api.common.typeinfo;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import java.util.Set;
+import java.util.Arrays;
+import java.util.HashSet;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
/**
* Type information for numeric integer primitive types: int, long, byte, short, character.
@@ -34,18 +35,18 @@ public class IntegerTypeInfo<T> extends NumericTypeInfo<T> {
private static final long serialVersionUID = -8068827354966766955L;
- private static final Set<Class<?>> integerTypes = Sets.<Class<?>>newHashSet(
- Integer.class,
- Long.class,
- Byte.class,
- Short.class,
- Character.class
- );
+ private static final HashSet<Class<?>> integerTypes = new HashSet<Class<?>>(
+ Arrays.asList(
+ Integer.class,
+ Long.class,
+ Byte.class,
+ Short.class,
+ Character.class));
protected IntegerTypeInfo(Class<T> clazz, Class<?>[] possibleCastTargetTypes, TypeSerializer<T> serializer, Class<? extends TypeComparator<T>> comparatorClass) {
super(clazz, possibleCastTargetTypes, serializer, comparatorClass);
- Preconditions.checkArgument(integerTypes.contains(clazz), "The given class " +
- clazz.getSimpleName() + " is not a integer type.");
+ checkArgument(integerTypes.contains(clazz),
+ "The given class %s is not a integer type.", clazz.getSimpleName());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NumericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NumericTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NumericTypeInfo.java
index 6969520..fea8abe 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NumericTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NumericTypeInfo.java
@@ -18,13 +18,14 @@
package org.apache.flink.api.common.typeinfo;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import java.util.Set;
+import java.util.Arrays;
+import java.util.HashSet;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
/**
* Type information for numeric primitive types: int, long, double, byte, short, float, char.
@@ -34,21 +35,21 @@ public abstract class NumericTypeInfo<T> extends BasicTypeInfo<T> {
private static final long serialVersionUID = -5937777910658986986L;
- private static final Set<Class<?>> numericalTypes = Sets.<Class<?>>newHashSet(
- Integer.class,
- Long.class,
- Double.class,
- Byte.class,
- Short.class,
- Float.class,
- Character.class
- );
+ private static final HashSet<Class<?>> numericalTypes = new HashSet<Class<?>>(
+ Arrays.asList(
+ Integer.class,
+ Long.class,
+ Double.class,
+ Byte.class,
+ Short.class,
+ Float.class,
+ Character.class));
protected NumericTypeInfo(Class<T> clazz, Class<?>[] possibleCastTargetTypes, TypeSerializer<T> serializer, Class<? extends
TypeComparator<T>> comparatorClass) {
super(clazz, possibleCastTargetTypes, serializer, comparatorClass);
- Preconditions.checkArgument(numericalTypes.contains(clazz), "The given class " +
- clazz.getSimpleName() + " is not a numerical type.");
+ checkArgument(numericalTypes.contains(clazz),
+ "The given class %s is not a numerical type", clazz.getSimpleName());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
index 2c75458..1c6ce00 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
@@ -18,13 +18,8 @@
package org.apache.flink.api.common.typeinfo;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -46,6 +41,13 @@ import org.apache.flink.api.common.typeutils.base.array.PrimitiveArrayComparator
import org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArrayComparator;
import org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArraySerializer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* A {@link TypeInformation} for arrays of primitive types (int, long, double, ...).
* Supports the creation of dedicated efficient serializers for these types.
@@ -83,11 +85,11 @@ public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> implements Ato
* @param comparatorClass The class of the array comparator
*/
private PrimitiveArrayTypeInfo(Class<T> arrayClass, TypeSerializer<T> serializer, Class<? extends PrimitiveArrayComparator<T, ?>> comparatorClass) {
- this.arrayClass = Preconditions.checkNotNull(arrayClass);
- this.serializer = Preconditions.checkNotNull(serializer);
- this.comparatorClass = Preconditions.checkNotNull(comparatorClass);
+ this.arrayClass = checkNotNull(arrayClass);
+ this.serializer = checkNotNull(serializer);
+ this.comparatorClass = checkNotNull(comparatorClass);
- Preconditions.checkArgument(
+ checkArgument(
arrayClass.isArray() && arrayClass.getComponentType().isPrimitive(),
"Class must represent an array of primitives");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
index 19b6eaf..4bf17ea 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
@@ -22,13 +22,13 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
-import com.google.common.base.Preconditions;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.AtomicType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Base type information class for Tuple and Pojo types
@@ -44,7 +44,7 @@ public abstract class CompositeType<T> extends TypeInformation<T> {
@PublicEvolving
public CompositeType(Class<T> typeClass) {
- this.typeClass = Preconditions.checkNotNull(typeClass);
+ this.typeClass = checkNotNull(typeClass);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
index c64e399..21a6ea0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
@@ -22,12 +22,13 @@ import java.io.IOException;
import java.io.ObjectInputStream;
import java.lang.reflect.Method;
-import com.google.common.base.Preconditions;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
@Internal
public final class EnumSerializer<T extends Enum<T>> extends TypeSerializer<T> {
@@ -38,7 +39,7 @@ public final class EnumSerializer<T extends Enum<T>> extends TypeSerializer<T> {
private final Class<T> enumClass;
public EnumSerializer(Class<T> enumClass) {
- this.enumClass = Preconditions.checkNotNull(enumClass);
+ this.enumClass = checkNotNull(enumClass);
this.values = createValues(enumClass);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
index eb6e708..f35d71b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
@@ -21,12 +21,13 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
import java.lang.reflect.Array;
-import com.google.common.base.Preconditions;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* A serializer for arrays of objects.
*
@@ -45,8 +46,8 @@ public final class GenericArraySerializer<C> extends TypeSerializer<C[]> {
public GenericArraySerializer(Class<C> componentClass, TypeSerializer<C> componentSerializer) {
- this.componentClass = Preconditions.checkNotNull(componentClass);
- this.componentSerializer = Preconditions.checkNotNull(componentSerializer);
+ this.componentClass = checkNotNull(componentClass);
+ this.componentSerializer = checkNotNull(componentSerializer);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
index aec3c1d..61d9ae4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
@@ -18,7 +18,7 @@
package org.apache.flink.api.java.typeutils;
-import com.google.common.base.Preconditions;
+import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.AtomicType;
@@ -27,7 +27,8 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.EnumComparator;
import org.apache.flink.api.common.typeutils.base.EnumSerializer;
-import org.apache.flink.annotation.Public;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A {@link TypeInformation} for java enumeration types.
@@ -43,7 +44,7 @@ public class EnumTypeInfo<T extends Enum<T>> extends TypeInformation<T> implemen
@PublicEvolving
public EnumTypeInfo(Class<T> typeClass) {
- Preconditions.checkNotNull(typeClass, "Enum type class must not be null.");
+ checkNotNull(typeClass, "Enum type class must not be null.");
if (!Enum.class.isAssignableFrom(typeClass) ) {
throw new IllegalArgumentException("EnumTypeInfo can only be used for subclasses of " + Enum.class.getName());
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index 0cca8bd..bc4e87a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -18,7 +18,6 @@
package org.apache.flink.api.java.typeutils;
-import com.google.common.base.Preconditions;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
@@ -29,6 +28,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import static org.apache.flink.util.Preconditions.checkNotNull;
@Public
public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
@@ -39,7 +39,7 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType
@PublicEvolving
public GenericTypeInfo(Class<T> typeClass) {
- this.typeClass = Preconditions.checkNotNull(typeClass);
+ this.typeClass = checkNotNull(typeClass);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
index 1e8fbe2..2edb3b9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
@@ -27,7 +27,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
-import com.google.common.base.Preconditions;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
@Public
public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> {
@@ -38,8 +39,8 @@ public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> {
private final TypeInformation<C> componentInfo;
private ObjectArrayTypeInfo(Class<T> arrayType, TypeInformation<C> componentInfo) {
- this.arrayType = Preconditions.checkNotNull(arrayType);
- this.componentInfo = Preconditions.checkNotNull(componentInfo);
+ this.arrayType = checkNotNull(arrayType);
+ this.componentInfo = checkNotNull(componentInfo);
}
// --------------------------------------------------------------------------------------------
@@ -128,9 +129,9 @@ public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> {
@PublicEvolving
public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(Class<T> arrayClass, TypeInformation<C> componentInfo) {
- Preconditions.checkNotNull(arrayClass);
- Preconditions.checkNotNull(componentInfo);
- Preconditions.checkArgument(arrayClass.isArray(), "Class " + arrayClass + " must be an array.");
+ checkNotNull(arrayClass);
+ checkNotNull(componentInfo);
+ checkArgument(arrayClass.isArray(), "Class " + arrayClass + " must be an array.");
return new ObjectArrayTypeInfo<T, C>(arrayClass, componentInfo);
}
@@ -146,7 +147,7 @@ public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> {
@SuppressWarnings("unchecked")
@PublicEvolving
public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(TypeInformation<C> componentInfo) {
- Preconditions.checkNotNull(componentInfo);
+ checkNotNull(componentInfo);
return new ObjectArrayTypeInfo<T, C>(
(Class<T>)Array.newInstance(componentInfo.getTypeClass(), 0).getClass(),
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
index c37fc77..026cfa6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
@@ -25,10 +25,11 @@ import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.Objects;
-import com.google.common.base.Preconditions;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* Represent a field definition for {@link PojoTypeInfo} type of objects.
*/
@@ -41,8 +42,8 @@ public class PojoField implements Serializable {
private final TypeInformation<?> type;
public PojoField(Field field, TypeInformation<?> type) {
- this.field = Preconditions.checkNotNull(field);
- this.type = Preconditions.checkNotNull(type);
+ this.field = checkNotNull(field);
+ this.type = checkNotNull(type);
}
public Field getField() {
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index be2a027..9c65263 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -18,19 +18,9 @@
package org.apache.flink.api.java.typeutils;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -40,10 +30,20 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
-
-import com.google.common.base.Joiner;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
/**
* TypeInformation for "Java Beans"-style types. Flink refers to them as POJOs,
* since the conditions are slightly different from Java Beans.
@@ -78,8 +78,8 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
public PojoTypeInfo(Class<T> typeClass, List<PojoField> fields) {
super(typeClass);
- Preconditions.checkArgument(Modifier.isPublic(typeClass.getModifiers()),
- "POJO " + typeClass + " is not public");
+ checkArgument(Modifier.isPublic(typeClass.getModifiers()),
+ "POJO %s is not public", typeClass);
this.fields = fields.toArray(new PojoField[fields.size()]);
@@ -350,7 +350,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
fieldStrings.add(field.getField().getName() + ": " + field.getTypeInformation().toString());
}
return "PojoType<" + getTypeClass().getName()
- + ", fields = [" + Joiner.on(", ").join(fieldStrings) + "]"
+ + ", fields = [" + StringUtils.join(fieldStrings, ", ") + "]"
+ ">";
}
@@ -381,15 +381,15 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
@Override
public TypeComparator<T> createTypeComparator(ExecutionConfig config) {
- Preconditions.checkState(
+ checkState(
keyFields.size() > 0,
"No keys were defined for the PojoTypeComparatorBuilder.");
- Preconditions.checkState(
+ checkState(
fieldComparators.size() > 0,
"No type comparators were defined for the PojoTypeComparatorBuilder.");
- Preconditions.checkState(
+ checkState(
keyFields.size() == fieldComparators.size(),
"Number of key fields and field comparators is not equal.");
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
index 9ecbe73..051ad0df 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
@@ -22,24 +22,26 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import com.google.common.base.Preconditions;
-import com.google.common.primitives.Ints;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.*;
import org.apache.flink.api.java.typeutils.runtime.Tuple0Serializer;
-//CHECKSTYLE.ON: AvoidStarImport
import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.types.Value;
+//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
+import org.apache.flink.api.java.tuple.*;
+//CHECKSTYLE.ON: AvoidStarImport
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
/**
* A {@link TypeInformation} for the tuple types of the Java API.
*
@@ -62,7 +64,7 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> {
public TupleTypeInfo(Class<T> tupleType, TypeInformation<?>... types) {
super(tupleType, types);
- Preconditions.checkArgument(
+ checkArgument(
types.length <= Tuple.MAX_ARITY,
"The tuple type exceeds the maximum supported arity.");
@@ -131,24 +133,24 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> {
@Override
public TypeComparator<T> createTypeComparator(ExecutionConfig config) {
- Preconditions.checkState(
+ checkState(
fieldComparators.size() > 0,
"No field comparators were defined for the TupleTypeComparatorBuilder."
);
- Preconditions.checkState(
+ checkState(
logicalKeyFields.size() > 0,
"No key fields were defined for the TupleTypeComparatorBuilder."
);
- Preconditions.checkState(
+ checkState(
fieldComparators.size() == logicalKeyFields.size(),
"The number of field comparators and key fields is not equal."
);
final int maxKey = Collections.max(logicalKeyFields);
- Preconditions.checkState(
+ checkState(
maxKey >= 0,
"The maximum key field must be greater or equal than 0."
);
@@ -160,7 +162,7 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> {
}
return new TupleComparator<T>(
- Ints.toArray(logicalKeyFields),
+ listToPrimitives(logicalKeyFields),
fieldComparators.toArray(new TypeComparator[fieldComparators.size()]),
fieldSerializers
);
@@ -255,4 +257,12 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> {
return (TupleTypeInfo<X>) new TupleTypeInfo<>(infos);
}
+ /**
+ * Copies the given list of boxed integers into a newly allocated primitive
+ * {@code int} array of the same length, preserving element order.
+ *
+ * <p>Used by {@code createTypeComparator} in place of the former Guava
+ * {@code Ints.toArray} call to turn the logical key fields into an array.
+ *
+ * @param ints the list to convert; each element is auto-unboxed, so a
+ *             {@code null} list or a {@code null} element results in a
+ *             {@link NullPointerException}
+ * @return a new array holding the list's values in iteration order
+ */
+ private static int[] listToPrimitives(ArrayList<Integer> ints) {
+ int[] result = new int[ints.size()];
+ for (int i = 0; i < result.length; i++) {
+ result[i] = ints.get(i);
+ }
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
index 753eb66..807fd54 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
@@ -23,12 +23,12 @@ import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import com.google.common.base.Preconditions;
-
import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
private static final long serialVersionUID = 1L;
@@ -52,7 +52,7 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
public TupleTypeInfoBase(Class<T> tupleType, TypeInformation<?>... types) {
super(tupleType);
- this.types = Preconditions.checkNotNull(types);
+ this.types = checkNotNull(types);
int fieldCounter = 0;
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 151f359..0469cc2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -66,7 +66,7 @@ import org.apache.hadoop.io.Writable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A utility for reflection analysis on classes, to determine the return type of implementations of transformation
@@ -902,7 +902,7 @@ public class TypeExtractor {
if (isClassType(originalType)) {
originalTypeAsClass = typeToClass(originalType);
}
- Preconditions.checkNotNull(originalTypeAsClass, "originalType has an unexpected type");
+ checkNotNull(originalTypeAsClass, "originalType has an unexpected type");
// check if the class we assumed to conform to the defining type so far is actually a pojo because the
// original type contains additional fields.
// check for additional fields.
@@ -1466,7 +1466,7 @@ public class TypeExtractor {
@SuppressWarnings({ "unchecked", "rawtypes" })
private <OUT,IN1,IN2> TypeInformation<OUT> privateGetForClass(Class<OUT> clazz, ArrayList<Type> typeHierarchy,
ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
- Preconditions.checkNotNull(clazz);
+ checkNotNull(clazz);
// Object is handled as generic type info
if (clazz.equals(Object.class)) {
@@ -1822,7 +1822,7 @@ public class TypeExtractor {
@SuppressWarnings({ "unchecked", "rawtypes" })
private <X> TypeInformation<X> privateGetForObject(X value) {
- Preconditions.checkNotNull(value);
+ checkNotNull(value);
// check if we can extract the types from tuples, otherwise work with the class
if (value instanceof Tuple) {
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
index 7c173c0..495a324 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
@@ -18,9 +18,8 @@
package org.apache.flink.api.java.typeutils;
-import com.google.common.base.Preconditions;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeinfo.AtomicType;
@@ -44,6 +43,9 @@ import org.apache.flink.types.ShortValue;
import org.apache.flink.types.StringValue;
import org.apache.flink.types.Value;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* Type information for data types that extend the {@link Value} interface. The value
* interface allows types to define their custom serialization and deserialization routines.
@@ -70,11 +72,11 @@ public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implement
@PublicEvolving
public ValueTypeInfo(Class<T> type) {
- this.type = Preconditions.checkNotNull(type);
+ this.type = checkNotNull(type);
- Preconditions.checkArgument(
+ checkArgument(
Value.class.isAssignableFrom(type) || type.equals(Value.class),
- "ValueTypeInfo can only be used for subclasses of " + Value.class.getName());
+ "ValueTypeInfo can only be used for subclasses of %s", Value.class.getName());
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
index 5e3b2bc..7ca7a91 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
@@ -18,7 +18,6 @@
package org.apache.flink.api.java.typeutils;
-import com.google.common.base.Preconditions;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
@@ -29,8 +28,12 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.WritableComparator;
import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
+
import org.apache.hadoop.io.Writable;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
/**
* Type information for data types that extend Hadoop's {@link Writable} interface. The Writable
* interface defines the serialization and deserialization routines for the data type.
@@ -46,11 +49,11 @@ public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> imp
@PublicEvolving
public WritableTypeInfo(Class<T> typeClass) {
- this.typeClass = Preconditions.checkNotNull(typeClass);
+ this.typeClass = checkNotNull(typeClass);
- Preconditions.checkArgument(
+ checkArgument(
Writable.class.isAssignableFrom(typeClass) && !typeClass.equals(Writable.class),
- "WritableTypeInfo can only be used for subclasses of " + Writable.class.getName());
+ "WritableTypeInfo can only be used for subclasses of %s", Writable.class.getName());
}
@SuppressWarnings({ "rawtypes", "unchecked" })
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
index bc04367..4c2a7f9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
@@ -20,11 +20,11 @@ package org.apache.flink.api.java.typeutils.runtime;
import java.io.IOException;
-import com.google.common.base.Preconditions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.util.Utf8;
+
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
import org.apache.flink.core.memory.DataInputView;
@@ -34,6 +34,7 @@ import org.apache.flink.util.InstantiationUtil;
import com.esotericsoftware.kryo.Kryo;
import org.objenesis.strategy.StdInstantiatorStrategy;
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* General purpose serialization. Currently using Apache Avro's Reflect-serializers for serialization and
@@ -66,8 +67,8 @@ public final class AvroSerializer<T> extends TypeSerializer<T> {
}
public AvroSerializer(Class<T> type, Class<? extends T> typeToInstantiate) {
- this.type = Preconditions.checkNotNull(type);
- this.typeToInstantiate = Preconditions.checkNotNull(typeToInstantiate);
+ this.type = checkNotNull(type);
+ this.typeToInstantiate = checkNotNull(typeToInstantiate);
InstantiationUtil.checkForInstantiation(typeToInstantiate);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
index 9e46f27..f30a767 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
@@ -20,13 +20,13 @@ package org.apache.flink.api.java.typeutils.runtime;
import java.io.IOException;
-import com.google.common.base.Preconditions;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.util.InstantiationUtil;
+import static org.apache.flink.util.Preconditions.checkNotNull;
public class CopyableValueSerializer<T extends CopyableValue<T>> extends TypeSerializer<T> {
@@ -39,7 +39,7 @@ public class CopyableValueSerializer<T extends CopyableValue<T>> extends TypeSer
public CopyableValueSerializer(Class<T> valueClass) {
- this.valueClass = Preconditions.checkNotNull(valueClass);
+ this.valueClass = checkNotNull(valueClass);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index de24956..9958540 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -32,7 +32,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
-import com.google.common.base.Preconditions;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -40,6 +39,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+import static org.apache.flink.util.Preconditions.checkNotNull;
public final class PojoSerializer<T> extends TypeSerializer<T> {
@@ -75,11 +75,11 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
Field[] fields,
ExecutionConfig executionConfig) {
- this.clazz = Preconditions.checkNotNull(clazz);
- this.fieldSerializers = (TypeSerializer<Object>[]) Preconditions.checkNotNull(fieldSerializers);
- this.fields = Preconditions.checkNotNull(fields);
+ this.clazz = checkNotNull(clazz);
+ this.fieldSerializers = (TypeSerializer<Object>[]) checkNotNull(fieldSerializers);
+ this.fields = checkNotNull(fields);
this.numFields = fieldSerializers.length;
- this.executionConfig = Preconditions.checkNotNull(executionConfig);
+ this.executionConfig = checkNotNull(executionConfig);
LinkedHashSet<Class<?>> registeredPojoTypes = executionConfig.getRegisteredPojoTypes();
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index 5b5d462..5a93cc5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -18,7 +18,6 @@
package org.apache.flink.api.java.typeutils.runtime;
-import com.google.common.base.Preconditions;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -27,6 +26,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;
+import static org.apache.flink.util.Preconditions.checkNotNull;
public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
@@ -42,8 +42,8 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
@SuppressWarnings("unchecked")
public TupleSerializerBase(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) {
- this.tupleClass = Preconditions.checkNotNull(tupleClass);
- this.fieldSerializers = (TypeSerializer<Object>[]) Preconditions.checkNotNull(fieldSerializers);
+ this.tupleClass = checkNotNull(tupleClass);
+ this.fieldSerializers = (TypeSerializer<Object>[]) checkNotNull(fieldSerializers);
this.arity = fieldSerializers.length;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
index 9329866..73dc0fc 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
@@ -20,7 +20,6 @@ package org.apache.flink.api.java.typeutils.runtime;
import java.io.IOException;
-import com.google.common.base.Preconditions;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -30,6 +29,8 @@ import org.apache.flink.util.InstantiationUtil;
import com.esotericsoftware.kryo.Kryo;
import org.objenesis.strategy.StdInstantiatorStrategy;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* Serializer for {@link Value} types. Uses the value's serialization methods, and uses
* Kryo for deep object copies.
@@ -49,7 +50,7 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
// --------------------------------------------------------------------------------------------
public ValueSerializer(Class<T> type) {
- this.type = Preconditions.checkNotNull(type);
+ this.type = checkNotNull(type);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index 276ffc4..d5c2f67 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -25,9 +25,9 @@ import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.base.Preconditions;
import org.apache.avro.generic.GenericData;
+
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
@@ -36,7 +36,9 @@ import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.SpecificInstanceCollectionSerializerForArrayList;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+
import org.objenesis.strategy.StdInstantiatorStrategy;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +54,8 @@ import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* A type serializer that serializes its type using the Kryo serialization
* framework (https://github.com/EsotericSoftware/kryo).
@@ -92,7 +96,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
// ------------------------------------------------------------------------
public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
- this.type = Preconditions.checkNotNull(type);
+ this.type = checkNotNull(type);
this.defaultSerializers = executionConfig.getDefaultKryoSerializers();
this.defaultSerializerClasses = executionConfig.getDefaultKryoSerializerClasses();