You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/09/06 14:16:39 UTC

flink git commit: [FLINK-4265] [dataset api] Add a NoOpOperator

Repository: flink
Updated Branches:
  refs/heads/master cab76f6e2 -> 66d4b8724


[FLINK-4265] [dataset api] Add a NoOpOperator

Adds a NoOpOperator which is unwound in OperatorTranslation.translate.
This will be first used by Gelly as a placeholder to support implicit
operator reuse.

This closes #2294


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/66d4b872
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/66d4b872
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/66d4b872

Branch: refs/heads/master
Commit: 66d4b8724b9e9b09225d2bbd3132dc2efdcf843a
Parents: cab76f6
Author: Greg Hogan <co...@greghogan.com>
Authored: Mon Jul 25 14:05:56 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Sep 6 09:25:48 2016 -0400

----------------------------------------------------------------------
 .../flink/api/java/operators/NoOpOperator.java  | 51 ++++++++++++++++++++
 .../api/java/operators/OperatorTranslation.java | 20 ++++----
 2 files changed, 62 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/66d4b872/flink-java/src/main/java/org/apache/flink/api/java/operators/NoOpOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/NoOpOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/NoOpOperator.java
new file mode 100644
index 0000000..369ab9e
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/NoOpOperator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Placeholder data set wrapping an input data set. This operator will be ignored during translation:
+ * {@code OperatorTranslation.translate} replaces it with the data set returned by {@link #getInput()}.
+ * @param <IN> The type of the data set passed through the operator.
+ */
+@Internal
+public class NoOpOperator<IN> extends DataSet<IN> {
+
+	private DataSet<IN> input; // the wrapped data set; can be swapped later through setInput
+
+	public NoOpOperator(DataSet<IN> input, TypeInformation<IN> resultType) {
+		super(input.getExecutionEnvironment(), resultType); // a null input fails here on the dereference
+
+		this.input = input;
+	}
+
+	public DataSet<IN> getInput() {
+		return input; // the data set that translation substitutes for this operator
+	}
+
+	public void setInput(DataSet<IN> input) {
+		Preconditions.checkNotNull(input); // reject a null replacement before overwriting the field
+
+		this.input = input;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/66d4b872/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
index 74811a3..3f44d58 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
@@ -19,15 +19,8 @@
 package org.apache.flink.api.java.operators;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.DataSet;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.AbstractUdfOperator;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.GenericDataSinkBase;
@@ -35,8 +28,14 @@ import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.BulkIterationBase;
 import org.apache.flink.api.common.operators.base.DeltaIterationBase;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.configuration.Configuration;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 @Internal
 public class OperatorTranslation {
 	
@@ -70,7 +69,10 @@ public class OperatorTranslation {
 	
 	
 	private <T> Operator<T> translate(DataSet<T> dataSet) {
-		
+		while (dataSet instanceof NoOpOperator) {
+			dataSet = ((NoOpOperator<T>) dataSet).getInput();
+		}
+
 		// check if we have already translated that data set (operation or source)
 		Operator<?> previous = (Operator<?>) this.translated.get(dataSet);
 		if (previous != null) {