You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2014/09/07 01:01:07 UTC

[1/2] git commit: [FLINK-629] getFieldNotNull added to Tuple and updated Aggregators and Comparators to use that where appropriate

Repository: incubator-flink
Updated Branches:
  refs/heads/master 679bdc17c -> 66c1263de


[FLINK-629] getFieldNotNull added to Tuple and updated Aggregators and Comparators to use that where appropriate


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/761952c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/761952c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/761952c0

Branch: refs/heads/master
Commit: 761952c04084e3819d763bbcfb564ec69933dd92
Parents: 679bdc1
Author: gyfora <gy...@gmail.com>
Authored: Wed Jul 16 19:09:04 2014 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sat Sep 6 23:08:06 2014 +0200

----------------------------------------------------------------------
 .../apache/flink/types/NullFieldException.java  | 74 ++++++++++++++++++++
 .../api/java/operators/AggregateOperator.java   | 11 ++-
 .../translation/TupleKeyExtractingMapper.java   | 14 ++--
 .../org/apache/flink/api/java/tuple/Tuple.java  | 19 +++++
 .../java/typeutils/runtime/TupleComparator.java | 24 ++-----
 .../runtime/TupleLeadingFieldComparator.java    | 10 +--
 .../TupleLeadingFieldPairComparator.java        |  6 +-
 .../typeutils/runtime/TuplePairComparator.java  |  6 +-
 .../apache/flink/api/java/tuple/Tuple2Test.java | 16 ++++-
 9 files changed, 144 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/761952c0/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java b/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java
new file mode 100755
index 0000000..085660d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types;
+
+
+/**
+ * An exception specifying that a required field was not set in a record, i.e. was <code>null</code>.
+ */
+public class NullFieldException extends RuntimeException
+{
+	/**
+	 * UID for serialization interoperability. 
+	 */
+	private static final long serialVersionUID = -8820467525772321173L;
+	
+	private final int fieldNumber;
+
+	/**
+	 * Constructs an {@code NullFieldException} with {@code null}
+	 * as its error detail message.
+	 */
+	public NullFieldException() {
+		super();
+		this.fieldNumber = -1;
+	}
+
+	/**
+	 * Constructs an {@code NullFieldException} with the specified detail message.
+	 *
+	 * @param message The detail message.
+	 */
+	public NullFieldException(String message) {
+		super(message);
+		this.fieldNumber = -1;
+	}
+	
+	/**
+	 * Constructs an {@code NullFieldException} with a default message, referring to
+	 * given field number as the null field.
+	 *
+	 * @param fieldNumber The index of the field that was null, bit expected to hold a value.
+	 */
+	public NullFieldException(int fieldNumber) {
+		super("Field " + fieldNumber + " is null, but expected to hold a value.");
+		this.fieldNumber = fieldNumber;
+	}
+	
+	/**
+	 * Gets the field number that was attempted to access. If the number is not set, this method returns
+	 * {@code -1}.
+	 * 
+	 * @return The field number that was attempted to access, or {@code -1}, if not set.
+	 */
+	public int getFieldNumber() {
+		return this.fieldNumber;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/761952c0/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
index 1ceb8c8..6073a1a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
@@ -37,8 +37,9 @@ import org.apache.flink.api.java.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.NullFieldException;
+import org.apache.flink.types.NullKeyFieldException;
 import org.apache.flink.util.Collector;
-
 import org.apache.flink.api.java.DataSet;
 
 /**
@@ -283,8 +284,12 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate
 				current = values.next();
 				
 				for (int i = 0; i < fieldPositions.length; i++) {
-					Object val = current.getField(fieldPositions[i]);
-					aggFunctions[i].aggregate(val);
+					try {
+						Object val = current.getFieldNotNull(fieldPositions[i]);
+						aggFunctions[i].aggregate(val);
+					} catch (NullKeyFieldException e) {
+						throw new NullFieldException(fieldPositions[i]);
+					}
 				}
 			}
 			

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/761952c0/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java
index ecac775..97e67ca 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java
@@ -21,6 +21,8 @@ package org.apache.flink.api.java.operators.translation;
 import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.types.NullFieldException;
+import org.apache.flink.types.NullKeyFieldException;
 
 
 public final class TupleKeyExtractingMapper<T, K> extends RichMapFunction<T, Tuple2<K, T>> {
@@ -42,10 +44,14 @@ public final class TupleKeyExtractingMapper<T, K> extends RichMapFunction<T, Tup
 		
 		Tuple v = (Tuple) value;
 		
-		K key = v.getField(pos);
-		tuple.f0 = key;
-		tuple.f1 = value;
-		
+		try {
+			K key = v.getFieldNotNull(pos);
+			tuple.f0 = key;
+			tuple.f1 = value;
+		} catch (NullFieldException nfex) {
+			throw new NullKeyFieldException(nfex);
+		}
+
 		return tuple;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/761952c0/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
index 4738b02..2966830 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.api.java.tuple;
 
+import org.apache.flink.types.NullKeyFieldException;
+
 /**
  * The base class of all tuples. Tuples have a fix length and contain a set of fields,
  * which may all be of different types. Because Tuples are strongly typed, each distinct
@@ -47,6 +49,23 @@ public abstract class Tuple implements java.io.Serializable {
 	public abstract <T> T getField(int pos);
 	
 	/**
+	 * Gets the field at the specified position, throws NullKeyFieldException if the field is null. Used for comparing key fields.
+	 * 
+	 * @param pos The position of the field, zero indexed. 
+	 * @returnThe field at the specified position.
+	 * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields.
+	 * @throws NullKeyFieldException Thrown, if the field at pos is null.
+	 */
+	public <T> T getFieldNotNull(int pos){
+		T field = getField(pos);
+		if (field != null) {
+			return field;
+		} else {
+			throw new NullKeyFieldException(pos);
+		}
+	}
+
+	/**
 	 * Sets the field at the specified position.
 	 *
 	 * @param value The value to be assigned to the field at the specified position.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/761952c0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
index e7dd25a..4472949 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
@@ -156,17 +156,14 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T> im
 	public int hash(T value) {
 		int i = 0;
 		try {
-			int code = this.comparators[0].hash(value.getField(keyPositions[0]));
+			int code = this.comparators[0].hash(value.getFieldNotNull(keyPositions[0]));
 			
 			for (i = 1; i < this.keyPositions.length; i++) {
 				code *= HASH_SALT[i & 0x1F]; // salt code with (i % HASH_SALT.length)-th salt component
-				code += this.comparators[i].hash(value.getField(keyPositions[i]));
+				code += this.comparators[i].hash(value.getFieldNotNull(keyPositions[i]));
 			}
 			return code;
 		}
-		catch (NullPointerException npex) {
-			throw new NullKeyFieldException(keyPositions[i]);
-		}
 		catch (IndexOutOfBoundsException iobex) {
 			throw new KeyFieldOutOfBoundsException(keyPositions[i]);
 		}
@@ -177,12 +174,9 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T> im
 		int i = 0;
 		try {
 			for (; i < this.keyPositions.length; i++) {
-				this.comparators[i].setReference(toCompare.getField(this.keyPositions[i]));
+				this.comparators[i].setReference(toCompare.getFieldNotNull(this.keyPositions[i]));
 			}
 		}
-		catch (NullPointerException npex) {
-			throw new NullKeyFieldException(keyPositions[i]);
-		}
 		catch (IndexOutOfBoundsException iobex) {
 			throw new KeyFieldOutOfBoundsException(keyPositions[i]);
 		}
@@ -193,15 +187,12 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T> im
 		int i = 0;
 		try {
 			for (; i < this.keyPositions.length; i++) {
-				if (!this.comparators[i].equalToReference(candidate.getField(this.keyPositions[i]))) {
+				if (!this.comparators[i].equalToReference(candidate.getFieldNotNull(this.keyPositions[i]))) {
 					return false;
 				}
 			}
 			return true;
 		}
-		catch (NullPointerException npex) {
-			throw new NullKeyFieldException(keyPositions[i]);
-		}
 		catch (IndexOutOfBoundsException iobex) {
 			throw new KeyFieldOutOfBoundsException(keyPositions[i]);
 		}
@@ -236,15 +227,14 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T> im
 			for (; i < keyPositions.length; i++) {
 				int keyPos = keyPositions[i];
 				@SuppressWarnings("unchecked")
-				int cmp = comparators[i].compare((T)first.getField(keyPos), (T)second.getField(keyPos));
+				int cmp = comparators[i].compare((T)first.getFieldNotNull(keyPos), (T)second.getFieldNotNull(keyPos));
+
 				if (cmp != 0) {
 					return cmp;
 				}
 			}
 			
 			return 0;
-		} catch (NullPointerException npex) {
-			throw new NullKeyFieldException(keyPositions[i]);
 		} catch (IndexOutOfBoundsException iobex) {
 			throw new KeyFieldOutOfBoundsException(keyPositions[i]);
 		}
@@ -304,7 +294,7 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T> im
 			{
 				int len = this.normalizedKeyLengths[i]; 
 				len = numBytes >= len ? len : numBytes;
-				this.comparators[i].putNormalizedKey(value.getField(this.keyPositions[i]), target, offset, len);
+				this.comparators[i].putNormalizedKey(value.getFieldNotNull(this.keyPositions[i]), target, offset, len);
 				numBytes -= len;
 				offset += len;
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/761952c0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java
index 43906f8..7a9aa43 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java
@@ -46,18 +46,18 @@ public final class TupleLeadingFieldComparator<T extends Tuple, K> extends TypeC
 	
 	@Override
 	public int hash(T value) {
-		return comparator.hash(value.<K>getField(0));
+		return comparator.hash(value.<K>getFieldNotNull(0));
 		
 	}
 
 	@Override
 	public void setReference(T toCompare) {
-		this.comparator.setReference(toCompare.<K>getField(0));
+		this.comparator.setReference(toCompare.<K>getFieldNotNull(0));
 	}
 
 	@Override
 	public boolean equalToReference(T candidate) {
-		return this.comparator.equalToReference(candidate.<K>getField(0));
+		return this.comparator.equalToReference(candidate.<K>getFieldNotNull(0));
 	}
 
 	@SuppressWarnings("unchecked")
@@ -68,7 +68,7 @@ public final class TupleLeadingFieldComparator<T extends Tuple, K> extends TypeC
 	
 	@Override
 	public int compare(T first, T second) {
-		return this.comparator.compare(first.<K>getField(0), second.<K>getField(0));
+		return this.comparator.compare(first.<K>getFieldNotNull(0), second.<K>getFieldNotNull(0));
 	}
 
 	@Override
@@ -98,7 +98,7 @@ public final class TupleLeadingFieldComparator<T extends Tuple, K> extends TypeC
 
 	@Override
 	public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
-		this.comparator.putNormalizedKey(record.<K>getField(0), target, offset, numBytes);
+		this.comparator.putNormalizedKey(record.<K>getFieldNotNull(0), target, offset, numBytes);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/761952c0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java
index cb7eef5..749c38d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java
@@ -39,17 +39,17 @@ public class TupleLeadingFieldPairComparator<K, T1 extends Tuple, T2 extends Tup
 	
 	@Override
 	public void setReference(T1 reference) {
-		this.comparator1.setReference(reference.<K>getField(0));
+		this.comparator1.setReference(reference.<K>getFieldNotNull(0));
 	}
 
 	@Override
 	public boolean equalToReference(T2 candidate) {
-		return this.comparator1.equalToReference(candidate.<K>getField(0));
+		return this.comparator1.equalToReference(candidate.<K>getFieldNotNull(0));
 	}
 
 	@Override
 	public int compareToReference(T2 candidate) {
-		this.comparator2.setReference(candidate.<K>getField(0));
+		this.comparator2.setReference(candidate.<K>getFieldNotNull(0));
 		return this.comparator1.compareToReference(this.comparator2);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/761952c0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java
index 796799b..43f46e4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java
@@ -59,14 +59,14 @@ public class TuplePairComparator<T1 extends Tuple, T2 extends Tuple> extends Typ
 	@Override
 	public void setReference(T1 reference) {
 		for (int i = 0; i < this.comparators1.length; i++) {
-			this.comparators1[i].setReference(reference.getField(keyFields1[i]));
+			this.comparators1[i].setReference(reference.getFieldNotNull(keyFields1[i]));
 		}
 	}
 
 	@Override
 	public boolean equalToReference(T2 candidate) {
 		for (int i = 0; i < this.comparators1.length; i++) {
-			if (!this.comparators1[i].equalToReference(candidate.getField(keyFields2[i]))) {
+			if (!this.comparators1[i].equalToReference(candidate.getFieldNotNull(keyFields2[i]))) {
 				return false;
 			}
 		}
@@ -76,7 +76,7 @@ public class TuplePairComparator<T1 extends Tuple, T2 extends Tuple> extends Typ
 	@Override
 	public int compareToReference(T2 candidate) {
 		for (int i = 0; i < this.comparators1.length; i++) {
-			this.comparators2[i].setReference(candidate.getField(keyFields2[i]));
+			this.comparators2[i].setReference(candidate.getFieldNotNull(keyFields2[i]));
 			int res = this.comparators1[i].compareToReference(this.comparators2[i]);
 			if(res != 0) {
 				return res;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/761952c0/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java b/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java
index 7379a88..caf98fd 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java
@@ -21,7 +21,7 @@ package org.apache.flink.api.java.tuple;
 
 import org.junit.Assert;
 
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.types.NullKeyFieldException;
 import org.junit.Test;
 
 public class Tuple2Test {
@@ -35,4 +35,18 @@ public class Tuple2Test {
 
 		Assert.assertEquals(swapped.f1, toSwap.f0);
 	}
+	
+	@Test
+	public void testGetFieldNotNull() {
+		Tuple2<String, Integer> tuple = new Tuple2<String, Integer>(new String("Test case"), null);
+
+		Assert.assertEquals("Test case", tuple.getFieldNotNull(0));
+
+		try {
+			tuple.getFieldNotNull(1);
+			Assert.fail();
+		} catch (NullKeyFieldException e) {
+			// right
+		}
+	}
 }


[2/2] git commit: [FLINK-629] Updated getFieldNotNull usage and added it to TupleSerializers

Posted by fh...@apache.org.
[FLINK-629] Updated getFieldNotNull usage and added it to TupleSerializers

This closes #73


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/66c1263d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/66c1263d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/66c1263d

Branch: refs/heads/master
Commit: 66c1263dee5fdaa6436cc2d590f34d63747d6ca1
Parents: 761952c
Author: mbalassi <ba...@gmail.com>
Authored: Thu Jul 17 09:49:53 2014 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sun Sep 7 00:59:57 2014 +0200

----------------------------------------------------------------------
 .../apache/flink/types/NullFieldException.java  | 20 +++++------
 .../flink/types/NullKeyFieldException.java      | 11 ++++++
 .../api/java/functions/SelectByMaxFunction.java |  4 +--
 .../api/java/functions/SelectByMinFunction.java |  4 +--
 .../api/java/operators/AggregateOperator.java   |  8 +----
 .../org/apache/flink/api/java/tuple/Tuple.java  |  9 ++---
 .../java/typeutils/runtime/TupleComparator.java | 20 +++++++++--
 .../runtime/TupleLeadingFieldComparator.java    | 38 ++++++++++++++++----
 .../TupleLeadingFieldPairComparator.java        | 23 +++++++++---
 .../typeutils/runtime/TuplePairComparator.java  | 32 +++++++++++++----
 .../java/typeutils/runtime/TupleSerializer.java |  7 +++-
 .../apache/flink/api/java/tuple/Tuple2Test.java | 13 ++-----
 12 files changed, 133 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/66c1263d/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java b/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java
index 085660d..6948579 100755
--- a/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java
+++ b/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java
@@ -16,10 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.types;
 
-
 /**
  * An exception specifying that a required field was not set in a record, i.e. was <code>null</code>.
  */
@@ -30,7 +28,7 @@ public class NullFieldException extends RuntimeException
 	 */
 	private static final long serialVersionUID = -8820467525772321173L;
 	
-	private final int fieldNumber;
+	private final int fieldPos;
 
 	/**
 	 * Constructs an {@code NullFieldException} with {@code null}
@@ -38,7 +36,7 @@ public class NullFieldException extends RuntimeException
 	 */
 	public NullFieldException() {
 		super();
-		this.fieldNumber = -1;
+		this.fieldPos = -1;
 	}
 
 	/**
@@ -48,18 +46,18 @@ public class NullFieldException extends RuntimeException
 	 */
 	public NullFieldException(String message) {
 		super(message);
-		this.fieldNumber = -1;
+		this.fieldPos = -1;
 	}
 	
 	/**
 	 * Constructs an {@code NullFieldException} with a default message, referring to
 	 * given field number as the null field.
 	 *
-	 * @param fieldNumber The index of the field that was null, bit expected to hold a value.
+	 * @param fieldIdx The index of the field that was null, but expected to hold a value.
 	 */
-	public NullFieldException(int fieldNumber) {
-		super("Field " + fieldNumber + " is null, but expected to hold a value.");
-		this.fieldNumber = fieldNumber;
+	public NullFieldException(int fieldIdx) {
+		super("Field " + fieldIdx + " is null, but expected to hold a value.");
+		this.fieldPos = fieldIdx;
 	}
 	
 	/**
@@ -68,7 +66,7 @@ public class NullFieldException extends RuntimeException
 	 * 
 	 * @return The field number that was attempted to access, or {@code -1}, if not set.
 	 */
-	public int getFieldNumber() {
-		return this.fieldNumber;
+	public int getFieldPos() {
+		return this.fieldPos;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/66c1263d/flink-core/src/main/java/org/apache/flink/types/NullKeyFieldException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/NullKeyFieldException.java b/flink-core/src/main/java/org/apache/flink/types/NullKeyFieldException.java
index 8f6d5c6..1eddfd9 100644
--- a/flink-core/src/main/java/org/apache/flink/types/NullKeyFieldException.java
+++ b/flink-core/src/main/java/org/apache/flink/types/NullKeyFieldException.java
@@ -42,6 +42,17 @@ public class NullKeyFieldException extends RuntimeException
 	}
 
 	/**
+	 * Constructs an {@code NullKeyFieldException} with a default message, referring to
+	 * the field number given in the {@code NullFieldException}.
+	 *
+	 * @param nfex The base exception.
+	 */
+	public NullKeyFieldException(NullFieldException nfex) {
+		super();
+		this.fieldNumber = nfex.getFieldPos();
+	}
+	
+	/**
 	 * Constructs an {@code NullKeyFieldException} with the specified detail message.
 	 *
 	 * @param message The detail message.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/66c1263d/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java
index 614676e..aac59ae 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java
@@ -73,8 +73,8 @@ public class SelectByMaxFunction<T extends Tuple> implements ReduceFunction<T> {
 			int position = this.fields[index];
 
 			// Get both values - both implement comparable
-			Comparable comparable1 = value1.getField(position);
-			Comparable comparable2 = value2.getField(position);
+			Comparable comparable1 = value1.getFieldNotNull(position);
+			Comparable comparable2 = value2.getFieldNotNull(position);
 
 			// Compare values
 			int comp = comparable1.compareTo(comparable2);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/66c1263d/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java
index 4b0a7bf..011f8f6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java
@@ -73,8 +73,8 @@ public class SelectByMinFunction<T extends Tuple> implements ReduceFunction<T> {
 			int position = this.fields[index];
 
 			// Get both values - both implement comparable
-			Comparable comparable1 = value1.getField(position);
-			Comparable comparable2 = value2.getField(position);
+			Comparable comparable1 = value1.getFieldNotNull(position);
+			Comparable comparable2 = value2.getFieldNotNull(position);
 
 			// Compare values
 			int comp = comparable1.compareTo(comparable2);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/66c1263d/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
index 6073a1a..d1fed5e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.aggregation.AggregationFunction;
 import org.apache.flink.api.java.aggregation.AggregationFunctionFactory;
 import org.apache.flink.api.java.aggregation.Aggregations;
@@ -37,10 +38,7 @@ import org.apache.flink.api.java.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.NullFieldException;
-import org.apache.flink.types.NullKeyFieldException;
 import org.apache.flink.util.Collector;
-import org.apache.flink.api.java.DataSet;
 
 /**
  * This operator represents the application of a "aggregate" operation on a data set, and the
@@ -284,12 +282,8 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate
 				current = values.next();
 				
 				for (int i = 0; i < fieldPositions.length; i++) {
-					try {
 						Object val = current.getFieldNotNull(fieldPositions[i]);
 						aggFunctions[i].aggregate(val);
-					} catch (NullKeyFieldException e) {
-						throw new NullFieldException(fieldPositions[i]);
-					}
 				}
 			}
 			

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/66c1263d/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
index 2966830..bc913f1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
@@ -18,7 +18,8 @@
 
 package org.apache.flink.api.java.tuple;
 
-import org.apache.flink.types.NullKeyFieldException;
+import org.apache.flink.types.NullFieldException;
+
 
 /**
  * The base class of all tuples. Tuples have a fix length and contain a set of fields,
@@ -49,19 +50,19 @@ public abstract class Tuple implements java.io.Serializable {
 	public abstract <T> T getField(int pos);
 	
 	/**
-	 * Gets the field at the specified position, throws NullKeyFieldException if the field is null. Used for comparing key fields.
+	 * Gets the field at the specified position, throws NullFieldException if the field is null. Used for comparing key fields.
 	 * 
 	 * @param pos The position of the field, zero indexed. 
 	 * @returnThe field at the specified position.
 	 * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields.
-	 * @throws NullKeyFieldException Thrown, if the field at pos is null.
+	 * @throws NullFieldException Thrown, if the field at pos is null.
 	 */
 	public <T> T getFieldNotNull(int pos){
 		T field = getField(pos);
 		if (field != null) {
 			return field;
 		} else {
-			throw new NullKeyFieldException(pos);
+			throw new NullFieldException(pos);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/66c1263d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
index 4472949..48cf08b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.types.KeyFieldOutOfBoundsException;
+import org.apache.flink.types.NullFieldException;
 import org.apache.flink.types.NullKeyFieldException;
 
 
@@ -164,6 +165,9 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T> im
 			}
 			return code;
 		}
+		catch (NullFieldException nfex) {
+			throw new NullKeyFieldException(nfex);
+		}
 		catch (IndexOutOfBoundsException iobex) {
 			throw new KeyFieldOutOfBoundsException(keyPositions[i]);
 		}
@@ -177,6 +181,9 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T> im
 				this.comparators[i].setReference(toCompare.getFieldNotNull(this.keyPositions[i]));
 			}
 		}
+		catch (NullFieldException nfex) {
+			throw new NullKeyFieldException(nfex);
+		}
 		catch (IndexOutOfBoundsException iobex) {
 			throw new KeyFieldOutOfBoundsException(keyPositions[i]);
 		}
@@ -193,6 +200,9 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T> im
 			}
 			return true;
 		}
+		catch (NullFieldException nfex) {
+			throw new NullKeyFieldException(nfex);
+		}
 		catch (IndexOutOfBoundsException iobex) {
 			throw new KeyFieldOutOfBoundsException(keyPositions[i]);
 		}
@@ -233,9 +243,12 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T> im
 					return cmp;
 				}
 			}
-			
 			return 0;
-		} catch (IndexOutOfBoundsException iobex) {
+		} 
+		catch (NullFieldException nfex) {
+			throw new NullKeyFieldException(nfex);
+		}
+		catch (IndexOutOfBoundsException iobex) {
 			throw new KeyFieldOutOfBoundsException(keyPositions[i]);
 		}
 	}
@@ -299,6 +312,9 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T> im
 				offset += len;
 			}
 		}
+		catch (NullFieldException nfex) {
+			throw new NullKeyFieldException(nfex);
+		}
 		catch (NullPointerException npex) {
 			throw new NullKeyFieldException(this.keyPositions[i]);
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/66c1263d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java
index 7a9aa43..d63fccb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NullFieldException;
+import org.apache.flink.types.NullKeyFieldException;
 
 
 public final class TupleLeadingFieldComparator<T extends Tuple, K> extends TypeComparator<T>
@@ -46,18 +48,32 @@ public final class TupleLeadingFieldComparator<T extends Tuple, K> extends TypeC
 	
 	@Override
 	public int hash(T value) {
-		return comparator.hash(value.<K>getFieldNotNull(0));
-		
+		try {
+			return comparator.hash(value.<K> getFieldNotNull(0));
+		} catch (NullFieldException nfex) {
+			throw new NullKeyFieldException(nfex);
+		}
+
 	}
 
 	@Override
 	public void setReference(T toCompare) {
-		this.comparator.setReference(toCompare.<K>getFieldNotNull(0));
+		try {
+			this.comparator.setReference(toCompare.<K> getFieldNotNull(0));
+		} catch (NullFieldException nfex) {
+			throw new NullKeyFieldException(nfex);
+		}
 	}
+	
 
 	@Override
 	public boolean equalToReference(T candidate) {
-		return this.comparator.equalToReference(candidate.<K>getFieldNotNull(0));
+		try {
+			return this.comparator.equalToReference(candidate
+					.<K> getFieldNotNull(0));
+		} catch (NullFieldException nfex) {
+			throw new NullKeyFieldException(nfex);
+		}
 	}
 
 	@SuppressWarnings("unchecked")
@@ -68,7 +84,12 @@ public final class TupleLeadingFieldComparator<T extends Tuple, K> extends TypeC
 	
 	@Override
 	public int compare(T first, T second) {
-		return this.comparator.compare(first.<K>getFieldNotNull(0), second.<K>getFieldNotNull(0));
+		try {
+			return this.comparator.compare(first.<K> getFieldNotNull(0),
+					second.<K> getFieldNotNull(0));
+		} catch (NullFieldException nfex) {
+			throw new NullKeyFieldException(nfex);
+		}
 	}
 
 	@Override
@@ -98,7 +119,12 @@ public final class TupleLeadingFieldComparator<T extends Tuple, K> extends TypeC
 
 	@Override
 	public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
-		this.comparator.putNormalizedKey(record.<K>getFieldNotNull(0), target, offset, numBytes);
+		try {
+			this.comparator.putNormalizedKey(record.<K> getFieldNotNull(0),
+					target, offset, numBytes);
+		} catch (NullFieldException nfex) {
+			throw new NullKeyFieldException(nfex);
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/66c1263d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java
index 749c38d..3611f70 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java
@@ -23,6 +23,8 @@ import java.io.Serializable;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.types.NullFieldException;
+import org.apache.flink.types.NullKeyFieldException;
 
 
 public class TupleLeadingFieldPairComparator<K, T1 extends Tuple, T2 extends Tuple> extends TypePairComparator<T1, T2> implements Serializable {
@@ -39,17 +41,30 @@ public class TupleLeadingFieldPairComparator<K, T1 extends Tuple, T2 extends Tup
 	
 	@Override
 	public void setReference(T1 reference) {
-		this.comparator1.setReference(reference.<K>getFieldNotNull(0));
+		try {
+			this.comparator1.setReference(reference.<K> getFieldNotNull(0));
+		} catch (NullFieldException nfex) {
+			throw new NullKeyFieldException(nfex);
+		}
 	}
 
 	@Override
 	public boolean equalToReference(T2 candidate) {
-		return this.comparator1.equalToReference(candidate.<K>getFieldNotNull(0));
+		try {
+			return this.comparator1.equalToReference(candidate
+					.<K> getFieldNotNull(0));
+		} catch (NullFieldException nfex) {
+			throw new NullKeyFieldException(nfex);
+		}
 	}
 
 	@Override
 	public int compareToReference(T2 candidate) {
-		this.comparator2.setReference(candidate.<K>getFieldNotNull(0));
-		return this.comparator1.compareToReference(this.comparator2);
+		try {
+			this.comparator2.setReference(candidate.<K> getFieldNotNull(0));
+			return this.comparator1.compareToReference(this.comparator2);
+		} catch (NullFieldException nfex) {
+			throw new NullKeyFieldException(nfex);
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/66c1263d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java
index 43f46e4..4c4094d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java
@@ -23,6 +23,8 @@ import java.io.Serializable;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.types.NullFieldException;
+import org.apache.flink.types.NullKeyFieldException;
 
 
 public class TuplePairComparator<T1 extends Tuple, T2 extends Tuple> extends TypePairComparator<T1, T2> implements Serializable {
@@ -59,15 +61,25 @@ public class TuplePairComparator<T1 extends Tuple, T2 extends Tuple> extends Typ
 	@Override
 	public void setReference(T1 reference) {
 		for (int i = 0; i < this.comparators1.length; i++) {
-			this.comparators1[i].setReference(reference.getFieldNotNull(keyFields1[i]));
+			try {
+				this.comparators1[i].setReference(reference
+						.getFieldNotNull(keyFields1[i]));
+			} catch (NullFieldException nfex) {
+				throw new NullKeyFieldException(nfex);
+			}
 		}
 	}
 
 	@Override
 	public boolean equalToReference(T2 candidate) {
 		for (int i = 0; i < this.comparators1.length; i++) {
-			if (!this.comparators1[i].equalToReference(candidate.getFieldNotNull(keyFields2[i]))) {
-				return false;
+			try {
+				if (!this.comparators1[i].equalToReference(candidate
+						.getFieldNotNull(keyFields2[i]))) {
+					return false;
+				}
+			} catch (NullFieldException nfex) {
+				throw new NullKeyFieldException(nfex);
 			}
 		}
 		return true;
@@ -76,10 +88,16 @@ public class TuplePairComparator<T1 extends Tuple, T2 extends Tuple> extends Typ
 	@Override
 	public int compareToReference(T2 candidate) {
 		for (int i = 0; i < this.comparators1.length; i++) {
-			this.comparators2[i].setReference(candidate.getFieldNotNull(keyFields2[i]));
-			int res = this.comparators1[i].compareToReference(this.comparators2[i]);
-			if(res != 0) {
-				return res;
+			try {
+				this.comparators2[i].setReference(candidate
+						.getFieldNotNull(keyFields2[i]));
+				int res = this.comparators1[i]
+						.compareToReference(this.comparators2[i]);
+				if (res != 0) {
+					return res;
+				}
+			} catch (NullFieldException nfex) {
+				throw new NullKeyFieldException(nfex);
 			}
 		}
 		return 0;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/66c1263d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
index 163d8b2..1c777fc 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.NullFieldException;
 
 
 public final class TupleSerializer<T extends Tuple> extends TypeSerializer<T> {
@@ -105,7 +106,11 @@ public final class TupleSerializer<T extends Tuple> extends TypeSerializer<T> {
 	public void serialize(T value, DataOutputView target) throws IOException {
 		for (int i = 0; i < arity; i++) {
 			Object o = value.getField(i);
-			fieldSerializers[i].serialize(o, target);
+			try {
+				fieldSerializers[i].serialize(o, target);
+			} catch (NullPointerException npex) {
+				throw new NullFieldException(i);
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/66c1263d/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java b/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java
index caf98fd..5a8413e 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java
@@ -19,9 +19,8 @@
 
 package org.apache.flink.api.java.tuple;
 
+import org.apache.flink.types.NullFieldException;
 import org.junit.Assert;
-
-import org.apache.flink.types.NullKeyFieldException;
 import org.junit.Test;
 
 public class Tuple2Test {
@@ -36,17 +35,11 @@ public class Tuple2Test {
 		Assert.assertEquals(swapped.f1, toSwap.f0);
 	}
 	
-	@Test
+	@Test(expected = NullFieldException.class)
 	public void testGetFieldNotNull() {
 		Tuple2<String, Integer> tuple = new Tuple2<String, Integer>(new String("Test case"), null);
 
 		Assert.assertEquals("Test case", tuple.getFieldNotNull(0));
-
-		try {
-			tuple.getFieldNotNull(1);
-			Assert.fail();
-		} catch (NullKeyFieldException e) {
-			// right
-		}
+		tuple.getFieldNotNull(1);
 	}
 }