You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/21 09:52:38 UTC

[05/50] [abbrv] flink git commit: [FLINK-4609] [java-api] Remove redundant check for null in CrossOperator

[FLINK-4609] [java-api] Remove redundant check for null in CrossOperator

This closes #2490


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/470b752e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/470b752e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/470b752e

Branch: refs/heads/flip-6
Commit: 470b752e693ae4292d34fc7fc0c778f95fa04fd9
Parents: 7a25bf5
Author: Alexander Pivovarov <ap...@gmail.com>
Authored: Fri Sep 9 22:24:43 2016 -0700
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Sep 20 10:10:20 2016 -0400

----------------------------------------------------------------------
 .../flink/api/java/operators/CrossOperator.java | 23 ++++++--------------
 .../api/java/operators/TwoInputOperator.java    |  5 +++--
 2 files changed, 10 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/470b752e/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
index 36e6c1c..3fdc51d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
@@ -20,9 +20,9 @@ package org.apache.flink.api.java.operators;
 
 import java.util.Arrays;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
@@ -124,21 +124,12 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 	@Public
 	public static final class DefaultCross<I1, I2> extends CrossOperator<I1, I2, Tuple2<I1, I2>>  {
 
-		private final DataSet<I1> input1;
-		private final DataSet<I2> input2;
-
 		public DefaultCross(DataSet<I1> input1, DataSet<I2> input2, CrossHint hint, String defaultName) {
-			
 			super(input1, input2, new DefaultCrossFunction<I1, I2>(),
-					new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()),
-					hint, defaultName);
-
-			if (input1 == null || input2 == null) {
-				throw new NullPointerException();
-			}
-
-			this.input1 = input1;
-			this.input2 = input2;
+				new TupleTypeInfo<Tuple2<I1, I2>>(
+					Preconditions.checkNotNull(input1, "input1 is null").getType(),
+					Preconditions.checkNotNull(input2, "input2 is null").getType()),
+				hint, defaultName);
 		}
 
 		/**
@@ -155,9 +146,9 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 			if (function == null) {
 				throw new NullPointerException("Cross function must not be null.");
 			}
-			TypeInformation<R> returnType = TypeExtractor.getCrossReturnTypes(function, input1.getType(), input2.getType(),
+			TypeInformation<R> returnType = TypeExtractor.getCrossReturnTypes(function, getInput1().getType(), getInput2().getType(),
 					super.getDefaultName(), true);
-			return new CrossOperator<I1, I2, R>(input1, input2, clean(function), returnType, 
+			return new CrossOperator<I1, I2, R>(getInput1(), getInput2(), clean(function), returnType,
 					getCrossHint(), Utils.getCallLocationName());
 		}
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/470b752e/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputOperator.java
index c64882d..28dec32 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputOperator.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.java.operators;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.util.Preconditions;
 
 /**
  * Base class for operations that operates on two input data sets.
@@ -37,8 +38,8 @@ public abstract class TwoInputOperator<IN1, IN2, OUT, O extends TwoInputOperator
 	
 	
 	protected TwoInputOperator(DataSet<IN1> input1, DataSet<IN2> input2, TypeInformation<OUT> resultType) {
-		super(input1.getExecutionEnvironment(), resultType);
-		
+		super(Preconditions.checkNotNull(input1, "input1 is null").getExecutionEnvironment(), resultType);
+		Preconditions.checkNotNull(input2, "input2 is null");
 		DataSet.checkSameExecutionContext(input1, input2);
 		this.input1 = input1;
 		this.input2 = input2;