You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text version.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/06/08 07:33:52 UTC
[3/3] flink git commit: [FLINK-1319] [core] Add static code analysis
for user code
[FLINK-1319] [core] Add static code analysis for user code
This closes #729.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c854d526
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c854d526
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c854d526
Branch: refs/heads/master
Commit: c854d5260c20b0926c4347c7c9dd7d0f4f11d620
Parents: d433ba9
Author: twalthr <tw...@apache.org>
Authored: Tue May 26 20:22:03 2015 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Jun 8 07:29:49 2015 +0200
----------------------------------------------------------------------
.../flink/api/common/CodeAnalysisMode.java | 52 +
.../flink/api/common/ExecutionConfig.java | 26 +
flink-java/pom.xml | 4 +-
.../flink/api/java/ExecutionEnvironment.java | 3 +
.../java/org/apache/flink/api/java/Utils.java | 4 +
.../api/java/functions/FunctionAnnotation.java | 17 +-
.../api/java/functions/SemanticPropUtil.java | 133 +-
.../api/java/operators/CoGroupOperator.java | 6 +-
.../flink/api/java/operators/CrossOperator.java | 2 +
.../api/java/operators/FilterOperator.java | 2 +
.../api/java/operators/FlatMapOperator.java | 4 +-
.../java/operators/GroupCombineOperator.java | 4 +-
.../api/java/operators/GroupReduceOperator.java | 6 +-
.../flink/api/java/operators/JoinOperator.java | 8 +-
.../flink/api/java/operators/MapOperator.java | 2 +
.../api/java/operators/ReduceOperator.java | 6 +-
.../java/operators/SingleInputUdfOperator.java | 31 +-
.../api/java/operators/TwoInputUdfOperator.java | 43 +-
.../flink/api/java/operators/UdfOperator.java | 1 -
.../api/java/operators/UdfOperatorUtils.java | 103 ++
.../api/java/sca/CodeAnalyzerException.java | 42 +
.../flink/api/java/sca/CodeErrorException.java | 42 +
.../flink/api/java/sca/ModifiedASMAnalyzer.java | 169 +++
.../flink/api/java/sca/ModifiedASMFrame.java | 84 ++
.../api/java/sca/NestedMethodAnalyzer.java | 730 ++++++++++
.../apache/flink/api/java/sca/TaggedValue.java | 421 ++++++
.../apache/flink/api/java/sca/UdfAnalyzer.java | 474 ++++++
.../flink/api/java/sca/UdfAnalyzerUtils.java | 329 +++++
.../SemanticPropertiesPrecedenceTest.java | 183 +++
.../api/java/sca/UdfAnalyzerExamplesTest.java | 707 +++++++++
.../flink/api/java/sca/UdfAnalyzerTest.java | 1353 ++++++++++++++++++
.../apache/flink/test/util/TestEnvironment.java | 4 +-
pom.xml | 4 +-
33 files changed, 4916 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-core/src/main/java/org/apache/flink/api/common/CodeAnalysisMode.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/CodeAnalysisMode.java b/flink-core/src/main/java/org/apache/flink/api/common/CodeAnalysisMode.java
new file mode 100644
index 0000000..e9d8541
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/CodeAnalysisMode.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common;
+
+/**
+ * Specifies to which extent user-defined functions are analyzed in order
+ * to give the Flink optimizer an insight of UDF internals and inform
+ * the user about common implementation mistakes.
+ *
+ * The analyzer gives hints about:
+ * - ForwardedFields semantic properties
+ * - Warnings if static fields are modified by a Function
+ * - Warnings if a FilterFunction modifies its input objects
+ * - Warnings if a Function returns null
+ * - Warnings if a tuple access uses a wrong index
+ * - Information about the number of object creations (for manual optimization)
+ */
+public enum CodeAnalysisMode {
+
+ /**
+ * Code analysis does not take place.
+ */
+ DISABLE,
+
+ /**
+ * Hints for improvement of the program are printed to the log.
+ */
+ HINT,
+
+ /**
+ * The program will be automatically optimized with knowledge from code
+ * analysis.
+ */
+ OPTIMIZE;
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 04a518e..4974295 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -44,6 +44,10 @@ import java.util.Map;
* handling <i>generic types</i> and <i>POJOs</i>. This is usually only needed
* when the functions return not only the types declared in their signature, but
* also subclasses of those types.</li>
+ * <li>The {@link CodeAnalysisMode} of the program: Enable hinting/optimizing or disable
+ * the "static code analyzer". The static code analyzer pre-interprets user-defined functions in order to
+ * get implementation insights for program improvements that can be printed to the log or
+ * automatically applied.</li>
* </ul>
*/
public class ExecutionConfig implements Serializable {
@@ -78,6 +82,8 @@ public class ExecutionConfig implements Serializable {
private boolean forceAvro = false;
+ private CodeAnalysisMode codeAnalysisMode = CodeAnalysisMode.DISABLE;
+
/** If set to true, progress updates are printed to System.out during execution */
private boolean printProgressDuringExecution = true;
@@ -316,6 +322,26 @@ public class ExecutionConfig implements Serializable {
public boolean isObjectReuseEnabled() {
return objectReuse;
}
+
+ /**
+ * Sets the {@link CodeAnalysisMode} of the program. Specifies to which extent user-defined
+ * functions are analyzed in order to give the Flink optimizer an insight of UDF internals
+ * and inform the user about common implementation mistakes. The static code analyzer pre-interprets
+ * user-defined functions in order to get implementation insights for program improvements
+ * that can be printed to the log, automatically applied, or disabled.
+ *
+ * @param codeAnalysisMode see {@link CodeAnalysisMode}
+ */
+ public void setCodeAnalysisMode(CodeAnalysisMode codeAnalysisMode) {
+ this.codeAnalysisMode = codeAnalysisMode;
+ }
+
+ /**
+ * Returns the {@link CodeAnalysisMode} of the program.
+ */
+ public CodeAnalysisMode getCodeAnalysisMode() {
+ return codeAnalysisMode;
+ }
/**
* Enables the printing of progress update messages to {@code System.out}
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index 6196e82..8879803 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -60,10 +60,10 @@ under the License.
<dependency>
<groupId>org.ow2.asm</groupId>
- <artifactId>asm</artifactId>
+ <artifactId>asm-all</artifactId>
<version>${asm.version}</version>
</dependency>
-
+
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill_${scala.binary.version}</artifactId>
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 3a5b04f..d50ddb4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -991,6 +991,9 @@ public abstract class ExecutionEnvironment {
LOG.debug("Registered Kryo default Serializers: {}", Joiner.on(',').join(config.getDefaultKryoSerializers()));
LOG.debug("Registered Kryo default Serializers Classes {}", Joiner.on(',').join(config.getDefaultKryoSerializerClasses()));
LOG.debug("Registered POJO types: {}", Joiner.on(',').join(config.getRegisteredPojoTypes()));
+
+ // print information about static code analysis
+ LOG.debug("Static code analysis mode: {}", config.getCodeAnalysisMode());
}
return plan;
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
index 38b24a2..dd1d6d2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
@@ -29,8 +29,10 @@ import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.List;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
+
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
+import static org.apache.flink.api.java.functions.FunctionAnnotation.SkipCodeAnalysis;
public class Utils {
@@ -70,6 +72,7 @@ public class Utils {
}
}
+ @SkipCodeAnalysis
public static class CountHelper<T> extends RichFlatMapFunction<T, Long> {
private static final long serialVersionUID = 1L;
@@ -93,6 +96,7 @@ public class Utils {
}
}
+ @SkipCodeAnalysis
public static class CollectHelper<T> extends RichFlatMapFunction<T, T> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
index 09678fd..bfc1bf0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
@@ -369,7 +369,22 @@ public class FunctionAnnotation {
public @interface ReadFieldsSecond {
String[] value();
}
-
+
+ /**
+ * The SkipCodeAnalysis annotation declares that a function will not be analyzed by Flink's
+ * code analysis capabilities independent of the configured {@link org.apache.flink.api.common.CodeAnalysisMode}.
+ *
+ * If this annotation is not present the static code analyzer pre-interprets user-defined
+ * functions in order to get implementation insights for program improvements that can be
+ * printed to the log as hints, automatically applied, or disabled (see
+ * {@link org.apache.flink.api.common.ExecutionConfig}).
+ *
+ */
+ @Target(ElementType.TYPE)
+ @Retention(RetentionPolicy.RUNTIME)
+ public @interface SkipCodeAnalysis {
+ }
+
/**
* Private constructor to prevent instantiation. This class is intended only as a container.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
index 4569be3..7640e2c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
@@ -19,14 +19,6 @@
package org.apache.flink.api.java.functions;
-import java.lang.annotation.Annotation;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
import org.apache.flink.api.common.operators.DualInputSemanticProperties;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.SemanticProperties.InvalidSemanticAnnotationException;
@@ -34,13 +26,13 @@ import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.common.typeutils.CompositeType.InvalidFieldReferenceException;
import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeType.InvalidFieldReferenceException;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFields;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
-import org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFieldsFirst;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFieldsFirst;
import org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFieldsSecond;
import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFields;
import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFieldsFirst;
@@ -48,6 +40,14 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFieldsSecond;
import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import java.lang.annotation.Annotation;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
public class SemanticPropUtil {
private final static String REGEX_WILDCARD = "[\\"+ Keys.ExpressionKeys.SELECT_ALL_CHAR+"\\"+ Keys.ExpressionKeys.SELECT_ALL_CHAR_SCALA+"]";
@@ -235,7 +235,7 @@ public class SemanticPropUtil {
public static SingleInputSemanticProperties getSemanticPropsSingle(
Set<Annotation> set, TypeInformation<?> inType, TypeInformation<?> outType) {
if (set == null) {
- return new SingleInputSemanticProperties();
+ return null;
}
Iterator<Annotation> it = set.iterator();
@@ -264,15 +264,14 @@ public class SemanticPropUtil {
SingleInputSemanticProperties result = new SingleInputSemanticProperties();
getSemanticPropsSingleFromString(result, forwarded, nonForwarded, read, inType, outType);
return result;
- } else {
- return new SingleInputSemanticProperties();
}
+ return null;
}
public static DualInputSemanticProperties getSemanticPropsDual(
Set<Annotation> set, TypeInformation<?> inType1, TypeInformation<?> inType2, TypeInformation<?> outType) {
if (set == null) {
- return new DualInputSemanticProperties();
+ return null;
}
Iterator<Annotation> it = set.iterator();
@@ -309,15 +308,20 @@ public class SemanticPropUtil {
getSemanticPropsDualFromString(result, forwardedFirst, forwardedSecond,
nonForwardedFirst, nonForwardedSecond, readFirst, readSecond, inType1, inType2, outType);
return result;
- } else {
- return new DualInputSemanticProperties();
}
+ return null;
+ }
+
+ public static void getSemanticPropsSingleFromString(SingleInputSemanticProperties result,
+ String[] forwarded, String[] nonForwarded, String[] readSet,
+ TypeInformation<?> inType, TypeInformation<?> outType) {
+ getSemanticPropsSingleFromString(result, forwarded, nonForwarded, readSet, inType, outType, false);
}
public static void getSemanticPropsSingleFromString(SingleInputSemanticProperties result,
String[] forwarded, String[] nonForwarded, String[] readSet,
- TypeInformation<?> inType, TypeInformation<?> outType)
- {
+ TypeInformation<?> inType, TypeInformation<?> outType,
+ boolean skipIncompatibleTypes) {
boolean hasForwardedAnnotation = false;
boolean hasNonForwardedAnnotation = false;
@@ -334,9 +338,9 @@ public class SemanticPropUtil {
throw new InvalidSemanticAnnotationException("Either ForwardedFields OR " +
"NonForwardedFields annotation permitted, NOT both.");
} else if(hasForwardedAnnotation) {
- parseForwardedFields(result, forwarded, inType, outType, 0);
+ parseForwardedFields(result, forwarded, inType, outType, 0, skipIncompatibleTypes);
} else if(hasNonForwardedAnnotation) {
- parseNonForwardedFields(result, nonForwarded, inType, outType, 0);
+ parseNonForwardedFields(result, nonForwarded, inType, outType, 0, skipIncompatibleTypes);
}
parseReadFields(result, readSet, inType, 0);
}
@@ -345,8 +349,18 @@ public class SemanticPropUtil {
String[] forwardedFirst, String[] forwardedSecond,
String[] nonForwardedFirst, String[] nonForwardedSecond, String[]
readFieldsFirst, String[] readFieldsSecond,
- TypeInformation<?> inType1, TypeInformation<?> inType2, TypeInformation<?> outType)
- {
+ TypeInformation<?> inType1, TypeInformation<?> inType2, TypeInformation<?> outType) {
+ getSemanticPropsDualFromString(result, forwardedFirst, forwardedSecond, nonForwardedFirst,
+ nonForwardedSecond, readFieldsFirst, readFieldsSecond, inType1, inType2, outType,
+ false);
+ }
+
+ public static void getSemanticPropsDualFromString(DualInputSemanticProperties result,
+ String[] forwardedFirst, String[] forwardedSecond,
+ String[] nonForwardedFirst, String[] nonForwardedSecond, String[]
+ readFieldsFirst, String[] readFieldsSecond,
+ TypeInformation<?> inType1, TypeInformation<?> inType2, TypeInformation<?> outType,
+ boolean skipIncompatibleTypes) {
boolean hasForwardedFirstAnnotation = false;
boolean hasForwardedSecondAnnotation = false;
@@ -377,15 +391,15 @@ public class SemanticPropUtil {
}
if(hasForwardedFirstAnnotation) {
- parseForwardedFields(result, forwardedFirst, inType1, outType, 0);
+ parseForwardedFields(result, forwardedFirst, inType1, outType, 0, skipIncompatibleTypes);
} else if(hasNonForwardedFirstAnnotation) {
- parseNonForwardedFields(result, nonForwardedFirst, inType1, outType, 0);
+ parseNonForwardedFields(result, nonForwardedFirst, inType1, outType, 0, skipIncompatibleTypes);
}
if(hasForwardedSecondAnnotation) {
- parseForwardedFields(result, forwardedSecond, inType2, outType, 1);
+ parseForwardedFields(result, forwardedSecond, inType2, outType, 1, skipIncompatibleTypes);
} else if(hasNonForwardedSecondAnnotation) {
- parseNonForwardedFields(result, nonForwardedSecond, inType2, outType, 1);
+ parseNonForwardedFields(result, nonForwardedSecond, inType2, outType, 1, skipIncompatibleTypes);
}
parseReadFields(result, readFieldsFirst, inType1, 0);
@@ -393,7 +407,8 @@ public class SemanticPropUtil {
}
- private static void parseForwardedFields(SemanticProperties sp, String[] forwardedStr, TypeInformation<?> inType, TypeInformation<?> outType, int input) {
+ private static void parseForwardedFields(SemanticProperties sp, String[] forwardedStr,
+ TypeInformation<?> inType, TypeInformation<?> outType, int input, boolean skipIncompatibleTypes) {
if (forwardedStr == null) {
return;
@@ -412,8 +427,13 @@ public class SemanticPropUtil {
if (wcMatcher.matches()) {
if (!inType.equals(outType)) {
- throw new InvalidSemanticAnnotationException("Forwarded field annotation \"" + s +
- "\" with wildcard only allowed for identical input and output types.");
+ if (skipIncompatibleTypes) {
+ continue;
+ }
+ else {
+ throw new InvalidSemanticAnnotationException("Forwarded field annotation \"" + s +
+ "\" with wildcard only allowed for identical input and output types.");
+ }
}
for (int i = 0; i < inType.getTotalFields(); i++) {
@@ -440,8 +460,13 @@ public class SemanticPropUtil {
try {
// check type compatibility
- if (!areFieldsCompatible(sourceStr, inType, targetStr, outType)) {
- throw new InvalidSemanticAnnotationException("Referenced fields of forwarded field annotation \"" + s + "\" do not match.");
+ if (!areFieldsCompatible(sourceStr, inType, targetStr, outType, !skipIncompatibleTypes)) {
+ if (skipIncompatibleTypes) {
+ continue;
+ }
+ else {
+ throw new InvalidSemanticAnnotationException("Referenced fields of forwarded field annotation \"" + s + "\" do not match.");
+ }
}
List<FlatFieldDescriptor> inFFDs = getFlatFields(sourceStr, inType);
List<FlatFieldDescriptor> outFFDs = getFlatFields(targetStr, outType);
@@ -478,8 +503,13 @@ public class SemanticPropUtil {
String fieldStr = fieldMatcher.group();
try {
// check if field is compatible in input and output type
- if (!areFieldsCompatible(fieldStr, inType, fieldStr, outType)) {
- throw new InvalidSemanticAnnotationException("Referenced fields of forwarded field annotation \"" + s + "\" do not match.");
+ if (!areFieldsCompatible(fieldStr, inType, fieldStr, outType, !skipIncompatibleTypes)) {
+ if (skipIncompatibleTypes) {
+ continue;
+ }
+ else {
+ throw new InvalidSemanticAnnotationException("Referenced fields of forwarded field annotation \"" + s + "\" do not match.");
+ }
}
// add flat field positions
List<FlatFieldDescriptor> inFFDs = getFlatFields(fieldStr, inType);
@@ -503,8 +533,8 @@ public class SemanticPropUtil {
}
}
- private static void parseNonForwardedFields(
- SemanticProperties sp, String[] nonForwardedStr, TypeInformation<?> inType, TypeInformation<?> outType, int input) {
+ private static void parseNonForwardedFields(SemanticProperties sp, String[] nonForwardedStr,
+ TypeInformation<?> inType, TypeInformation<?> outType, int input, boolean skipIncompatibleTypes) {
if(nonForwardedStr == null) {
return;
@@ -521,7 +551,12 @@ public class SemanticPropUtil {
}
if(!inType.equals(outType)) {
- throw new InvalidSemanticAnnotationException("Non-forwarded fields annotation only allowed for identical input and output types.");
+ if (skipIncompatibleTypes) {
+ continue;
+ }
+ else {
+ throw new InvalidSemanticAnnotationException("Non-forwarded fields annotation only allowed for identical input and output types.");
+ }
}
Matcher matcher = PATTERN_LIST.matcher(s);
@@ -613,14 +648,24 @@ public class SemanticPropUtil {
////////////////////// UTIL METHODS ///////////////////////////////
- private static boolean areFieldsCompatible(String sourceField, TypeInformation<?> inType, String targetField, TypeInformation<?> outType) {
-
- // get source type information
- TypeInformation<?> sourceType = getExpressionTypeInformation(sourceField, inType);
- // get target type information
- TypeInformation<?> targetType = getExpressionTypeInformation(targetField, outType);
+ private static boolean areFieldsCompatible(String sourceField, TypeInformation<?> inType, String targetField,
+ TypeInformation<?> outType, boolean throwException) {
- return (sourceType.equals(targetType));
+ try {
+ // get source type information
+ TypeInformation<?> sourceType = getExpressionTypeInformation(sourceField, inType);
+ // get target type information
+ TypeInformation<?> targetType = getExpressionTypeInformation(targetField, outType);
+ return sourceType.equals(targetType);
+ }
+ catch (InvalidFieldReferenceException e) {
+ if (throwException) {
+ throw e;
+ }
+ else {
+ return false;
+ }
+ }
}
private static TypeInformation<?> getExpressionTypeInformation(String fieldStr, TypeInformation<?> typeInfo) {
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index 115a238..36378b9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -124,6 +124,8 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
this.keys1 = keys1;
this.keys2 = keys2;
+
+ UdfOperatorUtils.analyzeDualInputUdf(this, CoGroupFunction.class, defaultName, function, keys1, keys2);
}
@Override
@@ -144,9 +146,9 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
int numFields1 = this.getInput1Type().getTotalFields();
int numFields2 = this.getInput2Type().getTotalFields();
int offset1 = (this.keys1 instanceof Keys.SelectorFunctionKeys) ?
- ((Keys.SelectorFunctionKeys) this.keys1).getKeyType().getTotalFields() : 0;
+ ((Keys.SelectorFunctionKeys<?,?>) this.keys1).getKeyType().getTotalFields() : 0;
int offset2 = (this.keys2 instanceof Keys.SelectorFunctionKeys) ?
- ((Keys.SelectorFunctionKeys) this.keys2).getKeyType().getTotalFields() : 0;
+ ((Keys.SelectorFunctionKeys<?,?>) this.keys2).getKeyType().getTotalFields() : 0;
props = SemanticPropUtil.addSourceFieldOffsets(props, numFields1, numFields2, offset1, offset2);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
index 5ed3e40..ae990ce 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
@@ -67,6 +67,8 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
this.function = function;
this.defaultName = defaultName;
this.hint = hint;
+
+ UdfOperatorUtils.analyzeDualInputUdf(this, CrossFunction.class, defaultName, function, null, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
index f55de1c..70bfa93 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
@@ -41,6 +41,8 @@ public class FilterOperator<T> extends SingleInputUdfOperator<T, T, FilterOperat
this.function = function;
this.defaultName = defaultName;
+
+ UdfOperatorUtils.analyzeSingleInputUdf(this, FilterFunction.class, defaultName, function, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
index 8caacae..10bb286 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
@@ -43,13 +43,15 @@ public class FlatMapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, Fl
this.function = function;
this.defaultName = defaultName;
+
+ UdfOperatorUtils.analyzeSingleInputUdf(this, FlatMapFunction.class, defaultName, function, null);
}
@Override
protected FlatMapFunction<IN, OUT> getFunction() {
return function;
}
-
+
@Override
protected FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN,OUT>> translateToDataFlow(Operator<IN> input) {
String name = getName() != null ? getName() : "FlatMap at "+defaultName;
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
index dc26fec..30cb0be 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
@@ -98,9 +98,9 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
this.grouper != null &&
this.grouper.keys instanceof Keys.SelectorFunctionKeys) {
- int offset = ((Keys.SelectorFunctionKeys) this.grouper.keys).getKeyType().getTotalFields();
+ int offset = ((Keys.SelectorFunctionKeys<?,?>) this.grouper.keys).getKeyType().getTotalFields();
if(this.grouper instanceof SortedGrouping) {
- offset += ((SortedGrouping) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
+ offset += ((SortedGrouping<?>) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
}
props = SemanticPropUtil.addSourceFieldOffset(props, this.getInputType().getTotalFields(), offset);
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index bc4413f..fcbb888 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -87,6 +87,8 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
this.defaultName = defaultName;
checkCombinability();
+
+ UdfOperatorUtils.analyzeSingleInputUdf(this, GroupReduceFunction.class, defaultName, function, grouper.keys);
}
private void checkCombinability() {
@@ -132,9 +134,9 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
this.grouper != null &&
this.grouper.keys instanceof Keys.SelectorFunctionKeys) {
- int offset = ((Keys.SelectorFunctionKeys) this.grouper.keys).getKeyType().getTotalFields();
+ int offset = ((Keys.SelectorFunctionKeys<?,?>) this.grouper.keys).getKeyType().getTotalFields();
if(this.grouper instanceof SortedGrouping) {
- offset += ((SortedGrouping) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
+ offset += ((SortedGrouping<?>) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
}
props = SemanticPropUtil.addSourceFieldOffset(props, this.getInputType().getTotalFields(), offset);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index 4adf6b3..1e5baab 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -202,6 +202,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
this.function = function;
this.joinLocationName = joinLocationName;
+
+ UdfOperatorUtils.analyzeDualInputUdf(this, FlatJoinFunction.class, joinLocationName, function, keys1, keys2);
}
public EquiJoin(DataSet<I1> input1, DataSet<I2> input2,
@@ -217,6 +219,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
}
this.function = generatedFunction;
+
+ UdfOperatorUtils.analyzeDualInputUdf(this, JoinFunction.class, joinLocationName, function, keys1, keys2);
}
@Override
@@ -237,9 +241,9 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
int numFields1 = this.getInput1Type().getTotalFields();
int numFields2 = this.getInput2Type().getTotalFields();
int offset1 = (this.keys1 instanceof Keys.SelectorFunctionKeys) ?
- ((Keys.SelectorFunctionKeys) this.keys1).getKeyType().getTotalFields() : 0;
+ ((Keys.SelectorFunctionKeys<?,?>) this.keys1).getKeyType().getTotalFields() : 0;
int offset2 = (this.keys2 instanceof Keys.SelectorFunctionKeys) ?
- ((Keys.SelectorFunctionKeys) this.keys2).getKeyType().getTotalFields() : 0;
+ ((Keys.SelectorFunctionKeys<?,?>) this.keys2).getKeyType().getTotalFields() : 0;
props = SemanticPropUtil.addSourceFieldOffsets(props, numFields1, numFields2, offset1, offset2);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
index 2663a2a..eaaeb38 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
@@ -45,6 +45,8 @@ public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOpe
this.defaultName = defaultName;
this.function = function;
+
+ UdfOperatorUtils.analyzeSingleInputUdf(this, MapFunction.class, defaultName, function, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
index e770278..1193da5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
@@ -72,6 +72,8 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
this.function = function;
this.grouper = input;
this.defaultName = defaultName;
+
+ UdfOperatorUtils.analyzeSingleInputUdf(this, ReduceFunction.class, defaultName, function, grouper.keys);
}
@Override
@@ -89,9 +91,9 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
this.grouper != null &&
this.grouper.keys instanceof Keys.SelectorFunctionKeys) {
- int offset = ((Keys.SelectorFunctionKeys) this.grouper.keys).getKeyType().getTotalFields();
+ int offset = ((Keys.SelectorFunctionKeys<?,?>) this.grouper.keys).getKeyType().getTotalFields();
if(this.grouper instanceof SortedGrouping) {
- offset += ((SortedGrouping) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
+ offset += ((SortedGrouping<?>) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
}
props = SemanticPropUtil.addSourceFieldOffset(props, this.getInputType().getTotalFields(), offset);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
index f55489f..9301e1a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
@@ -54,8 +54,11 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
private Map<String, DataSet<?>> broadcastVariables;
+ // NOTE: only set this variable via setSemanticProperties()
private SingleInputSemanticProperties udfSemantics;
+ private boolean analyzedUdfSemantics;
+
// --------------------------------------------------------------------------------------------
/**
@@ -157,11 +160,12 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
if(this.udfSemantics == null) {
// extract semantic properties from function annotations
- this.udfSemantics = extractSemanticAnnotations(getFunction().getClass());
+ setSemanticProperties(extractSemanticAnnotations(getFunction().getClass()));
}
- if(this.udfSemantics == null) {
- this.udfSemantics = new SingleInputSemanticProperties();
+ if(this.udfSemantics == null
+ || this.analyzedUdfSemantics) { // discard analyzed semantic properties
+ setSemanticProperties(new SingleInputSemanticProperties());
SemanticPropUtil.getSemanticPropsSingleFromString(this.udfSemantics, forwardedFields, null, null, this.getInputType(), this.getResultType());
} else {
if(udfWithForwardedFieldsAnnotation(getFunction().getClass())) {
@@ -311,11 +315,15 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
@Override
public SingleInputSemanticProperties getSemanticProperties() {
- if (this.udfSemantics == null) {
+ if (this.udfSemantics == null || analyzedUdfSemantics) {
SingleInputSemanticProperties props = extractSemanticAnnotations(getFunction().getClass());
- this.udfSemantics = props != null ? props : new SingleInputSemanticProperties();
+ if (props != null) {
+ setSemanticProperties(props);
+ }
+ }
+ if (this.udfSemantics == null) {
+ setSemanticProperties(new SingleInputSemanticProperties());
}
-
return this.udfSemantics;
}
@@ -329,8 +337,17 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
*/
public void setSemanticProperties(SingleInputSemanticProperties properties) {
this.udfSemantics = properties;
+ this.analyzedUdfSemantics = false;
}
-
+
+ protected boolean getAnalyzedUdfSemanticsFlag() {
+ return this.analyzedUdfSemantics;
+ }
+
+ protected void setAnalyzedUdfSemanticsFlag() {
+ this.analyzedUdfSemantics = true;
+ }
+
protected SingleInputSemanticProperties extractSemanticAnnotations(Class<?> udfClass) {
Set<Annotation> annotations = FunctionAnnotation.readSingleForwardAnnotations(udfClass);
return SemanticPropUtil.getSemanticPropsSingle(annotations, getInputType(), getResultType());
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
index 91f9f7e..d23dd56 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
@@ -29,12 +29,12 @@ import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.operators.DualInputSemanticProperties;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.functions.SemanticPropUtil;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.java.DataSet;
/**
* The <tt>TwoInputUdfOperator</tt> is the base class of all binary operators that execute
@@ -56,8 +56,11 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
private Map<String, DataSet<?>> broadcastVariables;
+ // NOTE: only set this variable via setSemanticProperties()
private DualInputSemanticProperties udfSemantics;
+ private boolean analyzedUdfSemantics;
+
// --------------------------------------------------------------------------------------------
/**
@@ -157,13 +160,13 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
*/
@SuppressWarnings("unchecked")
public O withForwardedFieldsFirst(String... forwardedFieldsFirst) {
- if (this.udfSemantics == null) {
+ if (this.udfSemantics == null || this.analyzedUdfSemantics) {
// extract semantic properties from function annotations
- this.udfSemantics = extractSemanticAnnotationsFromUdf(getFunction().getClass());
+ setSemanticProperties(extractSemanticAnnotationsFromUdf(getFunction().getClass()));
}
- if(this.udfSemantics == null) {
- this.udfSemantics = new DualInputSemanticProperties();
+ if(this.udfSemantics == null || this.analyzedUdfSemantics) {
+ setSemanticProperties(new DualInputSemanticProperties());
SemanticPropUtil.getSemanticPropsDualFromString(this.udfSemantics, forwardedFieldsFirst, null,
null, null, null, null, getInput1Type(), getInput2Type(), getResultType());
} else {
@@ -232,13 +235,13 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
*/
@SuppressWarnings("unchecked")
public O withForwardedFieldsSecond(String... forwardedFieldsSecond) {
- if (this.udfSemantics == null) {
+ if (this.udfSemantics == null || this.analyzedUdfSemantics) {
// extract semantic properties from function annotations
- this.udfSemantics = extractSemanticAnnotationsFromUdf(getFunction().getClass());
+ setSemanticProperties(extractSemanticAnnotationsFromUdf(getFunction().getClass()));
}
- if(this.udfSemantics == null) {
- this.udfSemantics = new DualInputSemanticProperties();
+ if(this.udfSemantics == null || this.analyzedUdfSemantics) {
+ setSemanticProperties(new DualInputSemanticProperties());
SemanticPropUtil.getSemanticPropsDualFromString(this.udfSemantics, null, forwardedFieldsSecond,
null, null, null, null, getInput1Type(), getInput2Type(), getResultType());
} else {
@@ -390,11 +393,15 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
@Override
public DualInputSemanticProperties getSemanticProperties() {
- if (this.udfSemantics == null) {
+ if (this.udfSemantics == null || analyzedUdfSemantics) {
DualInputSemanticProperties props = extractSemanticAnnotationsFromUdf(getFunction().getClass());
- this.udfSemantics = props != null ? props : new DualInputSemanticProperties();
+ if (props != null) {
+ setSemanticProperties(props);
+ }
+ }
+ if (this.udfSemantics == null) {
+ setSemanticProperties(new DualInputSemanticProperties());
}
-
return this.udfSemantics;
}
@@ -408,9 +415,17 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
*/
public void setSemanticProperties(DualInputSemanticProperties properties) {
this.udfSemantics = properties;
+ this.analyzedUdfSemantics = false;
}
-
-
+
+ protected boolean getAnalyzedUdfSemanticsFlag() {
+ return this.analyzedUdfSemantics;
+ }
+
+ protected void setAnalyzedUdfSemanticsFlag() {
+ this.analyzedUdfSemantics = true;
+ }
+
protected DualInputSemanticProperties extractSemanticAnnotationsFromUdf(Class<?> udfClass) {
Set<Annotation> annotations = FunctionAnnotation.readDualForwardAnnotations(udfClass);
return SemanticPropUtil.getSemanticPropsDual(annotations, getInput1Type(), getInput2Type(), getResultType());
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
index 026cc61..924c84f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
@@ -57,7 +57,6 @@ public interface UdfOperator<O extends UdfOperator<O>> {
/**
* Gets the semantic properties that have been set for the user-defined functions (UDF).
- * This method may return null, if no semantic properties have been set so far.
*
* @return The semantic properties of the UDF.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java
new file mode 100644
index 0000000..52a0d08
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators;
+
+import org.apache.flink.api.common.CodeAnalysisMode;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.sca.CodeAnalyzerException;
+import org.apache.flink.api.java.sca.UdfAnalyzer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Runs the static code analysis ({@link UdfAnalyzer}) on the user-defined function (UDF) of an
+ * operator, depending on the {@link CodeAnalysisMode} of the operator's execution config:
+ *
+ * <ul>
+ *   <li>{@code DISABLE}, or a UDF class annotated with {@link FunctionAnnotation.SkipCodeAnalysis}:
+ *       the UDF is not analyzed.</li>
+ *   <li>{@code HINT}: semantic property hints are added to the analysis result.</li>
+ *   <li>{@code OPTIMIZE}: if the UDF does not declare forwarded fields via annotations, hints are
+ *       added and the analyzed semantic properties are set on the operator.</li>
+ * </ul>
+ *
+ * <p>If the analysis succeeds, its result is printed to this class' logger. An
+ * {@link InvalidTypesException} or {@link CodeAnalyzerException} raised during the analysis is
+ * logged at debug level and otherwise ignored.
+ */
+public final class UdfOperatorUtils {
+
+	private static final Logger LOG = LoggerFactory.getLogger(UdfOperatorUtils.class);
+
+	/** Static utility class; not meant to be instantiated. */
+	private UdfOperatorUtils() {}
+
+	/**
+	 * Analyzes the UDF of a single-input operator.
+	 *
+	 * @param operator the operator that executes the UDF
+	 * @param udfBaseClass the function interface implemented by the UDF, e.g. {@code MapFunction.class}
+	 * @param defaultName the operator's default name, used to label the analysis if the operator has
+	 *                    no explicit name
+	 * @param udf the user-defined function to analyze
+	 * @param key the keys of the input, or {@code null} if there are none
+	 */
+	public static void analyzeSingleInputUdf(SingleInputUdfOperator<?, ?, ?> operator, Class<?> udfBaseClass,
+			String defaultName, Function udf, Keys<?> key) {
+		final CodeAnalysisMode mode = operator.getExecutionEnvironment().getConfig().getCodeAnalysisMode();
+		if (!isAnalysisEnabled(mode, udf)) {
+			return;
+		}
+		final String operatorName = getOperatorName(operator.getName(), udfBaseClass, defaultName);
+		try {
+			final UdfAnalyzer analyzer = new UdfAnalyzer(udfBaseClass, udf.getClass(), operatorName,
+					operator.getInputType(), null, operator.getResultType(), key, null,
+					mode == CodeAnalysisMode.OPTIMIZE);
+			if (!analyzer.analyze()) {
+				return;
+			}
+			// forwarded fields declared via annotations take precedence over the analysis result
+			if (mode == CodeAnalysisMode.OPTIMIZE
+					&& !operator.udfWithForwardedFieldsAnnotation(udf.getClass())) {
+				analyzer.addSemanticPropertiesHints();
+				operator.setSemanticProperties((SingleInputSemanticProperties) analyzer.getSemanticProperties());
+				// the flag lets user-declared forwarded fields replace the analyzed properties later
+				operator.setAnalyzedUdfSemanticsFlag();
+			}
+			else if (mode == CodeAnalysisMode.HINT) {
+				analyzer.addSemanticPropertiesHints();
+			}
+			analyzer.printToLogger(LOG);
+		}
+		catch (InvalidTypesException e) {
+			LOG.debug("Unable to do code analysis due to missing type information.", e);
+		}
+		catch (CodeAnalyzerException e) {
+			LOG.debug("Code analysis failed.", e);
+		}
+	}
+
+	/**
+	 * Analyzes the UDF of a two-input operator.
+	 *
+	 * @param operator the operator that executes the UDF
+	 * @param udfBaseClass the function interface implemented by the UDF, e.g. {@code JoinFunction.class}
+	 * @param defaultName the operator's default name, used to label the analysis if the operator has
+	 *                    no explicit name
+	 * @param udf the user-defined function to analyze
+	 * @param key1 the keys of the first input, or {@code null} if there are none
+	 * @param key2 the keys of the second input, or {@code null} if there are none
+	 */
+	public static void analyzeDualInputUdf(TwoInputUdfOperator<?, ?, ?, ?> operator, Class<?> udfBaseClass,
+			String defaultName, Function udf, Keys<?> key1, Keys<?> key2) {
+		final CodeAnalysisMode mode = operator.getExecutionEnvironment().getConfig().getCodeAnalysisMode();
+		if (!isAnalysisEnabled(mode, udf)) {
+			return;
+		}
+		final String operatorName = getOperatorName(operator.getName(), udfBaseClass, defaultName);
+		try {
+			final UdfAnalyzer analyzer = new UdfAnalyzer(udfBaseClass, udf.getClass(), operatorName,
+					operator.getInput1Type(), operator.getInput2Type(), operator.getResultType(), key1, key2,
+					mode == CodeAnalysisMode.OPTIMIZE);
+			if (!analyzer.analyze()) {
+				return;
+			}
+			// forwarded fields declared via annotations take precedence over the analysis result
+			if (mode == CodeAnalysisMode.OPTIMIZE
+					&& !(operator.udfWithForwardedFieldsFirstAnnotation(udf.getClass())
+						|| operator.udfWithForwardedFieldsSecondAnnotation(udf.getClass()))) {
+				analyzer.addSemanticPropertiesHints();
+				operator.setSemanticProperties((DualInputSemanticProperties) analyzer.getSemanticProperties());
+				// the flag lets user-declared forwarded fields replace the analyzed properties later
+				operator.setAnalyzedUdfSemanticsFlag();
+			}
+			else if (mode == CodeAnalysisMode.HINT) {
+				analyzer.addSemanticPropertiesHints();
+			}
+			analyzer.printToLogger(LOG);
+		}
+		catch (InvalidTypesException e) {
+			LOG.debug("Unable to do code analysis due to missing type information.", e);
+		}
+		catch (CodeAnalyzerException e) {
+			LOG.debug("Code analysis failed.", e);
+		}
+	}
+
+	/** Returns whether the UDF is analyzed at all under the given mode. */
+	private static boolean isAnalysisEnabled(CodeAnalysisMode mode, Function udf) {
+		return mode != CodeAnalysisMode.DISABLE
+				&& !udf.getClass().isAnnotationPresent(FunctionAnnotation.SkipCodeAnalysis.class);
+	}
+
+	/** Returns the explicit operator name if set, otherwise {@code "<UdfInterface> at <defaultName>"}. */
+	private static String getOperatorName(String explicitName, Class<?> udfBaseClass, String defaultName) {
+		return explicitName != null ? explicitName : udfBaseClass.getSimpleName() + " at " + defaultName;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeAnalyzerException.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeAnalyzerException.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeAnalyzerException.java
new file mode 100644
index 0000000..e42ae67
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeAnalyzerException.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.sca;
+
+/**
+ * Signals that the static code analysis of a user-defined function could not be carried out
+ * properly. This indicates a failure of the analysis itself, not an error in the analyzed code.
+ */
+public class CodeAnalyzerException extends RuntimeException {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Creates an exception with neither a detail message nor a cause. */
+	public CodeAnalyzerException() {
+	}
+
+	/** Creates an exception with the given detail message. */
+	public CodeAnalyzerException(String message) {
+		super(message);
+	}
+
+	/** Creates an exception with the given cause. */
+	public CodeAnalyzerException(Throwable cause) {
+		super(cause);
+	}
+
+	/** Creates an exception with the given detail message and cause. */
+	public CodeAnalyzerException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeErrorException.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeErrorException.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeErrorException.java
new file mode 100644
index 0000000..9afe5d8
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeErrorException.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.sca;
+
+/**
+ * Signals that the static code analysis has found an error in the analyzed user code, as opposed
+ * to {@link CodeAnalyzerException}, which signals that the analysis itself could not run properly.
+ */
+public class CodeErrorException extends RuntimeException {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Creates an exception with neither a detail message nor a cause. */
+	public CodeErrorException() {
+	}
+
+	/** Creates an exception with the given detail message. */
+	public CodeErrorException(String message) {
+		super(message);
+	}
+
+	/** Creates an exception with the given cause. */
+	public CodeErrorException(Throwable cause) {
+		super(cause);
+	}
+
+	/** Creates an exception with the given detail message and cause. */
+	public CodeErrorException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMAnalyzer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMAnalyzer.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMAnalyzer.java
new file mode 100644
index 0000000..4c0d020
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMAnalyzer.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.sca;
+
+import org.objectweb.asm.tree.AbstractInsnNode;
+import org.objectweb.asm.tree.InsnList;
+import org.objectweb.asm.tree.JumpInsnNode;
+import org.objectweb.asm.tree.analysis.Analyzer;
+import org.objectweb.asm.tree.analysis.Frame;
+import org.objectweb.asm.tree.analysis.Interpreter;
+
+import java.lang.reflect.Field;
+
+/**
+ * Modified version of ASMs Analyzer. It defines a custom ASM Frame
+ * and allows jump modification which is necessary for UDFs with
+ * one iterable input e.g. GroupReduce.
+ * (see also UdfAnalyzer's field "iteratorTrueAssumptionApplied")
+ */
+public class ModifiedASMAnalyzer extends Analyzer {
+
+ private NestedMethodAnalyzer interpreter;
+
+ public ModifiedASMAnalyzer(Interpreter interpreter) {
+ super(interpreter);
+ this.interpreter = (NestedMethodAnalyzer) interpreter;
+ }
+
+ protected Frame newFrame(int nLocals, int nStack) {
+ return new ModifiedASMFrame(nLocals, nStack);
+ }
+
+ protected Frame newFrame(Frame src) {
+ return new ModifiedASMFrame(src);
+ }
+
+ // type of jump modification
+ private int jumpModification = NO_MOD;
+ private static final int NO_MOD = -1;
+ private static final int IFEQ_MOD = 0;
+ private static final int IFNE_MOD = 1;
+ private int eventInsn;
+
+ // current state of modification
+ private int jumpModificationState = DO_NOTHING;
+ private static final int DO_NOTHING = -1;
+ private static final int PRE_STATE = 0;
+ private static final int MOD_STATE = 1;
+ private static final int WAIT_FOR_INSN_STATE = 2;
+
+ public void requestIFEQLoopModification() {
+ if (jumpModificationState != DO_NOTHING) {
+ throw new CodeAnalyzerException("Unable to do jump modifications (unsupported nested jumping).");
+ }
+ jumpModification = IFEQ_MOD;
+ jumpModificationState = PRE_STATE;
+ }
+
+ public void requestIFNELoopModification() {
+ if (jumpModificationState != DO_NOTHING) {
+ throw new CodeAnalyzerException("Unable to do jump modifications (unsupported nested jumping).");
+ }
+ jumpModification = IFNE_MOD;
+ jumpModificationState = PRE_STATE;
+ }
+
+ @Override
+ protected void newControlFlowEdge(int insn, int successor) {
+ try {
+ if (jumpModificationState == PRE_STATE) {
+ jumpModificationState = MOD_STATE;
+ }
+ else if (jumpModificationState == MOD_STATE) {
+ // this modification swaps the top 2 values on the queue stack
+ // it ensures that the "TRUE" path will be executed first, which is important
+ // for a later merge
+ if (jumpModification == IFEQ_MOD) {
+ final int top = accessField(Analyzer.class, "top").getInt(this);
+ final int[] queue = (int[]) accessField(Analyzer.class, "queue").get(this);
+
+ final int tmp = queue[top - 2];
+ queue[top - 2] = queue[top - 1];
+ queue[top - 1] = tmp;
+
+ eventInsn = queue[top - 2] - 1;
+ final InsnList insns = (InsnList) accessField(Analyzer.class, "insns").get(this);
+ // check if instruction is a goto instruction
+ // if yes this is loop structure
+ if (insns.get(eventInsn) instanceof JumpInsnNode) {
+ jumpModificationState = WAIT_FOR_INSN_STATE;
+ }
+ // no loop -> end of modification
+ else {
+ jumpModificationState = DO_NOTHING;
+ }
+ }
+ // this modification changes the merge priority of certain frames (the expression part of the IF)
+ else if (jumpModification == IFNE_MOD) {
+ final Frame[] frames = (Frame[]) accessField(Analyzer.class, "frames").get(this);
+ final Field indexField = accessField(AbstractInsnNode.class, "index");
+
+ final InsnList insns = (InsnList) accessField(Analyzer.class, "insns").get(this);
+ final AbstractInsnNode gotoInsnn = insns.get(successor - 1);
+ // check for a loop
+ if (gotoInsnn instanceof JumpInsnNode) {
+ jumpModificationState = WAIT_FOR_INSN_STATE;
+ // sets a merge priority for all instructions (the expression of the IF)
+ // from the label the goto instruction points to until the evaluation with IFEQ
+ final int idx = indexField.getInt(accessField(JumpInsnNode.class, "label").get(gotoInsnn));
+
+ for (int i=idx; i <= insn; i++) {
+ ((ModifiedASMFrame) frames[i]).mergePriority = true;
+ }
+ eventInsn = idx - 2;
+ }
+ else {
+ jumpModificationState = DO_NOTHING;
+ }
+ }
+ }
+ // wait for the goto instruction
+ else if (jumpModificationState == WAIT_FOR_INSN_STATE && insn == eventInsn) {
+ jumpModificationState = DO_NOTHING;
+ final Frame[] frames = (Frame[]) accessField(Analyzer.class, "frames").get(this);
+ // merge the goto instruction frame with the frame that follows
+ // this ensures that local variables are kept
+ if (jumpModification == IFEQ_MOD) {
+ interpreter.rightMergePriority = true;
+ final Field top = accessField(Frame.class, "top");
+ top.setInt(frames[eventInsn], top.getInt(frames[eventInsn + 1]));
+ frames[eventInsn + 1].merge(frames[eventInsn], interpreter);
+ }
+ // finally set a merge priority for the last instruction of the loop (before the IF expression)
+ else if (jumpModification == IFNE_MOD) {
+ ((ModifiedASMFrame) frames[eventInsn + 1]).mergePriority = true;
+ }
+ }
+ }
+ catch (Exception e) {
+ throw new CodeAnalyzerException("Unable to do jump modifications.", e);
+ }
+ }
+
+ private Field accessField(Class<?> clazz, String name) {
+ for (Field f : clazz.getDeclaredFields()) {
+ if (f.getName().equals(name)) {
+ f.setAccessible(true);
+ return f;
+ }
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMFrame.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMFrame.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMFrame.java
new file mode 100644
index 0000000..497a15c
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMFrame.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.sca;
+
+import org.objectweb.asm.tree.AbstractInsnNode;
+import org.objectweb.asm.tree.analysis.AnalyzerException;
+import org.objectweb.asm.tree.analysis.Frame;
+import org.objectweb.asm.tree.analysis.Interpreter;
+import org.objectweb.asm.tree.analysis.Value;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+
+/**
+ * Modified version of ASM's {@link Frame}. It allows different merge priorities and passes
+ * frame information to the {@link Interpreter}, which must be a {@link NestedMethodAnalyzer}.
+ */
+public class ModifiedASMFrame extends Frame {
+
+	// if set on the frame passed to merge(), that merge runs with the interpreter's
+	// rightMergePriority enabled; the flag is cleared once the merge is done
+	public boolean mergePriority;
+
+	public ModifiedASMFrame(int nLocals, int nStack) {
+		super(nLocals, nStack);
+	}
+
+	public ModifiedASMFrame(Frame src) {
+		super(src);
+	}
+
+	/** Copies the state of the given frame, including its merge priority. */
+	@Override
+	public Frame init(Frame src) {
+		mergePriority = ((ModifiedASMFrame) src).mergePriority;
+		return super.init(src);
+	}
+
+	/** Registers this frame as the interpreter's current frame, then executes the instruction. */
+	@Override
+	public void execute(AbstractInsnNode insn, Interpreter interpreter)
+			throws AnalyzerException {
+		((NestedMethodAnalyzer) interpreter).currentFrame = this;
+		super.execute(insn, interpreter);
+	}
+
+	/**
+	 * Merges the given frame into this frame. If the given frame has a merge priority, the
+	 * interpreter's right merge priority is enabled for this merge. Both flags are cleared
+	 * afterwards, also if the merge fails with an exception.
+	 */
+	@Override
+	public boolean merge(Frame frame, Interpreter interpreter) throws AnalyzerException {
+		final ModifiedASMFrame other = (ModifiedASMFrame) frame;
+		final NestedMethodAnalyzer nma = (NestedMethodAnalyzer) interpreter;
+		if (other.mergePriority) {
+			nma.rightMergePriority = true;
+		}
+		try {
+			return super.merge(frame, interpreter);
+		}
+		finally {
+			nma.rightMergePriority = false;
+			other.mergePriority = false;
+		}
+	}
+
+	/**
+	 * For debugging: returns the contents of {@link Frame}'s internal {@code values} array, or
+	 * {@link Frame#toString()} if that array cannot be read reflectively.
+	 */
+	@Override
+	public String toString() {
+		try {
+			final Field valuesField = Frame.class.getDeclaredField("values");
+			valuesField.setAccessible(true);
+			return Arrays.toString((Value[]) valuesField.get(this));
+		}
+		catch (Exception e) {
+			// toString() must not throw; fall back to the default representation
+			return super.toString();
+		}
+	}
+}
\ No newline at end of file