You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2014/09/07 01:01:07 UTC
[1/2] git commit: [FLINK-629] getFieldNotNull added to Tuple and
updated Aggregators and Comparators to use that where appropriate
Repository: incubator-flink
Updated Branches:
refs/heads/master 679bdc17c -> 66c1263de
[FLINK-629] getFieldNotNull added to Tuple and updated Aggregators and Comparators to use that where appropriate
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/761952c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/761952c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/761952c0
Branch: refs/heads/master
Commit: 761952c04084e3819d763bbcfb564ec69933dd92
Parents: 679bdc1
Author: gyfora <gy...@gmail.com>
Authored: Wed Jul 16 19:09:04 2014 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sat Sep 6 23:08:06 2014 +0200
----------------------------------------------------------------------
.../apache/flink/types/NullFieldException.java | 74 ++++++++++++++++++++
.../api/java/operators/AggregateOperator.java | 11 ++-
.../translation/TupleKeyExtractingMapper.java | 14 ++--
.../org/apache/flink/api/java/tuple/Tuple.java | 19 +++++
.../java/typeutils/runtime/TupleComparator.java | 24 ++-----
.../runtime/TupleLeadingFieldComparator.java | 10 +--
.../TupleLeadingFieldPairComparator.java | 6 +-
.../typeutils/runtime/TuplePairComparator.java | 6 +-
.../apache/flink/api/java/tuple/Tuple2Test.java | 16 ++++-
9 files changed, 144 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/761952c0/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java b/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java
new file mode 100755
index 0000000..085660d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types;
+
+
+/**
+ * An exception specifying that a required field was not set in a record, i.e. was <code>null</code>.
+ */
+public class NullFieldException extends RuntimeException
+{
+ /**
+ * UID for serialization interoperability.
+ */
+ private static final long serialVersionUID = -8820467525772321173L;
+
+ private final int fieldNumber;
+
+ /**
+ * Constructs an {@code NullFieldException} with {@code null}
+ * as its error detail message.
+ */
+ public NullFieldException() {
+ super();
+ this.fieldNumber = -1;
+ }
+
+ /**
+ * Constructs an {@code NullFieldException} with the specified detail message.
+ *
+ * @param message The detail message.
+ */
+ public NullFieldException(String message) {
+ super(message);
+ this.fieldNumber = -1;
+ }
+
+ /**
+ * Constructs an {@code NullFieldException} with a default message, referring to
+ * given field number as the null field.
+ *
+ * @param fieldNumber The index of the field that was null, bit expected to hold a value.
+ */
+ public NullFieldException(int fieldNumber) {
+ super("Field " + fieldNumber + " is null, but expected to hold a value.");
+ this.fieldNumber = fieldNumber;
+ }
+
+ /**
+ * Gets the field number that was attempted to access. If the number is not set, this method returns
+ * {@code -1}.
+ *
+ * @return The field number that was attempted to access, or {@code -1}, if not set.
+ */
+ public int getFieldNumber() {
+ return this.fieldNumber;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/761952c0/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
index 1ceb8c8..6073a1a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
@@ -37,8 +37,9 @@ import org.apache.flink.api.java.functions.RichGroupReduceFunction.Combinable;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.NullFieldException;
+import org.apache.flink.types.NullKeyFieldException;
import org.apache.flink.util.Collector;
-
import org.apache.flink.api.java.DataSet;
/**
@@ -283,8 +284,12 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate
current = values.next();
for (int i = 0; i < fieldPositions.length; i++) {
- Object val = current.getField(fieldPositions[i]);
- aggFunctions[i].aggregate(val);
+ try {
+ Object val = current.getFieldNotNull(fieldPositions[i]);
+ aggFunctions[i].aggregate(val);
+ } catch (NullKeyFieldException e) {
+ throw new NullFieldException(fieldPositions[i]);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/761952c0/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java
index ecac775..97e67ca 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java
@@ -21,6 +21,8 @@ package org.apache.flink.api.java.operators.translation;
import org.apache.flink.api.java.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.types.NullFieldException;
+import org.apache.flink.types.NullKeyFieldException;
public final class TupleKeyExtractingMapper<T, K> extends RichMapFunction<T, Tuple2<K, T>> {
@@ -42,10 +44,14 @@ public final class TupleKeyExtractingMapper<T, K> extends RichMapFunction<T, Tup
Tuple v = (Tuple) value;
- K key = v.getField(pos);
- tuple.f0 = key;
- tuple.f1 = value;
-
+ try {
+ K key = v.getFieldNotNull(pos);
+ tuple.f0 = key;
+ tuple.f1 = value;
+ } catch (NullFieldException nfex) {
+ throw new NullKeyFieldException(nfex);
+ }
+
return tuple;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/761952c0/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
index 4738b02..2966830 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
@@ -18,6 +18,8 @@
package org.apache.flink.api.java.tuple;
+import org.apache.flink.types.NullKeyFieldException;
+
/**
* The base class of all tuples. Tuples have a fix length and contain a set of fields,
* which may all be of different types. Because Tuples are strongly typed, each distinct
@@ -47,6 +49,23 @@ public abstract class Tuple implements java.io.Serializable {
public abstract <T> T getField(int pos);
/**
+ * Gets the field at the specified position, throws NullKeyFieldException if the field is null. Used for comparing key fields.
+ *
+ * @param pos The position of the field, zero indexed.
+ * @return The field at the specified position.
+ * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields.
+ * @throws NullKeyFieldException Thrown, if the field at pos is null.
+ */
+ public <T> T getFieldNotNull(int pos){
+ T field = getField(pos);
+ if (field != null) {
+ return field;
+ } else {
+ throw new NullKeyFieldException(pos);
+ }
+ }
+
+ /**
* Sets the field at the specified position.
*
* @param value The value to be assigned to the field at the specified position.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/761952c0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
index e7dd25a..4472949 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
@@ -156,17 +156,14 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T> im
public int hash(T value) {
int i = 0;
try {
- int code = this.comparators[0].hash(value.getField(keyPositions[0]));
+ int code = this.comparators[0].hash(value.getFieldNotNull(keyPositions[0]));
for (i = 1; i < this.keyPositions.length; i++) {
code *= HASH_SALT[i & 0x1F]; // salt code with (i % HASH_SALT.length)-th salt component
- code += this.comparators[i].hash(value.getField(keyPositions[i]));
+ code += this.comparators[i].hash(value.getFieldNotNull(keyPositions[i]));
}
return code;
}
- catch (NullPointerException npex) {
- throw new NullKeyFieldException(keyPositions[i]);
- }
catch (IndexOutOfBoundsException iobex) {
throw new KeyFieldOutOfBoundsException(keyPositions[i]);
}
@@ -177,12 +174,9 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T> im
int i = 0;
try {
for (; i < this.keyPositions.length; i++) {
- this.comparators[i].setReference(toCompare.getField(this.keyPositions[i]));
+ this.comparators[i].setReference(toCompare.getFieldNotNull(this.keyPositions[i]));
}
}
- catch (NullPointerException npex) {
- throw new NullKeyFieldException(keyPositions[i]);
- }
catch (IndexOutOfBoundsException iobex) {
throw new KeyFieldOutOfBoundsException(keyPositions[i]);
}
@@ -193,15 +187,12 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T> im
int i = 0;
try {
for (; i < this.keyPositions.length; i++) {
- if (!this.comparators[i].equalToReference(candidate.getField(this.keyPositions[i]))) {
+ if (!this.comparators[i].equalToReference(candidate.getFieldNotNull(this.keyPositions[i]))) {
return false;
}
}
return true;
}
- catch (NullPointerException npex) {
- throw new NullKeyFieldException(keyPositions[i]);
- }
catch (IndexOutOfBoundsException iobex) {
throw new KeyFieldOutOfBoundsException(keyPositions[i]);
}
@@ -236,15 +227,14 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T> im
for (; i < keyPositions.length; i++) {
int keyPos = keyPositions[i];
@SuppressWarnings("unchecked")
- int cmp = comparators[i].compare((T)first.getField(keyPos), (T)second.getField(keyPos));
+ int cmp = comparators[i].compare((T)first.getFieldNotNull(keyPos), (T)second.getFieldNotNull(keyPos));
+
if (cmp != 0) {
return cmp;
}
}
return 0;
- } catch (NullPointerException npex) {
- throw new NullKeyFieldException(keyPositions[i]);
} catch (IndexOutOfBoundsException iobex) {
throw new KeyFieldOutOfBoundsException(keyPositions[i]);
}
@@ -304,7 +294,7 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T> im
{
int len = this.normalizedKeyLengths[i];
len = numBytes >= len ? len : numBytes;
- this.comparators[i].putNormalizedKey(value.getField(this.keyPositions[i]), target, offset, len);
+ this.comparators[i].putNormalizedKey(value.getFieldNotNull(this.keyPositions[i]), target, offset, len);
numBytes -= len;
offset += len;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/761952c0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java
index 43906f8..7a9aa43 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java
@@ -46,18 +46,18 @@ public final class TupleLeadingFieldComparator<T extends Tuple, K> extends TypeC
@Override
public int hash(T value) {
- return comparator.hash(value.<K>getField(0));
+ return comparator.hash(value.<K>getFieldNotNull(0));
}
@Override
public void setReference(T toCompare) {
- this.comparator.setReference(toCompare.<K>getField(0));
+ this.comparator.setReference(toCompare.<K>getFieldNotNull(0));
}
@Override
public boolean equalToReference(T candidate) {
- return this.comparator.equalToReference(candidate.<K>getField(0));
+ return this.comparator.equalToReference(candidate.<K>getFieldNotNull(0));
}
@SuppressWarnings("unchecked")
@@ -68,7 +68,7 @@ public final class TupleLeadingFieldComparator<T extends Tuple, K> extends TypeC
@Override
public int compare(T first, T second) {
- return this.comparator.compare(first.<K>getField(0), second.<K>getField(0));
+ return this.comparator.compare(first.<K>getFieldNotNull(0), second.<K>getFieldNotNull(0));
}
@Override
@@ -98,7 +98,7 @@ public final class TupleLeadingFieldComparator<T extends Tuple, K> extends TypeC
@Override
public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
- this.comparator.putNormalizedKey(record.<K>getField(0), target, offset, numBytes);
+ this.comparator.putNormalizedKey(record.<K>getFieldNotNull(0), target, offset, numBytes);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/761952c0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java
index cb7eef5..749c38d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java
@@ -39,17 +39,17 @@ public class TupleLeadingFieldPairComparator<K, T1 extends Tuple, T2 extends Tup
@Override
public void setReference(T1 reference) {
- this.comparator1.setReference(reference.<K>getField(0));
+ this.comparator1.setReference(reference.<K>getFieldNotNull(0));
}
@Override
public boolean equalToReference(T2 candidate) {
- return this.comparator1.equalToReference(candidate.<K>getField(0));
+ return this.comparator1.equalToReference(candidate.<K>getFieldNotNull(0));
}
@Override
public int compareToReference(T2 candidate) {
- this.comparator2.setReference(candidate.<K>getField(0));
+ this.comparator2.setReference(candidate.<K>getFieldNotNull(0));
return this.comparator1.compareToReference(this.comparator2);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/761952c0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java
index 796799b..43f46e4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java
@@ -59,14 +59,14 @@ public class TuplePairComparator<T1 extends Tuple, T2 extends Tuple> extends Typ
@Override
public void setReference(T1 reference) {
for (int i = 0; i < this.comparators1.length; i++) {
- this.comparators1[i].setReference(reference.getField(keyFields1[i]));
+ this.comparators1[i].setReference(reference.getFieldNotNull(keyFields1[i]));
}
}
@Override
public boolean equalToReference(T2 candidate) {
for (int i = 0; i < this.comparators1.length; i++) {
- if (!this.comparators1[i].equalToReference(candidate.getField(keyFields2[i]))) {
+ if (!this.comparators1[i].equalToReference(candidate.getFieldNotNull(keyFields2[i]))) {
return false;
}
}
@@ -76,7 +76,7 @@ public class TuplePairComparator<T1 extends Tuple, T2 extends Tuple> extends Typ
@Override
public int compareToReference(T2 candidate) {
for (int i = 0; i < this.comparators1.length; i++) {
- this.comparators2[i].setReference(candidate.getField(keyFields2[i]));
+ this.comparators2[i].setReference(candidate.getFieldNotNull(keyFields2[i]));
int res = this.comparators1[i].compareToReference(this.comparators2[i]);
if(res != 0) {
return res;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/761952c0/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java b/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java
index 7379a88..caf98fd 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java
@@ -21,7 +21,7 @@ package org.apache.flink.api.java.tuple;
import org.junit.Assert;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.types.NullKeyFieldException;
import org.junit.Test;
public class Tuple2Test {
@@ -35,4 +35,18 @@ public class Tuple2Test {
Assert.assertEquals(swapped.f1, toSwap.f0);
}
+
+ @Test
+ public void testGetFieldNotNull() {
+ Tuple2<String, Integer> tuple = new Tuple2<String, Integer>(new String("Test case"), null);
+
+ Assert.assertEquals("Test case", tuple.getFieldNotNull(0));
+
+ try {
+ tuple.getFieldNotNull(1);
+ Assert.fail();
+ } catch (NullKeyFieldException e) {
+ // right
+ }
+ }
}
[2/2] git commit: [FLINK-629] Updated getFieldNotNull usage and added
it to TupleSerializers
Posted by fh...@apache.org.
[FLINK-629] Updated getFieldNotNull usage and added it to TupleSerializers
This closes #73
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/66c1263d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/66c1263d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/66c1263d
Branch: refs/heads/master
Commit: 66c1263dee5fdaa6436cc2d590f34d63747d6ca1
Parents: 761952c
Author: mbalassi <ba...@gmail.com>
Authored: Thu Jul 17 09:49:53 2014 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sun Sep 7 00:59:57 2014 +0200
----------------------------------------------------------------------
.../apache/flink/types/NullFieldException.java | 20 +++++------
.../flink/types/NullKeyFieldException.java | 11 ++++++
.../api/java/functions/SelectByMaxFunction.java | 4 +--
.../api/java/functions/SelectByMinFunction.java | 4 +--
.../api/java/operators/AggregateOperator.java | 8 +----
.../org/apache/flink/api/java/tuple/Tuple.java | 9 ++---
.../java/typeutils/runtime/TupleComparator.java | 20 +++++++++--
.../runtime/TupleLeadingFieldComparator.java | 38 ++++++++++++++++----
.../TupleLeadingFieldPairComparator.java | 23 +++++++++---
.../typeutils/runtime/TuplePairComparator.java | 32 +++++++++++++----
.../java/typeutils/runtime/TupleSerializer.java | 7 +++-
.../apache/flink/api/java/tuple/Tuple2Test.java | 13 ++-----
12 files changed, 133 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/66c1263d/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java b/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java
index 085660d..6948579 100755
--- a/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java
+++ b/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java
@@ -16,10 +16,8 @@
* limitations under the License.
*/
-
package org.apache.flink.types;
-
/**
* An exception specifying that a required field was not set in a record, i.e. was <code>null</code>.
*/
@@ -30,7 +28,7 @@ public class NullFieldException extends RuntimeException
*/
private static final long serialVersionUID = -8820467525772321173L;
- private final int fieldNumber;
+ private final int fieldPos;
/**
* Constructs an {@code NullFieldException} with {@code null}
@@ -38,7 +36,7 @@ public class NullFieldException extends RuntimeException
*/
public NullFieldException() {
super();
- this.fieldNumber = -1;
+ this.fieldPos = -1;
}
/**
@@ -48,18 +46,18 @@ public class NullFieldException extends RuntimeException
*/
public NullFieldException(String message) {
super(message);
- this.fieldNumber = -1;
+ this.fieldPos = -1;
}
/**
* Constructs an {@code NullFieldException} with a default message, referring to
* given field number as the null field.
*
- * @param fieldNumber The index of the field that was null, bit expected to hold a value.
+ * @param fieldIdx The index of the field that was null, but expected to hold a value.
*/
- public NullFieldException(int fieldNumber) {
- super("Field " + fieldNumber + " is null, but expected to hold a value.");
- this.fieldNumber = fieldNumber;
+ public NullFieldException(int fieldIdx) {
+ super("Field " + fieldIdx + " is null, but expected to hold a value.");
+ this.fieldPos = fieldIdx;
}
/**
@@ -68,7 +66,7 @@ public class NullFieldException extends RuntimeException
*
* @return The field number that was attempted to access, or {@code -1}, if not set.
*/
- public int getFieldNumber() {
- return this.fieldNumber;
+ public int getFieldPos() {
+ return this.fieldPos;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/66c1263d/flink-core/src/main/java/org/apache/flink/types/NullKeyFieldException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/NullKeyFieldException.java b/flink-core/src/main/java/org/apache/flink/types/NullKeyFieldException.java
index 8f6d5c6..1eddfd9 100644
--- a/flink-core/src/main/java/org/apache/flink/types/NullKeyFieldException.java
+++ b/flink-core/src/main/java/org/apache/flink/types/NullKeyFieldException.java
@@ -42,6 +42,17 @@ public class NullKeyFieldException extends RuntimeException
}
/**
+ * Constructs an {@code NullKeyFieldException} with a default message, referring to
+ * the field number given in the {@code NullFieldException}.
+ *
+ * @param nfex The base exception.
+ */
+ public NullKeyFieldException(NullFieldException nfex) {
+ super();
+ this.fieldNumber = nfex.getFieldPos();
+ }
+
+ /**
* Constructs an {@code NullKeyFieldException} with the specified detail message.
*
* @param message The detail message.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/66c1263d/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java
index 614676e..aac59ae 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java
@@ -73,8 +73,8 @@ public class SelectByMaxFunction<T extends Tuple> implements ReduceFunction<T> {
int position = this.fields[index];
// Get both values - both implement comparable
- Comparable comparable1 = value1.getField(position);
- Comparable comparable2 = value2.getField(position);
+ Comparable comparable1 = value1.getFieldNotNull(position);
+ Comparable comparable2 = value2.getFieldNotNull(position);
// Compare values
int comp = comparable1.compareTo(comparable2);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/66c1263d/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java
index 4b0a7bf..011f8f6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java
@@ -73,8 +73,8 @@ public class SelectByMinFunction<T extends Tuple> implements ReduceFunction<T> {
int position = this.fields[index];
// Get both values - both implement comparable
- Comparable comparable1 = value1.getField(position);
- Comparable comparable2 = value2.getField(position);
+ Comparable comparable1 = value1.getFieldNotNull(position);
+ Comparable comparable2 = value2.getFieldNotNull(position);
// Compare values
int comp = comparable1.compareTo(comparable2);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/66c1263d/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
index 6073a1a..d1fed5e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.aggregation.AggregationFunction;
import org.apache.flink.api.java.aggregation.AggregationFunctionFactory;
import org.apache.flink.api.java.aggregation.Aggregations;
@@ -37,10 +38,7 @@ import org.apache.flink.api.java.functions.RichGroupReduceFunction.Combinable;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.NullFieldException;
-import org.apache.flink.types.NullKeyFieldException;
import org.apache.flink.util.Collector;
-import org.apache.flink.api.java.DataSet;
/**
* This operator represents the application of a "aggregate" operation on a data set, and the
@@ -284,12 +282,8 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate
current = values.next();
for (int i = 0; i < fieldPositions.length; i++) {
- try {
Object val = current.getFieldNotNull(fieldPositions[i]);
aggFunctions[i].aggregate(val);
- } catch (NullKeyFieldException e) {
- throw new NullFieldException(fieldPositions[i]);
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/66c1263d/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
index 2966830..bc913f1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
@@ -18,7 +18,8 @@
package org.apache.flink.api.java.tuple;
-import org.apache.flink.types.NullKeyFieldException;
+import org.apache.flink.types.NullFieldException;
+
/**
* The base class of all tuples. Tuples have a fix length and contain a set of fields,
@@ -49,19 +50,19 @@ public abstract class Tuple implements java.io.Serializable {
public abstract <T> T getField(int pos);
/**
- * Gets the field at the specified position, throws NullKeyFieldException if the field is null. Used for comparing key fields.
+ * Gets the field at the specified position, throws NullFieldException if the field is null. Used for comparing key fields.
*
* @param pos The position of the field, zero indexed.
* @return The field at the specified position.
* @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields.
- * @throws NullKeyFieldException Thrown, if the field at pos is null.
+ * @throws NullFieldException Thrown, if the field at pos is null.
*/
public <T> T getFieldNotNull(int pos){
T field = getField(pos);
if (field != null) {
return field;
} else {
- throw new NullKeyFieldException(pos);
+ throw new NullFieldException(pos);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/66c1263d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
index 4472949..48cf08b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.types.KeyFieldOutOfBoundsException;
+import org.apache.flink.types.NullFieldException;
import org.apache.flink.types.NullKeyFieldException;
@@ -164,6 +165,9 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T> im
}
return code;
}
+ catch (NullFieldException nfex) {
+ throw new NullKeyFieldException(nfex);
+ }
catch (IndexOutOfBoundsException iobex) {
throw new KeyFieldOutOfBoundsException(keyPositions[i]);
}
@@ -177,6 +181,9 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T> im
this.comparators[i].setReference(toCompare.getFieldNotNull(this.keyPositions[i]));
}
}
+ catch (NullFieldException nfex) {
+ throw new NullKeyFieldException(nfex);
+ }
catch (IndexOutOfBoundsException iobex) {
throw new KeyFieldOutOfBoundsException(keyPositions[i]);
}
@@ -193,6 +200,9 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T> im
}
return true;
}
+ catch (NullFieldException nfex) {
+ throw new NullKeyFieldException(nfex);
+ }
catch (IndexOutOfBoundsException iobex) {
throw new KeyFieldOutOfBoundsException(keyPositions[i]);
}
@@ -233,9 +243,12 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T> im
return cmp;
}
}
-
return 0;
- } catch (IndexOutOfBoundsException iobex) {
+ }
+ catch (NullFieldException nfex) {
+ throw new NullKeyFieldException(nfex);
+ }
+ catch (IndexOutOfBoundsException iobex) {
throw new KeyFieldOutOfBoundsException(keyPositions[i]);
}
}
@@ -299,6 +312,9 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T> im
offset += len;
}
}
+ catch (NullFieldException nfex) {
+ throw new NullKeyFieldException(nfex);
+ }
catch (NullPointerException npex) {
throw new NullKeyFieldException(this.keyPositions[i]);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/66c1263d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java
index 7a9aa43..d63fccb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NullFieldException;
+import org.apache.flink.types.NullKeyFieldException;
public final class TupleLeadingFieldComparator<T extends Tuple, K> extends TypeComparator<T>
@@ -46,18 +48,32 @@ public final class TupleLeadingFieldComparator<T extends Tuple, K> extends TypeC
@Override
public int hash(T value) {
- return comparator.hash(value.<K>getFieldNotNull(0));
-
+ try {
+ return comparator.hash(value.<K> getFieldNotNull(0));
+ } catch (NullFieldException nfex) {
+ throw new NullKeyFieldException(nfex);
+ }
+
}
@Override
public void setReference(T toCompare) {
- this.comparator.setReference(toCompare.<K>getFieldNotNull(0));
+ try {
+ this.comparator.setReference(toCompare.<K> getFieldNotNull(0));
+ } catch (NullFieldException nfex) {
+ throw new NullKeyFieldException(nfex);
+ }
}
+
@Override
public boolean equalToReference(T candidate) {
- return this.comparator.equalToReference(candidate.<K>getFieldNotNull(0));
+ try {
+ return this.comparator.equalToReference(candidate
+ .<K> getFieldNotNull(0));
+ } catch (NullFieldException nfex) {
+ throw new NullKeyFieldException(nfex);
+ }
}
@SuppressWarnings("unchecked")
@@ -68,7 +84,12 @@ public final class TupleLeadingFieldComparator<T extends Tuple, K> extends TypeC
@Override
public int compare(T first, T second) {
- return this.comparator.compare(first.<K>getFieldNotNull(0), second.<K>getFieldNotNull(0));
+ try {
+ return this.comparator.compare(first.<K> getFieldNotNull(0),
+ second.<K> getFieldNotNull(0));
+ } catch (NullFieldException nfex) {
+ throw new NullKeyFieldException(nfex);
+ }
}
@Override
@@ -98,7 +119,12 @@ public final class TupleLeadingFieldComparator<T extends Tuple, K> extends TypeC
@Override
public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
- this.comparator.putNormalizedKey(record.<K>getFieldNotNull(0), target, offset, numBytes);
+ try {
+ this.comparator.putNormalizedKey(record.<K> getFieldNotNull(0),
+ target, offset, numBytes);
+ } catch (NullFieldException nfex) {
+ throw new NullKeyFieldException(nfex);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/66c1263d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java
index 749c38d..3611f70 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java
@@ -23,6 +23,8 @@ import java.io.Serializable;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.types.NullFieldException;
+import org.apache.flink.types.NullKeyFieldException;
public class TupleLeadingFieldPairComparator<K, T1 extends Tuple, T2 extends Tuple> extends TypePairComparator<T1, T2> implements Serializable {
@@ -39,17 +41,30 @@ public class TupleLeadingFieldPairComparator<K, T1 extends Tuple, T2 extends Tup
@Override
public void setReference(T1 reference) {
- this.comparator1.setReference(reference.<K>getFieldNotNull(0));
+ try {
+ this.comparator1.setReference(reference.<K> getFieldNotNull(0));
+ } catch (NullFieldException nfex) {
+ throw new NullKeyFieldException(nfex);
+ }
}
@Override
public boolean equalToReference(T2 candidate) {
- return this.comparator1.equalToReference(candidate.<K>getFieldNotNull(0));
+ try {
+ return this.comparator1.equalToReference(candidate
+ .<K> getFieldNotNull(0));
+ } catch (NullFieldException nfex) {
+ throw new NullKeyFieldException(nfex);
+ }
}
@Override
public int compareToReference(T2 candidate) {
- this.comparator2.setReference(candidate.<K>getFieldNotNull(0));
- return this.comparator1.compareToReference(this.comparator2);
+ try {
+ this.comparator2.setReference(candidate.<K> getFieldNotNull(0));
+ return this.comparator1.compareToReference(this.comparator2);
+ } catch (NullFieldException nfex) {
+ throw new NullKeyFieldException(nfex);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/66c1263d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java
index 43f46e4..4c4094d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java
@@ -23,6 +23,8 @@ import java.io.Serializable;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.types.NullFieldException;
+import org.apache.flink.types.NullKeyFieldException;
public class TuplePairComparator<T1 extends Tuple, T2 extends Tuple> extends TypePairComparator<T1, T2> implements Serializable {
@@ -59,15 +61,25 @@ public class TuplePairComparator<T1 extends Tuple, T2 extends Tuple> extends Typ
@Override
public void setReference(T1 reference) {
for (int i = 0; i < this.comparators1.length; i++) {
- this.comparators1[i].setReference(reference.getFieldNotNull(keyFields1[i]));
+ try {
+ this.comparators1[i].setReference(reference
+ .getFieldNotNull(keyFields1[i]));
+ } catch (NullFieldException nfex) {
+ throw new NullKeyFieldException(nfex);
+ }
}
}
@Override
public boolean equalToReference(T2 candidate) {
for (int i = 0; i < this.comparators1.length; i++) {
- if (!this.comparators1[i].equalToReference(candidate.getFieldNotNull(keyFields2[i]))) {
- return false;
+ try {
+ if (!this.comparators1[i].equalToReference(candidate
+ .getFieldNotNull(keyFields2[i]))) {
+ return false;
+ }
+ } catch (NullFieldException nfex) {
+ throw new NullKeyFieldException(nfex);
}
}
return true;
@@ -76,10 +88,16 @@ public class TuplePairComparator<T1 extends Tuple, T2 extends Tuple> extends Typ
@Override
public int compareToReference(T2 candidate) {
for (int i = 0; i < this.comparators1.length; i++) {
- this.comparators2[i].setReference(candidate.getFieldNotNull(keyFields2[i]));
- int res = this.comparators1[i].compareToReference(this.comparators2[i]);
- if(res != 0) {
- return res;
+ try {
+ this.comparators2[i].setReference(candidate
+ .getFieldNotNull(keyFields2[i]));
+ int res = this.comparators1[i]
+ .compareToReference(this.comparators2[i]);
+ if (res != 0) {
+ return res;
+ }
+ } catch (NullFieldException nfex) {
+ throw new NullKeyFieldException(nfex);
}
}
return 0;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/66c1263d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
index 163d8b2..1c777fc 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.NullFieldException;
public final class TupleSerializer<T extends Tuple> extends TypeSerializer<T> {
@@ -105,7 +106,11 @@ public final class TupleSerializer<T extends Tuple> extends TypeSerializer<T> {
public void serialize(T value, DataOutputView target) throws IOException {
for (int i = 0; i < arity; i++) {
Object o = value.getField(i);
- fieldSerializers[i].serialize(o, target);
+ try {
+ fieldSerializers[i].serialize(o, target);
+ } catch (NullPointerException npex) {
+ throw new NullFieldException(i);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/66c1263d/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java b/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java
index caf98fd..5a8413e 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java
@@ -19,9 +19,8 @@
package org.apache.flink.api.java.tuple;
+import org.apache.flink.types.NullFieldException;
import org.junit.Assert;
-
-import org.apache.flink.types.NullKeyFieldException;
import org.junit.Test;
public class Tuple2Test {
@@ -36,17 +35,11 @@ public class Tuple2Test {
Assert.assertEquals(swapped.f1, toSwap.f0);
}
- @Test
+ @Test(expected = NullFieldException.class)
public void testGetFieldNotNull() {
Tuple2<String, Integer> tuple = new Tuple2<String, Integer>(new String("Test case"), null);
Assert.assertEquals("Test case", tuple.getFieldNotNull(0));
-
- try {
- tuple.getFieldNotNull(1);
- Assert.fail();
- } catch (NullKeyFieldException e) {
- // right
- }
+ tuple.getFieldNotNull(1);
}
}