You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/04/15 19:39:48 UTC

[03/12] flink git commit: [FLINK-3700] [core] Remove Guava dependency from flink-core

[FLINK-3700] [core] Remove Guava dependency from flink-core

Almost all Guava functionality used within flink-core has corresponding
utils in Flink's codebase, or the JDK library.

This replaces the Guava code as follows
  - Preconditions calls by Flink's Preconditions class
  - Collection utils by simple Java Collection calls
  - Iterator's by Flink's Union Iterator
  - Files by simple util methods arount java.nio.Files
  - InetAddresses IPv6 encoding code has been adapted into Flink's NetUtils (with attribution comments)

Some util classes where moved from flink-runtime to flink-core.

This closes #1854


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/760a0d9e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/760a0d9e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/760a0d9e

Branch: refs/heads/master
Commit: 760a0d9e7fd9fa88e9f7408b137d78df384d764f
Parents: 1b93b32
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Apr 5 15:18:32 2016 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Apr 15 19:38:14 2016 +0200

----------------------------------------------------------------------
 .../storm/tests/StormFieldsGroupingITCase.java  |   2 +-
 flink-core/pom.xml                              |   6 -
 .../java/org/apache/flink/api/common/Plan.java  |   5 +-
 .../org/apache/flink/api/common/TaskInfo.java   |   4 +-
 .../util/AbstractRuntimeUDFContext.java         |   9 +-
 .../api/common/io/DelimitedInputFormat.java     |  17 +-
 .../flink/api/common/io/FileInputFormat.java    |  28 +--
 .../api/common/io/GenericCsvInputFormat.java    |  43 ++--
 .../common/operators/GenericDataSinkBase.java   |  21 +-
 .../apache/flink/api/common/operators/Keys.java |  19 +-
 .../base/GroupCombineOperatorBase.java          |   6 +-
 .../operators/base/GroupReduceOperatorBase.java |   6 +-
 .../api/common/operators/util/FieldList.java    |   6 +-
 .../operators/util/UserCodeObjectWrapper.java   |   7 +-
 .../api/common/typeinfo/BasicArrayTypeInfo.java |  10 +-
 .../api/common/typeinfo/BasicTypeInfo.java      |   8 +-
 .../api/common/typeinfo/FractionalTypeInfo.java |  19 +-
 .../api/common/typeinfo/IntegerTypeInfo.java    |  25 ++-
 .../api/common/typeinfo/NumericTypeInfo.java    |  29 +--
 .../common/typeinfo/PrimitiveArrayTypeInfo.java |  22 +-
 .../api/common/typeutils/CompositeType.java     |   4 +-
 .../common/typeutils/base/EnumSerializer.java   |   5 +-
 .../typeutils/base/GenericArraySerializer.java  |   7 +-
 .../flink/api/java/typeutils/EnumTypeInfo.java  |   7 +-
 .../api/java/typeutils/GenericTypeInfo.java     |   4 +-
 .../api/java/typeutils/ObjectArrayTypeInfo.java |  15 +-
 .../flink/api/java/typeutils/PojoField.java     |   7 +-
 .../flink/api/java/typeutils/PojoTypeInfo.java  |  40 ++--
 .../flink/api/java/typeutils/TupleTypeInfo.java |  32 ++-
 .../api/java/typeutils/TupleTypeInfoBase.java   |   6 +-
 .../flink/api/java/typeutils/TypeExtractor.java |   8 +-
 .../flink/api/java/typeutils/ValueTypeInfo.java |  12 +-
 .../api/java/typeutils/WritableTypeInfo.java    |  11 +-
 .../java/typeutils/runtime/AvroSerializer.java  |   7 +-
 .../runtime/CopyableValueSerializer.java        |   4 +-
 .../java/typeutils/runtime/PojoSerializer.java  |  10 +-
 .../typeutils/runtime/TupleSerializerBase.java  |   6 +-
 .../java/typeutils/runtime/ValueSerializer.java |   5 +-
 .../typeutils/runtime/kryo/KryoSerializer.java  |   8 +-
 .../org/apache/flink/types/StringValue.java     |  14 +-
 .../java/org/apache/flink/util/FileUtils.java   |  91 ++++++++
 .../java/org/apache/flink/util/IOUtils.java     | 223 +++++++++++++++++++
 .../java/org/apache/flink/util/MathUtils.java   | 175 +++++++++++++++
 .../java/org/apache/flink/util/NetUtils.java    |  86 ++++++-
 .../org/apache/flink/util/Preconditions.java    |  57 ++++-
 .../org/apache/flink/util/UnionIterator.java    | 102 +++++++++
 .../org/apache/flink/util/XORShiftRandom.java   |   4 +-
 .../base/OuterJoinOperatorBaseTest.java         |   4 +-
 .../flink/api/java/tuple/TupleGenerator.java    |   7 +-
 .../java/typeutils/PojoTypeExtractionTest.java  |  13 +-
 .../typeutils/runtime/PojoSerializerTest.java   |   8 +-
 .../runtime/PojoSubclassSerializerTest.java     |   5 +-
 .../SubclassFromInterfaceSerializerTest.java    |   5 +-
 .../apache/flink/testutils/junit/RetryRule.java |   5 +-
 .../org/apache/flink/util/MathUtilTest.java     | 102 +++++++++
 .../org/apache/flink/util/NetUtilsTest.java     |   4 -
 .../apache/flink/util/UnionIteratorTest.java    | 143 ++++++++++++
 .../apache/flink/runtime/blob/BlobUtils.java    |   2 +-
 .../flink/runtime/blob/FileSystemBlobStore.java |   2 +-
 .../checkpoint/FileSystemStateStore.java        |   2 +-
 .../flink/runtime/filecache/FileCache.java      |   2 +-
 .../runtime/io/disk/FileChannelInputView.java   |   2 +-
 .../runtime/io/disk/RandomAccessInputView.java  |   2 +-
 .../runtime/io/disk/RandomAccessOutputView.java |   2 +-
 .../io/disk/SeekableFileChannelInputView.java   |   2 +-
 .../io/disk/SimpleCollectingOutputView.java     |   2 +-
 .../flink/runtime/memory/MemoryManager.java     |   2 +-
 .../operators/hash/CompactingHashTable.java     |   2 +-
 .../runtime/operators/hash/HashPartition.java   |   2 +-
 .../operators/hash/MutableHashTable.java        |   2 +-
 .../operators/shipping/OutputEmitter.java       |   2 +-
 .../apache/flink/runtime/util/FileUtils.java    |  61 -----
 .../org/apache/flink/runtime/util/IOUtils.java  | 213 ------------------
 .../apache/flink/runtime/util/MathUtils.java    | 176 ---------------
 .../flink/runtime/util/UnionIterator.java       | 100 ---------
 .../FileSystemStateStorageHelper.java           |   2 +-
 .../flink/runtime/taskmanager/TaskManager.scala |   2 +-
 .../runtime/testutils/CommonTestUtils.java      |   2 +-
 .../apache/flink/runtime/util/MathUtilTest.java | 101 ---------
 .../flink/runtime/util/UnionIteratorTest.java   | 142 ------------
 .../source/SocketTextStreamFunction.java        |   2 +-
 ...ractAlignedProcessingTimeWindowOperator.java |   2 +-
 .../windowing/AccumulatingKeyedTimePanes.java   |   2 +-
 .../runtime/operators/windowing/KeyMap.java     |   2 +-
 .../runtime/partitioner/HashPartitioner.java    |   2 +-
 .../apache/flink/streaming/api/IterateTest.java |   2 +-
 .../streaming/api/StreamingOperatorsITCase.java |   2 +-
 .../api/scala/StreamingOperatorsITCase.scala    |   2 +-
 88 files changed, 1294 insertions(+), 1087 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
index 1875ecb..b43b24d 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
@@ -21,7 +21,7 @@ import backtype.storm.Config;
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
 
-import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.MathUtils;
 import org.apache.flink.storm.api.FlinkLocalCluster;
 import org.apache.flink.storm.api.FlinkTopology;
 import org.apache.flink.storm.tests.operators.FiniteRandomSpout;

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 59b620b..eb55bdd 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -70,12 +70,6 @@ under the License.
 			<artifactId>${shading-artifact.name}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
-		
-		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-			<version>${guava.version}</version>
-		</dependency>
 
 		<!-- test depedencies -->
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
index 30f2c2f..0235af0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.api.common;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Collection;
@@ -43,6 +40,8 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.Visitable;
 import org.apache.flink.util.Visitor;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
 /**
  * This class represents Flink programs, in the form of dataflow plans.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
index 6482cde..ac87e74 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
@@ -20,8 +20,8 @@ package org.apache.flink.api.common;
 
 import org.apache.flink.annotation.Internal;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
  * Encapsulates task-specific information: name, index of subtask, parallelism and attempt number.

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index afe9f77..74b78df 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -23,7 +23,6 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.Future;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
@@ -45,6 +44,8 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.core.fs.Path;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A standalone implementation of the {@link RuntimeContext}, created by runtime UDF operators.
  */
@@ -66,11 +67,11 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
 										ExecutionConfig executionConfig,
 										Map<String, Accumulator<?,?>> accumulators,
 										Map<String, Future<Path>> cpTasks) {
-		this.taskInfo = Preconditions.checkNotNull(taskInfo);
+		this.taskInfo = checkNotNull(taskInfo);
 		this.userCodeClassLoader = userCodeClassLoader;
 		this.executionConfig = executionConfig;
-		this.distributedCache = new DistributedCache(Preconditions.checkNotNull(cpTasks));
-		this.accumulators = Preconditions.checkNotNull(accumulators);
+		this.distributedCache = new DistributedCache(checkNotNull(cpTasks));
+		this.accumulators = checkNotNull(accumulators);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index f6b6d49..243e2a4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -18,12 +18,7 @@
 
 package org.apache.flink.api.common.io;
 
-import java.io.IOException;
-import java.util.ArrayList;
-
 import org.apache.flink.annotation.Public;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -33,7 +28,12 @@ import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 
-import com.google.common.base.Charsets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
 
 /**
  * Base implementation for input formats that split the input at a delimiter into records.
@@ -53,6 +53,9 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 	 * The log.
 	 */
 	private static final Logger LOG = LoggerFactory.getLogger(DelimitedInputFormat.class);
+
+	/** The default charset  to convert strings to bytes */
+	private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
 	
 	/**
 	 * The default read buffer size = 1MB.
@@ -185,7 +188,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 	}
 	
 	public void setDelimiter(String delimiter) {
-		this.delimiter = delimiter.getBytes(Charsets.UTF_8);
+		this.delimiter = delimiter.getBytes(UTF_8_CHARSET);
 	}
 	
 	public int getLineLengthLimit() {

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index c4cd2b3..fd69cc3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -18,22 +18,10 @@
 
 package org.apache.flink.api.common.io;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.base.Preconditions;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.io.compression.DeflateInflaterInputStreamFactory;
 import org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory;
 import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -45,6 +33,20 @@ import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * The base class for {@link RichInputFormat}s that read from files. For specific input types the
  * {@link #nextRecord(Object)} and {@link #reachedEnd()} methods need to be implemented.
@@ -143,7 +145,7 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS
 	 * @return the extension of the file name or {@code null} if there is no extension.
 	 */
 	protected static String extractFileExtension(String fileName) {
-		Preconditions.checkNotNull(fileName);
+		checkNotNull(fileName);
 		int lastPeriodIndex = fileName.lastIndexOf('.');
 		if (lastPeriodIndex < 0){
 			return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index cb20f81..31d42ff 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -16,13 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.io;
 
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import com.google.common.primitives.Ints;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
@@ -30,6 +25,7 @@ import org.apache.flink.types.parser.FieldParser;
 import org.apache.flink.types.parser.StringParser;
 import org.apache.flink.types.parser.StringValueParser;
 import org.apache.flink.util.InstantiationUtil;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,13 +37,20 @@ import java.util.ArrayList;
 import java.util.Map;
 import java.util.TreeMap;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 @Internal
 public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> {
 
-	private static final Logger LOG = LoggerFactory.getLogger(GenericCsvInputFormat.class);
-	
 	private static final long serialVersionUID = 1L;
 	
+	
+	private static final Logger LOG = LoggerFactory.getLogger(GenericCsvInputFormat.class);
+
+	/** The default charset  to convert strings to bytes */
+	private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
+	
 	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
 	
 	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
@@ -127,7 +130,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 	}
 
 	public void setCommentPrefix(String commentPrefix) {
-		setCommentPrefix(commentPrefix, Charsets.UTF_8);
+		setCommentPrefix(commentPrefix, UTF_8_CHARSET);
 	}
 
 	public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
@@ -171,7 +174,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 	}
 
 	public void setFieldDelimiter(String delimiter) {
-		this.fieldDelim = delimiter.getBytes(Charsets.UTF_8);
+		this.fieldDelim = delimiter.getBytes(UTF_8_CHARSET);
 	}
 
 	public boolean isLenient() {
@@ -247,9 +250,9 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 	}
 	
 	protected void setFieldsGeneric(int[] sourceFieldIndices, Class<?>[] fieldTypes) {
-		Preconditions.checkNotNull(sourceFieldIndices);
-		Preconditions.checkNotNull(fieldTypes);
-		Preconditions.checkArgument(sourceFieldIndices.length == fieldTypes.length,
+		checkNotNull(sourceFieldIndices);
+		checkNotNull(fieldTypes);
+		checkArgument(sourceFieldIndices.length == fieldTypes.length,
 			"Number of field indices and field types must match.");
 
 		for (int i : sourceFieldIndices) {
@@ -258,7 +261,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 			}
 		}
 
-		int largestFieldIndex = Ints.max(sourceFieldIndices);
+		int largestFieldIndex = max(sourceFieldIndices);
 		this.fieldIncluded = new boolean[largestFieldIndex + 1];
 		ArrayList<Class<?>> types = new ArrayList<Class<?>>();
 
@@ -280,8 +283,8 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 	}
 	
 	protected void setFieldsGeneric(boolean[] includedMask, Class<?>[] fieldTypes) {
-		Preconditions.checkNotNull(includedMask);
-		Preconditions.checkNotNull(fieldTypes);
+		checkNotNull(includedMask);
+		checkNotNull(fieldTypes);
 
 		ArrayList<Class<?>> types = new ArrayList<Class<?>>();
 
@@ -530,4 +533,14 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 			lastPos = positions[i];
 		}
 	}
+	
+	private static int max(int[] ints) {
+		checkArgument(ints.length > 0);
+		
+		int max = ints[0];
+		for (int i = 1 ; i < ints.length; i++) {
+			max = Math.max(max, ints[i]);
+		}
+		return max;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
index 33f11f3..1f3875d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.operators;
 
 import java.util.List;
@@ -39,7 +38,7 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.types.Nothing;
 import org.apache.flink.util.Visitor;
 
-import com.google.common.base.Preconditions;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Operator for nodes that act as data sinks, storing the data they receive.
@@ -66,7 +65,7 @@ public class GenericDataSinkBase<IN> extends Operator<Nothing> {
 	public GenericDataSinkBase(OutputFormat<IN> f, UnaryOperatorInformation<IN, Nothing> operatorInfo, String name) {
 		super(operatorInfo, name);
 
-		Preconditions.checkNotNull(f, "The OutputFormat may not be null.");
+		checkNotNull(f, "The OutputFormat may not be null.");
 		this.formatWrapper = new UserCodeObjectWrapper<OutputFormat<IN>>(f);
 	}
 
@@ -79,8 +78,7 @@ public class GenericDataSinkBase<IN> extends Operator<Nothing> {
 	 */
 	public GenericDataSinkBase(UserCodeWrapper<? extends OutputFormat<IN>> f, UnaryOperatorInformation<IN, Nothing> operatorInfo, String name) {
 		super(operatorInfo, name);
-		Preconditions.checkNotNull(f, "The OutputFormat class may not be null.");
-		this.formatWrapper = f;
+		this.formatWrapper = checkNotNull(f, "The OutputFormat class may not be null.");
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -100,8 +98,7 @@ public class GenericDataSinkBase<IN> extends Operator<Nothing> {
 	 * @param input The operator to use as the input.
 	 */
 	public void setInput(Operator<IN> input) {
-		Preconditions.checkNotNull(input, "The input may not be null.");
-		this.input = input;
+		this.input = checkNotNull(input, "The input may not be null.");
 	}
 
 	/**
@@ -112,7 +109,7 @@ public class GenericDataSinkBase<IN> extends Operator<Nothing> {
 	 */
 	@Deprecated
 	public void setInputs(Operator<IN>... inputs) {
-		Preconditions.checkNotNull(inputs, "The inputs may not be null.");
+		checkNotNull(inputs, "The inputs may not be null.");
 		this.input = Operator.createUnionCascade(inputs);
 	}
 
@@ -124,7 +121,7 @@ public class GenericDataSinkBase<IN> extends Operator<Nothing> {
 	 */
 	@Deprecated
 	public void setInputs(List<Operator<IN>> inputs) {
-		Preconditions.checkNotNull(inputs, "The inputs may not be null.");
+		checkNotNull(inputs, "The inputs may not be null.");
 		this.input = Operator.createUnionCascade(inputs);
 	}
 
@@ -136,7 +133,7 @@ public class GenericDataSinkBase<IN> extends Operator<Nothing> {
 	 */
 	@Deprecated
 	public void addInput(Operator<IN>... inputs) {
-		Preconditions.checkNotNull(inputs, "The input may not be null.");
+		checkNotNull(inputs, "The input may not be null.");
 		this.input = Operator.createUnionCascade(this.input, inputs);
 	}
 
@@ -149,7 +146,7 @@ public class GenericDataSinkBase<IN> extends Operator<Nothing> {
 	@SuppressWarnings("unchecked")
 	@Deprecated
 	public void addInputs(List<? extends Operator<IN>> inputs) {
-		Preconditions.checkNotNull(inputs, "The inputs may not be null.");
+		checkNotNull(inputs, "The inputs may not be null.");
 		this.input = createUnionCascade(this.input, (Operator<IN>[]) inputs.toArray(new Operator[inputs.size()]));
 	}
 
@@ -259,7 +256,7 @@ public class GenericDataSinkBase<IN> extends Operator<Nothing> {
 		format.configure(this.parameters);
 
 		if(format instanceof RichOutputFormat){
-			((RichOutputFormat) format).setRuntimeContext(ctx);
+			((RichOutputFormat<?>) format).setRuntimeContext(ctx);
 		}
 		format.open(0, 1);
 		for (IN element : inputData) {

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java
index abe41af..0b771e8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java
@@ -18,10 +18,7 @@
 
 package org.apache.flink.api.common.operators;
 
-import java.util.ArrayList;
-import java.util.List;
-
-import com.google.common.base.Joiner;
+import org.apache.commons.lang3.StringUtils;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.InvalidProgramException;
@@ -33,7 +30,11 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
-import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 @Internal
 public abstract class Keys<T> {
@@ -232,7 +233,8 @@ public abstract class Keys<T> {
 			} else {
 				rangeCheckFields(keyPositions, type.getArity() - 1);
 			}
-			Preconditions.checkArgument(keyPositions.length > 0, "Grouping fields can not be empty at this point");
+			
+			checkArgument(keyPositions.length > 0, "Grouping fields can not be empty at this point");
 
 			// extract key field types
 			CompositeType<T> cType = (CompositeType<T>)type;
@@ -266,7 +268,7 @@ public abstract class Keys<T> {
 		 * Create String-based (nested) field expression keys on a composite type.
 		 */
 		public ExpressionKeys(String[] keyExpressions, TypeInformation<T> type) {
-			Preconditions.checkNotNull(keyExpressions, "Field expression cannot be null.");
+			checkNotNull(keyExpressions, "Field expression cannot be null.");
 
 			this.keyFields = new ArrayList<>(keyExpressions.length);
 
@@ -375,8 +377,7 @@ public abstract class Keys<T> {
 
 		@Override
 		public String toString() {
-			Joiner join = Joiner.on('.');
-			return "ExpressionKeys: " + join.join(keyFields);
+			return "ExpressionKeys: " + StringUtils.join(keyFields, '.');
 		}
 
 		public static boolean isSortKey(int fieldPos, TypeInformation<?> type) {

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
index b660506..15a0f5a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.api.common.operators.base;
 
-import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.ArrayUtils;
+
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
@@ -43,6 +43,8 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * Base operator for the combineGroup transformation. It receives the UDF GroupCombineFunction as an input.
  * This class is later processed by the compiler to generate the plan.
@@ -109,7 +111,7 @@ public class GroupCombineOperatorBase<IN, OUT, FT extends GroupCombineFunction<I
 		}
 
 		if(sortColumns.length == 0) { // => all reduce. No comparator
-			Preconditions.checkArgument(sortOrderings.length == 0);
+			checkArgument(sortOrderings.length == 0);
 		} else {
 			final TypeComparator<IN> sortComparator = getTypeComparator(inputType, sortColumns, sortOrderings, executionConfig);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
index 0cc0209..0794a77 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
@@ -41,13 +41,13 @@ import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
-import com.google.common.base.Preconditions;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * @see org.apache.flink.api.common.functions.GroupReduceFunction
  */
@@ -181,7 +181,7 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN,
 		}
 
 		if(sortColumns.length == 0) { // => all reduce. No comparator
-			Preconditions.checkArgument(sortOrderings.length == 0);
+			checkArgument(sortOrderings.length == 0);
 		} else {
 			final TypeComparator<IN> sortComparator = getTypeComparator(inputType, sortColumns, sortOrderings, executionConfig);
 			Collections.sort(inputData, new Comparator<IN>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
index c69c875..15a993c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
@@ -16,16 +16,16 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.operators.util;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.annotation.Internal;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Immutable ordered list of fields IDs.
  */
@@ -44,7 +44,7 @@ public class FieldList extends FieldSet {
 	}
 	
 	public FieldList(Integer fieldId) {
-		super(Collections.singletonList(Preconditions.checkNotNull(fieldId, "The fields ID must not be null.")));
+		super(Collections.singletonList(checkNotNull(fieldId, "The fields ID must not be null.")));
 	}
 	
 	public FieldList(int... columnIndexes) {

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeObjectWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeObjectWrapper.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeObjectWrapper.java
index 890caac..5d75c95 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeObjectWrapper.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeObjectWrapper.java
@@ -27,7 +27,8 @@ import java.lang.reflect.Modifier;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.NonSerializableUserCodeException;
 
-import com.google.common.base.Preconditions;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * This holds an actual object containing user defined code.
@@ -39,8 +40,8 @@ public class UserCodeObjectWrapper<T> implements UserCodeWrapper<T> {
 	private final T userCodeObject;
 	
 	public UserCodeObjectWrapper(T userCodeObject) {
-		Preconditions.checkNotNull(userCodeObject, "The user code object may not be null.");
-		Preconditions.checkArgument(userCodeObject instanceof Serializable, "User code object is not serializable: " + userCodeObject.getClass().getName());
+		checkNotNull(userCodeObject, "The user code object may not be null.");
+		checkArgument(userCodeObject instanceof Serializable, "User code object is not serializable: " + userCodeObject.getClass().getName());
 		
 		this.userCodeObject = userCodeObject;
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
index 2c61fb2..25b2850 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
@@ -22,7 +22,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -31,6 +30,8 @@ import org.apache.flink.api.common.typeutils.base.array.StringArraySerializer;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 @Public
 public final class BasicArrayTypeInfo<T, C> extends TypeInformation<T> {
 
@@ -51,11 +52,10 @@ public final class BasicArrayTypeInfo<T, C> extends TypeInformation<T> {
 
 	private final Class<T> arrayClass;
 	private final TypeInformation<C> componentInfo;
-
-	@SuppressWarnings("unchecked")
+	
 	private BasicArrayTypeInfo(Class<T> arrayClass, BasicTypeInfo<C> componentInfo) {
-		this.arrayClass = Preconditions.checkNotNull(arrayClass);
-		this.componentInfo = Preconditions.checkNotNull(componentInfo);
+		this.arrayClass = checkNotNull(arrayClass);
+		this.componentInfo = checkNotNull(componentInfo);
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
index 4eb70c1..c1d5605 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -54,6 +53,7 @@ import org.apache.flink.api.common.typeutils.base.StringComparator;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Type information for primitive types (int, long, double, byte, ...), String, Date, and Void.
@@ -87,9 +87,9 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
 	
 	
 	protected BasicTypeInfo(Class<T> clazz, Class<?>[] possibleCastTargetTypes, TypeSerializer<T> serializer, Class<? extends TypeComparator<T>> comparatorClass) {
-		this.clazz = Preconditions.checkNotNull(clazz);
-		this.possibleCastTargetTypes = Preconditions.checkNotNull(possibleCastTargetTypes);
-		this.serializer = Preconditions.checkNotNull(serializer);
+		this.clazz = checkNotNull(clazz);
+		this.possibleCastTargetTypes = checkNotNull(possibleCastTargetTypes);
+		this.serializer = checkNotNull(serializer);
 		// comparator can be null as in VOID_TYPE_INFO
 		this.comparatorClass = comparatorClass;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/FractionalTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/FractionalTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/FractionalTypeInfo.java
index aa22ac6..b4e53d4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/FractionalTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/FractionalTypeInfo.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.api.common.typeinfo;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
-import java.util.Set;
+import java.util.Arrays;
+import java.util.HashSet;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
  * Type information for numeric fractional primitive types (double, float).
@@ -34,15 +35,15 @@ public class FractionalTypeInfo<T> extends NumericTypeInfo<T> {
 
 	private static final long serialVersionUID = 554334260950199994L;
 
-	private static final Set<Class<?>> fractionalTypes = Sets.<Class<?>>newHashSet(
-			Double.class,
-			Float.class
-	);
+	private static final HashSet<Class<?>> fractionalTypes = new HashSet<Class<?>>(
+			Arrays.asList(
+				Double.class,
+				Float.class));
 
 	protected FractionalTypeInfo(Class<T> clazz, Class<?>[] possibleCastTargetTypes, TypeSerializer<T> serializer, Class<? extends TypeComparator<T>> comparatorClass) {
 		super(clazz, possibleCastTargetTypes, serializer, comparatorClass);
 
-		Preconditions.checkArgument(fractionalTypes.contains(clazz), "The given class " +
-			clazz.getSimpleName() + " is not a fractional type.");
+		checkArgument(fractionalTypes.contains(clazz), 
+				"The given class %s is not a fractional type.", clazz.getSimpleName());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/IntegerTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/IntegerTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/IntegerTypeInfo.java
index bff3ab7..02b416a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/IntegerTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/IntegerTypeInfo.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.api.common.typeinfo;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
-import java.util.Set;
+import java.util.Arrays;
+import java.util.HashSet;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
  * Type information for numeric integer primitive types: int, long, byte, short, character.
@@ -34,18 +35,18 @@ public class IntegerTypeInfo<T> extends NumericTypeInfo<T> {
 
 	private static final long serialVersionUID = -8068827354966766955L;
 
-	private static final Set<Class<?>> integerTypes = Sets.<Class<?>>newHashSet(
-			Integer.class,
-			Long.class,
-			Byte.class,
-			Short.class,
-			Character.class
-	);
+	private static final HashSet<Class<?>> integerTypes = new HashSet<Class<?>>(
+			Arrays.asList(
+				Integer.class,
+				Long.class,
+				Byte.class,
+				Short.class,
+				Character.class));
 
 	protected IntegerTypeInfo(Class<T> clazz, Class<?>[] possibleCastTargetTypes, TypeSerializer<T> serializer, Class<? extends TypeComparator<T>> comparatorClass) {
 		super(clazz, possibleCastTargetTypes, serializer, comparatorClass);
 
-		Preconditions.checkArgument(integerTypes.contains(clazz), "The given class " +
-		clazz.getSimpleName() + " is not a integer type.");
+		checkArgument(integerTypes.contains(clazz),
+				"The given class %s is not a integer type.", clazz.getSimpleName());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NumericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NumericTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NumericTypeInfo.java
index 6969520..fea8abe 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NumericTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NumericTypeInfo.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.api.common.typeinfo;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
-import java.util.Set;
+import java.util.Arrays;
+import java.util.HashSet;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
  * Type information for numeric primitive types: int, long, double, byte, short, float, char.
@@ -34,21 +35,21 @@ public abstract class NumericTypeInfo<T> extends BasicTypeInfo<T> {
 
 	private static final long serialVersionUID = -5937777910658986986L;
 
-	private static final Set<Class<?>> numericalTypes = Sets.<Class<?>>newHashSet(
-			Integer.class,
-			Long.class,
-			Double.class,
-			Byte.class,
-			Short.class,
-			Float.class,
-			Character.class
-	);
+	private static final HashSet<Class<?>> numericalTypes = new HashSet<Class<?>>(
+			Arrays.asList(
+				Integer.class,
+				Long.class,
+				Double.class,
+				Byte.class,
+				Short.class,
+				Float.class,
+				Character.class));
 
 	protected NumericTypeInfo(Class<T> clazz, Class<?>[] possibleCastTargetTypes, TypeSerializer<T> serializer, Class<? extends
 			TypeComparator<T>> comparatorClass) {
 		super(clazz, possibleCastTargetTypes, serializer, comparatorClass);
 
-		Preconditions.checkArgument(numericalTypes.contains(clazz), "The given class " +
-				clazz.getSimpleName() + " is not a numerical type.");
+		checkArgument(numericalTypes.contains(clazz), 
+				"The given class %s is not a numerical type", clazz.getSimpleName());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
index 2c75458..1c6ce00 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
@@ -18,13 +18,8 @@
 
 package org.apache.flink.api.common.typeinfo;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -46,6 +41,13 @@ import org.apache.flink.api.common.typeutils.base.array.PrimitiveArrayComparator
 import org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArrayComparator;
 import org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArraySerializer;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A {@link TypeInformation} for arrays of primitive types (int, long, double, ...).
  * Supports the creation of dedicated efficient serializers for these types.
@@ -83,11 +85,11 @@ public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> implements Ato
 	 * @param comparatorClass The class of the array comparator
 	 */
 	private PrimitiveArrayTypeInfo(Class<T> arrayClass, TypeSerializer<T> serializer, Class<? extends PrimitiveArrayComparator<T, ?>> comparatorClass) {
-		this.arrayClass = Preconditions.checkNotNull(arrayClass);
-		this.serializer = Preconditions.checkNotNull(serializer);
-		this.comparatorClass = Preconditions.checkNotNull(comparatorClass);
+		this.arrayClass = checkNotNull(arrayClass);
+		this.serializer = checkNotNull(serializer);
+		this.comparatorClass = checkNotNull(comparatorClass);
 
-		Preconditions.checkArgument(
+		checkArgument(
 			arrayClass.isArray() && arrayClass.getComponentType().isPrimitive(),
 			"Class must represent an array of primitives");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
index 19b6eaf..4bf17ea 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
@@ -22,13 +22,13 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Base type information class for Tuple and Pojo types
@@ -44,7 +44,7 @@ public abstract class CompositeType<T> extends TypeInformation<T> {
 
 	@PublicEvolving
 	public CompositeType(Class<T> typeClass) {
-		this.typeClass = Preconditions.checkNotNull(typeClass);
+		this.typeClass = checkNotNull(typeClass);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
index c64e399..21a6ea0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
@@ -22,12 +22,13 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.lang.reflect.Method;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 @Internal
 public final class EnumSerializer<T extends Enum<T>> extends TypeSerializer<T> {
 
@@ -38,7 +39,7 @@ public final class EnumSerializer<T extends Enum<T>> extends TypeSerializer<T> {
 	private final Class<T> enumClass;
 
 	public EnumSerializer(Class<T> enumClass) {
-		this.enumClass = Preconditions.checkNotNull(enumClass);
+		this.enumClass = checkNotNull(enumClass);
 		this.values = createValues(enumClass);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
index eb6e708..f35d71b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
@@ -21,12 +21,13 @@ package org.apache.flink.api.common.typeutils.base;
 import java.io.IOException;
 import java.lang.reflect.Array;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A serializer for arrays of objects.
  * 
@@ -45,8 +46,8 @@ public final class GenericArraySerializer<C> extends TypeSerializer<C[]> {
 	
 	
 	public GenericArraySerializer(Class<C> componentClass, TypeSerializer<C> componentSerializer) {
-		this.componentClass = Preconditions.checkNotNull(componentClass);
-		this.componentSerializer = Preconditions.checkNotNull(componentSerializer);
+		this.componentClass = checkNotNull(componentClass);
+		this.componentSerializer = checkNotNull(componentSerializer);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
index aec3c1d..61d9ae4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.java.typeutils;
 
-import com.google.common.base.Preconditions;
+import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.AtomicType;
@@ -27,7 +27,8 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.EnumComparator;
 import org.apache.flink.api.common.typeutils.base.EnumSerializer;
-import org.apache.flink.annotation.Public;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A {@link TypeInformation} for java enumeration types. 
@@ -43,7 +44,7 @@ public class EnumTypeInfo<T extends Enum<T>> extends TypeInformation<T> implemen
 
 	@PublicEvolving
 	public EnumTypeInfo(Class<T> typeClass) {
-		Preconditions.checkNotNull(typeClass, "Enum type class must not be null.");
+		checkNotNull(typeClass, "Enum type class must not be null.");
 
 		if (!Enum.class.isAssignableFrom(typeClass) ) {
 			throw new IllegalArgumentException("EnumTypeInfo can only be used for subclasses of " + Enum.class.getName());

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index 0cca8bd..bc4e87a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.java.typeutils;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -29,6 +28,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 @Public
 public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
@@ -39,7 +39,7 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType
 
 	@PublicEvolving
 	public GenericTypeInfo(Class<T> typeClass) {
-		this.typeClass = Preconditions.checkNotNull(typeClass);
+		this.typeClass = checkNotNull(typeClass);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
index 1e8fbe2..2edb3b9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
@@ -27,7 +27,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
 
-import com.google.common.base.Preconditions;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 @Public
 public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> {
@@ -38,8 +39,8 @@ public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> {
 	private final TypeInformation<C> componentInfo;
 
 	private ObjectArrayTypeInfo(Class<T> arrayType, TypeInformation<C> componentInfo) {
-		this.arrayType = Preconditions.checkNotNull(arrayType);
-		this.componentInfo = Preconditions.checkNotNull(componentInfo);
+		this.arrayType = checkNotNull(arrayType);
+		this.componentInfo = checkNotNull(componentInfo);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -128,9 +129,9 @@ public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> {
 
 	@PublicEvolving
 	public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(Class<T> arrayClass, TypeInformation<C> componentInfo) {
-		Preconditions.checkNotNull(arrayClass);
-		Preconditions.checkNotNull(componentInfo);
-		Preconditions.checkArgument(arrayClass.isArray(), "Class " + arrayClass + " must be an array.");
+		checkNotNull(arrayClass);
+		checkNotNull(componentInfo);
+		checkArgument(arrayClass.isArray(), "Class " + arrayClass + " must be an array.");
 
 		return new ObjectArrayTypeInfo<T, C>(arrayClass, componentInfo);
 	}
@@ -146,7 +147,7 @@ public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> {
 	@SuppressWarnings("unchecked")
 	@PublicEvolving
 	public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(TypeInformation<C> componentInfo) {
-		Preconditions.checkNotNull(componentInfo);
+		checkNotNull(componentInfo);
 
 		return new ObjectArrayTypeInfo<T, C>(
 			(Class<T>)Array.newInstance(componentInfo.getTypeClass(), 0).getClass(),

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
index c37fc77..026cfa6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
@@ -25,10 +25,11 @@ import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.util.Objects;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Represent a field definition for {@link PojoTypeInfo} type of objects.
  */
@@ -41,8 +42,8 @@ public class PojoField implements Serializable {
 	private final TypeInformation<?> type;
 
 	public PojoField(Field field, TypeInformation<?> type) {
-		this.field = Preconditions.checkNotNull(field);
-		this.type = Preconditions.checkNotNull(type);
+		this.field = checkNotNull(field);
+		this.type = checkNotNull(type);
 	}
 
 	public Field getField() {

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index be2a027..9c65263 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -18,19 +18,9 @@
 
 package org.apache.flink.api.java.typeutils;
 
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -40,10 +30,20 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
 import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
 import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
-
-import com.google.common.base.Joiner;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * TypeInformation for "Java Beans"-style types. Flink refers to them as POJOs,
  * since the conditions are slightly different from Java Beans.
@@ -78,8 +78,8 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 	public PojoTypeInfo(Class<T> typeClass, List<PojoField> fields) {
 		super(typeClass);
 
-		Preconditions.checkArgument(Modifier.isPublic(typeClass.getModifiers()),
-				"POJO " + typeClass + " is not public");
+		checkArgument(Modifier.isPublic(typeClass.getModifiers()),
+				"POJO %s is not public", typeClass);
 
 		this.fields = fields.toArray(new PojoField[fields.size()]);
 
@@ -350,7 +350,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 			fieldStrings.add(field.getField().getName() + ": " + field.getTypeInformation().toString());
 		}
 		return "PojoType<" + getTypeClass().getName()
-				+ ", fields = [" + Joiner.on(", ").join(fieldStrings) + "]"
+				+ ", fields = [" + StringUtils.join(fieldStrings, ", ") + "]"
 				+ ">";
 	}
 
@@ -381,15 +381,15 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 
 		@Override
 		public TypeComparator<T> createTypeComparator(ExecutionConfig config) {
-			Preconditions.checkState(
+			checkState(
 				keyFields.size() > 0,
 				"No keys were defined for the PojoTypeComparatorBuilder.");
 
-			Preconditions.checkState(
+			checkState(
 				fieldComparators.size() > 0,
 				"No type comparators were defined for the PojoTypeComparatorBuilder.");
 
-			Preconditions.checkState(
+			checkState(
 				keyFields.size() == fieldComparators.size(),
 				"Number of key fields and field comparators is not equal.");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
index 9ecbe73..051ad0df 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
@@ -22,24 +22,26 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 
-import com.google.common.base.Preconditions;
-import com.google.common.primitives.Ints;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.*;
 import org.apache.flink.api.java.typeutils.runtime.Tuple0Serializer;
-//CHECKSTYLE.ON: AvoidStarImport
 import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.types.Value;
 
+//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
+import org.apache.flink.api.java.tuple.*;
+//CHECKSTYLE.ON: AvoidStarImport
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * A {@link TypeInformation} for the tuple types of the Java API.
  *
@@ -62,7 +64,7 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> {
 	public TupleTypeInfo(Class<T> tupleType, TypeInformation<?>... types) {
 		super(tupleType, types);
 
-		Preconditions.checkArgument(
+		checkArgument(
 			types.length <= Tuple.MAX_ARITY,
 			"The tuple type exceeds the maximum supported arity.");
 
@@ -131,24 +133,24 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> {
 
 		@Override
 		public TypeComparator<T> createTypeComparator(ExecutionConfig config) {
-			Preconditions.checkState(
+			checkState(
 				fieldComparators.size() > 0,
 				"No field comparators were defined for the TupleTypeComparatorBuilder."
 			);
 
-			Preconditions.checkState(
+			checkState(
 				logicalKeyFields.size() > 0,
 				"No key fields were defined for the TupleTypeComparatorBuilder."
 			);
 
-			Preconditions.checkState(
+			checkState(
 				fieldComparators.size() == logicalKeyFields.size(),
 				"The number of field comparators and key fields is not equal."
 			);
 
 			final int maxKey = Collections.max(logicalKeyFields);
 
-			Preconditions.checkState(
+			checkState(
 				maxKey >= 0,
 				"The maximum key field must be greater or equal than 0."
 			);
@@ -160,7 +162,7 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> {
 			}
 
 			return new TupleComparator<T>(
-				Ints.toArray(logicalKeyFields),
+				listToPrimitives(logicalKeyFields),
 				fieldComparators.toArray(new TypeComparator[fieldComparators.size()]),
 				fieldSerializers
 			);
@@ -255,4 +257,12 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> {
 
 		return (TupleTypeInfo<X>) new TupleTypeInfo<>(infos);
 	}
+	
+	private static int[] listToPrimitives(ArrayList<Integer> ints) {
+		int[] result = new int[ints.size()];
+		for (int i = 0; i < result.length; i++) {
+			result[i] = ints.get(i);
+		}
+		return result;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
index 753eb66..807fd54 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
@@ -23,12 +23,12 @@ import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import com.google.common.base.Preconditions;
-
 import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
 
 	private static final long serialVersionUID = 1L;
@@ -52,7 +52,7 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
 	public TupleTypeInfoBase(Class<T> tupleType, TypeInformation<?>... types) {
 		super(tupleType);
 
-		this.types = Preconditions.checkNotNull(types);
+		this.types = checkNotNull(types);
 
 		int fieldCounter = 0;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 151f359..0469cc2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -66,7 +66,7 @@ import org.apache.hadoop.io.Writable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A utility for reflection analysis on classes, to determine the return type of implementations of transformation
@@ -902,7 +902,7 @@ public class TypeExtractor {
 		if (isClassType(originalType)) {
 			originalTypeAsClass = typeToClass(originalType);
 		}
-		Preconditions.checkNotNull(originalTypeAsClass, "originalType has an unexpected type");
+		checkNotNull(originalTypeAsClass, "originalType has an unexpected type");
 		// check if the class we assumed to conform to the defining type so far is actually a pojo because the
 		// original type contains additional fields.
 		// check for additional fields.
@@ -1466,7 +1466,7 @@ public class TypeExtractor {
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	private <OUT,IN1,IN2> TypeInformation<OUT> privateGetForClass(Class<OUT> clazz, ArrayList<Type> typeHierarchy,
 			ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
-		Preconditions.checkNotNull(clazz);
+		checkNotNull(clazz);
 
 		// Object is handled as generic type info
 		if (clazz.equals(Object.class)) {
@@ -1822,7 +1822,7 @@ public class TypeExtractor {
 
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	private <X> TypeInformation<X> privateGetForObject(X value) {
-		Preconditions.checkNotNull(value);
+		checkNotNull(value);
 
 		// check if we can extract the types from tuples, otherwise work with the class
 		if (value instanceof Tuple) {

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
index 7c173c0..495a324 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
@@ -18,9 +18,8 @@
 
 package org.apache.flink.api.java.typeutils;
 
-import com.google.common.base.Preconditions;
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeinfo.AtomicType;
@@ -44,6 +43,9 @@ import org.apache.flink.types.ShortValue;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.types.Value;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Type information for data types that extend the {@link Value} interface. The value
  * interface allows types to define their custom serialization and deserialization routines.
@@ -70,11 +72,11 @@ public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implement
 
 	@PublicEvolving
 	public ValueTypeInfo(Class<T> type) {
-		this.type = Preconditions.checkNotNull(type);
+		this.type = checkNotNull(type);
 
-		Preconditions.checkArgument(
+		checkArgument(
 			Value.class.isAssignableFrom(type) || type.equals(Value.class),
-			"ValueTypeInfo can only be used for subclasses of " + Value.class.getName());
+			"ValueTypeInfo can only be used for subclasses of %s", Value.class.getName());
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
index 5e3b2bc..7ca7a91 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.java.typeutils;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -29,8 +28,12 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.WritableComparator;
 import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
+
 import org.apache.hadoop.io.Writable;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * Type information for data types that extend Hadoop's {@link Writable} interface. The Writable
  * interface defines the serialization and deserialization routines for the data type.
@@ -46,11 +49,11 @@ public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> imp
 
 	@PublicEvolving
 	public WritableTypeInfo(Class<T> typeClass) {
-		this.typeClass = Preconditions.checkNotNull(typeClass);
+		this.typeClass = checkNotNull(typeClass);
 
-		Preconditions.checkArgument(
+		checkArgument(
 			Writable.class.isAssignableFrom(typeClass) && !typeClass.equals(Writable.class),
-			"WritableTypeInfo can only be used for subclasses of " + Writable.class.getName());
+			"WritableTypeInfo can only be used for subclasses of %s", Writable.class.getName());
 	}
 
 	@SuppressWarnings({ "rawtypes", "unchecked" })

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
index bc04367..4c2a7f9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
@@ -20,11 +20,11 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import java.io.IOException;
 
-import com.google.common.base.Preconditions;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.avro.util.Utf8;
+
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
 import org.apache.flink.core.memory.DataInputView;
@@ -34,6 +34,7 @@ import org.apache.flink.util.InstantiationUtil;
 import com.esotericsoftware.kryo.Kryo;
 import org.objenesis.strategy.StdInstantiatorStrategy;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * General purpose serialization. Currently using Apache Avro's Reflect-serializers for serialization and
@@ -66,8 +67,8 @@ public final class AvroSerializer<T> extends TypeSerializer<T> {
 	}
 	
 	public AvroSerializer(Class<T> type, Class<? extends T> typeToInstantiate) {
-		this.type = Preconditions.checkNotNull(type);
-		this.typeToInstantiate = Preconditions.checkNotNull(typeToInstantiate);
+		this.type = checkNotNull(type);
+		this.typeToInstantiate = checkNotNull(typeToInstantiate);
 		
 		InstantiationUtil.checkForInstantiation(typeToInstantiate);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
index 9e46f27..f30a767 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
@@ -20,13 +20,13 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import java.io.IOException;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.util.InstantiationUtil;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 public class CopyableValueSerializer<T extends CopyableValue<T>> extends TypeSerializer<T> {
 
@@ -39,7 +39,7 @@ public class CopyableValueSerializer<T extends CopyableValue<T>> extends TypeSer
 	
 	
 	public CopyableValueSerializer(Class<T> valueClass) {
-		this.valueClass = Preconditions.checkNotNull(valueClass);
+		this.valueClass = checkNotNull(valueClass);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index de24956..9958540 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -32,7 +32,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -40,6 +39,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 public final class PojoSerializer<T> extends TypeSerializer<T> {
 
@@ -75,11 +75,11 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 			Field[] fields,
 			ExecutionConfig executionConfig) {
 
-		this.clazz = Preconditions.checkNotNull(clazz);
-		this.fieldSerializers = (TypeSerializer<Object>[]) Preconditions.checkNotNull(fieldSerializers);
-		this.fields = Preconditions.checkNotNull(fields);
+		this.clazz = checkNotNull(clazz);
+		this.fieldSerializers = (TypeSerializer<Object>[]) checkNotNull(fieldSerializers);
+		this.fields = checkNotNull(fields);
 		this.numFields = fieldSerializers.length;
-		this.executionConfig = Preconditions.checkNotNull(executionConfig);
+		this.executionConfig = checkNotNull(executionConfig);
 
 		LinkedHashSet<Class<?>> registeredPojoTypes = executionConfig.getRegisteredPojoTypes();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index 5b5d462..5a93cc5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -27,6 +26,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Objects;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
 
@@ -42,8 +42,8 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
 
 	@SuppressWarnings("unchecked")
 	public TupleSerializerBase(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) {
-		this.tupleClass = Preconditions.checkNotNull(tupleClass);
-		this.fieldSerializers = (TypeSerializer<Object>[]) Preconditions.checkNotNull(fieldSerializers);
+		this.tupleClass = checkNotNull(tupleClass);
+		this.fieldSerializers = (TypeSerializer<Object>[]) checkNotNull(fieldSerializers);
 		this.arity = fieldSerializers.length;
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
index 9329866..73dc0fc 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
@@ -20,7 +20,6 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import java.io.IOException;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -30,6 +29,8 @@ import org.apache.flink.util.InstantiationUtil;
 import com.esotericsoftware.kryo.Kryo;
 import org.objenesis.strategy.StdInstantiatorStrategy;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Serializer for {@link Value} types. Uses the value's serialization methods, and uses
  * Kryo for deep object copies.
@@ -49,7 +50,7 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
 	// --------------------------------------------------------------------------------------------
 	
 	public ValueSerializer(Class<T> type) {
-		this.type = Preconditions.checkNotNull(type);
+		this.type = checkNotNull(type);
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index 276ffc4..d5c2f67 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -25,9 +25,9 @@ import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.base.Preconditions;
 
 import org.apache.avro.generic.GenericData;
+
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
@@ -36,7 +36,9 @@ import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
 import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.SpecificInstanceCollectionSerializerForArrayList;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+
 import org.objenesis.strategy.StdInstantiatorStrategy;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,6 +54,8 @@ import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A type serializer that serializes its type using the Kryo serialization
  * framework (https://github.com/EsotericSoftware/kryo).
@@ -92,7 +96,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 	// ------------------------------------------------------------------------
 
 	public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
-		this.type = Preconditions.checkNotNull(type);
+		this.type = checkNotNull(type);
 
 		this.defaultSerializers = executionConfig.getDefaultKryoSerializers();
 		this.defaultSerializerClasses = executionConfig.getDefaultKryoSerializerClasses();