You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/06/08 07:33:50 UTC
[1/3] flink git commit: [FLINK-1319] [core] Add static code analysis
for user code
Repository: flink
Updated Branches:
refs/heads/master d433ba9f0 -> c854d5260
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
new file mode 100644
index 0000000..a1d2b97
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
@@ -0,0 +1,707 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.sca;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.api.java.sca.UdfAnalyzerTest.compareAnalyzerResultWithAnnotationsDualInput;
+import static org.apache.flink.api.java.sca.UdfAnalyzerTest.compareAnalyzerResultWithAnnotationsDualInputWithKeys;
+import static org.apache.flink.api.java.sca.UdfAnalyzerTest.compareAnalyzerResultWithAnnotationsSingleInput;
+import static org.apache.flink.api.java.sca.UdfAnalyzerTest.compareAnalyzerResultWithAnnotationsSingleInputWithKeys;
+
+/**
+ * This class contains some more advanced tests based on "real-world" examples.
+ * The examples are not complete copies; they have been modified in places to reduce
+ * dependencies on other classes.
+ */
+@SuppressWarnings("serial")
+public class UdfAnalyzerExamplesTest {
+
+ // --------------------------------------------------------------------------------------------
+ // EnumTriangles
+ // --------------------------------------------------------------------------------------------
+
+ // An edge stored as a 2-tuple of vertex ids; V1 and V2 are the tuple positions of the two vertices.
+ // Several UDFs below call these helper methods (e.g. TupleEdgeConverter -> copyVerticesFromTuple2),
+ // so the analyzer presumably has to follow these calls when it inspects those UDFs.
+ public static class Edge extends Tuple2<Integer, Integer> {
+ private static final long serialVersionUID = 1L;
+
+ public static final int V1 = 0;
+ public static final int V2 = 1;
+
+ public Edge() {}
+
+ public Edge(final Integer v1, final Integer v2) {
+ this.setFirstVertex(v1);
+ this.setSecondVertex(v2);
+ }
+
+ public Integer getFirstVertex() { return this.getField(V1); }
+
+ public Integer getSecondVertex() { return this.getField(V2); }
+
+ public void setFirstVertex(final Integer vertex1) { this.setField(vertex1, V1); }
+
+ public void setSecondVertex(final Integer vertex2) { this.setField(vertex2, V2); }
+
+ // Copies t.f0 into position V1 and t.f1 into position V2 (same positions as in t).
+ public void copyVerticesFromTuple2(Tuple2<Integer, Integer> t) {
+ this.setFirstVertex(t.f0);
+ this.setSecondVertex(t.f1);
+ }
+
+ // Swaps the two vertices of this edge in place.
+ public void flipVertices() {
+ Integer tmp = this.getFirstVertex();
+ this.setFirstVertex(this.getSecondVertex());
+ this.setSecondVertex(tmp);
+ }
+ }
+
+ // Three vertex ids stored as a 3-tuple; V1..V3 are the tuple positions. Only setters are provided,
+ // which is all TriadBuilder needs.
+ public static class Triad extends Tuple3<Integer, Integer, Integer> {
+ private static final long serialVersionUID = 1L;
+
+ public static final int V1 = 0;
+ public static final int V2 = 1;
+ public static final int V3 = 2;
+
+ public Triad() {}
+
+ public void setFirstVertex(final Integer vertex1) { this.setField(vertex1, V1); }
+
+ public void setSecondVertex(final Integer vertex2) { this.setField(vertex2, V2); }
+
+ public void setThirdVertex(final Integer vertex3) { this.setField(vertex3, V3); }
+ }
+
+ // Builds triads from a group of edges: one triad per pair of second vertices seen in the group.
+ // The test groups on field 0, so all edges of a group share their first vertex, which is copied into
+ // the output's field 0 -- hence @ForwardedFields("0").
+ @ForwardedFields("0")
+ private static class TriadBuilder implements GroupReduceFunction<Edge, Triad> {
+ // Reused across reduce() calls: second vertices read so far in the current group.
+ private final List<Integer> vertices = new ArrayList<Integer>();
+ // Reused output object; it is mutated and collected once per emitted triad.
+ private final Triad outTriad = new Triad();
+
+ @Override
+ public void reduce(Iterable<Edge> edgesIter, Collector<Triad> out) throws Exception {
+
+ final Iterator<Edge> edges = edgesIter.iterator();
+
+ // clear vertex list
+ vertices.clear();
+
+ // read first edge (next() is called unconditionally, so the group must be non-empty)
+ Edge firstEdge = edges.next();
+ outTriad.setFirstVertex(firstEdge.getFirstVertex());
+
+ vertices.add(firstEdge.getSecondVertex());
+
+ // build and emit triads
+ while (edges.hasNext()) {
+ Integer higherVertexId = edges.next().getSecondVertex();
+
+ // combine vertex with all previously read vertices
+ for (Integer lowerVertexId : vertices) {
+ outTriad.setSecondVertex(lowerVertexId);
+ outTriad.setThirdVertex(higherVertexId);
+ out.collect(outTriad);
+ }
+ vertices.add(higherVertexId);
+ }
+ }
+ }
+
+ @Test
+ public void testEnumTrianglesBasicExamplesTriadBuilder() {
+ // TriadBuilder is analyzed as a group reduce keyed on field 0.
+ final String[] groupKeys = { "0" };
+ final String edgeType = "Tuple2<Integer, Integer>";
+ final String triadType = "Tuple3<Integer, Integer, Integer>";
+ compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, TriadBuilder.class,
+ edgeType, triadType, groupKeys);
+ }
+
+ // Converts a plain 2-tuple into an Edge via Edge.copyVerticesFromTuple2(), which copies t.f0 and t.f1
+ // into the same positions -- hence "0;1". The copying happens inside the called method, so this case
+ // presumably checks that the analyzer follows method calls.
+ @ForwardedFields("0;1")
+ public static class TupleEdgeConverter implements MapFunction<Tuple2<Integer, Integer>, Edge> {
+ // Reused output object, overwritten on every call.
+ private final Edge outEdge = new Edge();
+
+ @Override
+ public Edge map(Tuple2<Integer, Integer> t) throws Exception {
+ outEdge.copyVerticesFromTuple2(t);
+ return outEdge;
+ }
+ }
+
+ @Test
+ public void testEnumTrianglesBasicExamplesTupleEdgeConverter() {
+ // Input tuple and output Edge share the same 2-tuple layout.
+ final String edgeType = "Tuple2<Integer, Integer>";
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, TupleEdgeConverter.class,
+ edgeType, edgeType);
+ }
+
+ // Emits each edge twice: once as read, then with its vertices swapped. No forwarded-fields annotation:
+ // the second emitted record has f0 and f1 exchanged.
+ // NOTE(review): the same object is flipped in place after the first collect(); this assumes the
+ // collector has already consumed/copied the first record -- confirm for the runtime in use.
+ private static class EdgeDuplicator implements FlatMapFunction<Edge, Edge> {
+ @Override
+ public void flatMap(Edge edge, Collector<Edge> out) throws Exception {
+ out.collect(edge);
+ edge.flipVertices();
+ out.collect(edge);
+ }
+ }
+
+ @Test
+ public void testEnumTrianglesOptExamplesEdgeDuplicator() {
+ // Edges in, edges out: both sides use the same 2-tuple layout.
+ final String edgeType = "Tuple2<Integer, Integer>";
+ compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, EdgeDuplicator.class,
+ edgeType, edgeType);
+ }
+
+ // Emits every distinct neighbour of a group's vertex once, as an edge whose smaller vertex id comes
+ // first. The test groups on field 0, so all edges of a group share their first vertex. No
+ // forwarded-fields annotation: the group vertex ends up in field 0 or field 1 depending on the
+ // comparison below.
+ private static class DegreeCounter implements GroupReduceFunction<Edge, Edge> {
+ // Reused across reduce() calls; cleared at the start of each group.
+ final ArrayList<Integer> otherVertices = new ArrayList<Integer>();
+ // Reused output object; mutated and collected once per emitted edge.
+ final Edge outputEdge = new Edge();
+
+ @Override
+ public void reduce(Iterable<Edge> edgesIter, Collector<Edge> out) {
+
+ Iterator<Edge> edges = edgesIter.iterator();
+ otherVertices.clear();
+
+ // get first edge (next() is called unconditionally, so the group must be non-empty)
+ Edge edge = edges.next();
+ Integer groupVertex = edge.getFirstVertex();
+ this.otherVertices.add(edge.getSecondVertex());
+
+ // get all other edges (assumes edges are sorted by second vertex)
+ while (edges.hasNext()) {
+ edge = edges.next();
+ Integer otherVertex = edge.getSecondVertex();
+ // collect unique vertices; the ids are boxed Integers, so compare them by value --
+ // "!=" would compare references and fail for ids outside the Integer cache
+ if(!otherVertices.contains(otherVertex) && !otherVertex.equals(groupVertex)) {
+ this.otherVertices.add(otherVertex);
+ }
+ }
+
+ // emit edges, smaller vertex id first
+ for(Integer otherVertex : this.otherVertices) {
+ if(groupVertex < otherVertex) {
+ outputEdge.setFirstVertex(groupVertex);
+ outputEdge.setSecondVertex(otherVertex);
+ } else {
+ outputEdge.setFirstVertex(otherVertex);
+ outputEdge.setSecondVertex(groupVertex);
+ }
+ out.collect(outputEdge);
+ }
+ }
+ }
+
+ @Test
+ public void testEnumTrianglesOptExamplesDegreeCounter() {
+ // DegreeCounter is analyzed as a group reduce keyed on field 0.
+ final String[] groupKeys = { "0" };
+ final String edgeType = "Tuple2<Integer, Integer>";
+ compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, DegreeCounter.class,
+ edgeType, edgeType, groupKeys);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // KMeans
+ // --------------------------------------------------------------------------------------------
+
+ // Mutable 2-D point used by the KMeans fixtures; the tests describe it as Point<x=double,y=double>.
+ // add() and div() modify this instance and return it, so callers can chain or pass the result along.
+ public static class Point implements Serializable {
+ public double x, y;
+
+ public Point() {}
+
+ public Point(double x, double y) {
+ this.x = x;
+ this.y = y;
+ }
+
+ // Adds other's coordinates to this point in place.
+ public Point add(Point other) {
+ x += other.x;
+ y += other.y;
+ return this;
+ }
+
+ // Divides both coordinates by val in place.
+ public Point div(long val) {
+ x /= val;
+ y /= val;
+ return this;
+ }
+
+ public void clear() {
+ x = y = 0.0;
+ }
+
+ @Override
+ public String toString() {
+ return x + " " + y;
+ }
+ }
+
+ // A Point with an integer id; the KMeans test describes it as Centroid<x=double,y=double,id=int>.
+ public static class Centroid extends Point {
+ public int id;
+
+ public Centroid() {}
+
+ public Centroid(int id, double x, double y) {
+ super(x,y);
+ this.id = id;
+ }
+
+ // Copies p's coordinates (p itself is not retained).
+ public Centroid(int id, Point p) {
+ super(p.x, p.y);
+ this.id = id;
+ }
+
+ @Override
+ public String toString() {
+ return id + " " + super.toString();
+ }
+ }
+
+ // Combines two partial sums of the same centroid: keeps val1's id (field 0, the reduce key in the
+ // test), adds the points and adds the counts. Note that val1.f1.add(...) modifies val1's Point in place.
+ @ForwardedFields("0")
+ public static final class CentroidAccumulator implements ReduceFunction<Tuple3<Integer, Point, Long>> {
+ @Override
+ public Tuple3<Integer, Point, Long> reduce(Tuple3<Integer, Point, Long> val1, Tuple3<Integer, Point, Long> val2) {
+ return new Tuple3<Integer, Point, Long>(val1.f0, val1.f1.add(val2.f1), val1.f2 + val2.f2);
+ }
+ }
+
+ @Test
+ public void testKMeansExamplesCentroidAccumulator() {
+ // A reduce keeps its record type: (centroid id, summed point, count) on both sides.
+ final String[] reduceKeys = { "0" };
+ final String sumType = "Tuple3<Integer, org.apache.flink.api.java.sca.UdfAnalyzerExamplesTest$Point<x=double,y=double>, Long>";
+ compareAnalyzerResultWithAnnotationsSingleInputWithKeys(ReduceFunction.class, CentroidAccumulator.class,
+ sumType, sumType, reduceKeys);
+ }
+
+ // Divides the summed point by the count and wraps it in a Centroid carrying the tuple's id, so
+ // field 0 ends up in Centroid.id ("0->id"). value.f1.div(...) modifies the input's Point in place.
+ @ForwardedFields("0->id")
+ public static final class CentroidAverager implements MapFunction<Tuple3<Integer, Point, Long>, Centroid> {
+ @Override
+ public Centroid map(Tuple3<Integer, Point, Long> value) {
+ return new Centroid(value.f0, value.f1.div(value.f2));
+ }
+ }
+
+ @Test
+ public void testKMeansExamplesCentroidAverager() {
+ // (id, summed point, count) is mapped to a Centroid POJO.
+ final String sumType = "Tuple3<Integer, org.apache.flink.api.java.sca.UdfAnalyzerExamplesTest$Point<x=double,y=double>, Long>";
+ final String centroidType = "org.apache.flink.api.java.sca.UdfAnalyzerExamplesTest$Centroid<x=double,y=double,id=int>";
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, CentroidAverager.class,
+ sumType, centroidType);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // ConnectedComponents
+ // --------------------------------------------------------------------------------------------
+
+ // Emits each edge in both directions: the input unchanged, then a swapped copy held in a reused tuple.
+ // No forwarded-fields annotation: in the second emitted record f0 and f1 are exchanged.
+ public static final class UndirectEdge implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();
+
+ @Override
+ public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
+ invertedEdge.f0 = edge.f1;
+ invertedEdge.f1 = edge.f0;
+ out.collect(edge);
+ out.collect(invertedEdge);
+ }
+ }
+
+ @Test
+ public void testConnectedComponentsExamplesUndirectEdge() {
+ // Edge tuples in, edge tuples out.
+ final String edgeType = "Tuple2<Long, Long>";
+ compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, UndirectEdge.class,
+ edgeType, edgeType);
+ }
+
+ // Emits the candidate (first input) unchanged when its component id (f1) is smaller than the old one.
+ // Whatever is emitted is the first input as a whole, hence "*" for the first input.
+ @ForwardedFieldsFirst("*")
+ public static final class ComponentIdFilter implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ @Override
+ public void join(Tuple2<Long, Long> candidate, Tuple2<Long, Long> old, Collector<Tuple2<Long, Long>> out) {
+ if (candidate.f1 < old.f1) {
+ out.collect(candidate);
+ }
+ }
+ }
+
+ @Test
+ public void testConnectedComponentsExamplesComponentIdFilter() {
+ // Both join inputs and the output are (vertex, component) tuples.
+ final String vertexType = "Tuple2<Long, Long>";
+ compareAnalyzerResultWithAnnotationsDualInput(FlatJoinFunction.class, ComponentIdFilter.class,
+ vertexType, vertexType, vertexType);
+ }
+
+ // Emits (vertex, vertex): the whole input is copied into both output fields ("*->f0;*->f1").
+ @ForwardedFields("*->f0;*->f1")
+ public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {
+ @Override
+ public Tuple2<T, T> map(T vertex) {
+ return new Tuple2<T, T>(vertex, vertex);
+ }
+ }
+
+ @Test
+ public void testConnectedComponentsExamplesDuplicateValue() {
+ // The generic DuplicateValue is analyzed with T = Long.
+ final String vertexType = "Long";
+ final String pairType = "Tuple2<Long, Long>";
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, DuplicateValue.class,
+ vertexType, pairType);
+ }
+
+ // Builds (edge.f1, vertexWithComponent.f1): the first input's f1 stays in f1, and the second input's
+ // f1 moves to f0.
+ @ForwardedFieldsFirst("f1->f1")
+ @ForwardedFieldsSecond("f1->f0")
+ public static final class NeighborWithComponentIDJoin implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ @Override
+ public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
+ return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
+ }
+ }
+
+ @Test
+ public void testConnectedComponentsExamplesNeighborWithComponentIDJoin() {
+ // (vertex, component) joined with (source, target) edges yields (target, component).
+ final String longPairType = "Tuple2<Long, Long>";
+ compareAnalyzerResultWithAnnotationsDualInput(JoinFunction.class, NeighborWithComponentIDJoin.class,
+ longPairType, longPairType, longPairType);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // WebLogAnalysis
+ // --------------------------------------------------------------------------------------------
+
+ // Anti-join: emits all ranks records of a group only when the group has no visits record. Only the
+ // first input's grouping key (field 1, per the test's keys) is declared as forwarded.
+ @ForwardedFieldsFirst("f1")
+ public static class AntiJoinVisits implements CoGroupFunction<Tuple3<Integer, String, Integer>, Tuple1<String>, Tuple3<Integer, String, Integer>> {
+ @Override
+ public void coGroup(Iterable<Tuple3<Integer, String, Integer>> ranks, Iterable<Tuple1<String>> visits, Collector<Tuple3<Integer, String, Integer>> out) {
+ // Check if there is an entry in the visits relation
+ if (!visits.iterator().hasNext()) {
+ for (Tuple3<Integer, String, Integer> next : ranks) {
+ // Emit all rank pairs
+ out.collect(next);
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testWebLogAnalysisExamplesAntiJoinVisits() {
+ // Ranks are grouped on field 1 and visits on field 0.
+ final String rankType = "Tuple3<Integer, String, Integer>";
+ final String visitType = "Tuple1<String>";
+ final String[] rankKeys = { "1" };
+ final String[] visitKeys = { "0" };
+ compareAnalyzerResultWithAnnotationsDualInputWithKeys(CoGroupFunction.class, AntiJoinVisits.class,
+ rankType, visitType, rankType, rankKeys, visitKeys);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // PageRankBasic
+ // --------------------------------------------------------------------------------------------
+
+ // Collects the f1 values of a group into an array and emits (f0, array). f0 is the grouping key in
+ // the test, hence "0". For an empty group the id would stay 0L and the array would be empty.
+ @ForwardedFields("0")
+ public static class BuildOutgoingEdgeList implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> {
+ // Reused across reduce() calls; cleared at the start of each group.
+ private final ArrayList<Long> neighbors = new ArrayList<Long>();
+
+ @Override
+ public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long[]>> out) {
+ neighbors.clear();
+ Long id = 0L;
+
+ for (Tuple2<Long, Long> n : values) {
+ id = n.f0;
+ neighbors.add(n.f1);
+ }
+ out.collect(new Tuple2<Long, Long[]>(id, neighbors.toArray(new Long[neighbors.size()])));
+ }
+ }
+
+ @Test
+ public void testPageRankBasicExamplesBuildOutgoingEdgeList() {
+ // Edges grouped on the source vertex become (source, targets[]).
+ final String[] groupKeys = { "0" };
+ final String edgeType = "Tuple2<Long, Long>";
+ final String adjacencyType = "Tuple2<Long, Long[]>";
+ compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, BuildOutgoingEdgeList.class,
+ edgeType, adjacencyType, groupKeys);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // LogisticRegression
+ // --------------------------------------------------------------------------------------------
+
+ // Single-field tuple wrapping a double[] of components, with index-based accessors.
+ public static class Vector extends Tuple1<double[]> {
+ public Vector() {
+ // default constructor needed for instantiation during serialization
+ }
+
+ // Allocates a zero-filled component array of the given size (the loop repeats Java's default 0.0).
+ public Vector(int size) {
+ double[] components = new double[size];
+ for (int i = 0; i < size; i++) {
+ components[i] = 0.0;
+ }
+ setComponents(components);
+ }
+
+ public double[] getComponents() {
+ return this.f0;
+ }
+
+ public double getComponent(int i) {
+ return this.f0[i];
+ }
+
+ // Writes into the wrapped array in place.
+ public void setComponent(int i, double value) {
+ this.f0[i] = value;
+ }
+
+ // Stores the given array itself (no copy).
+ public void setComponents(double[] components) {
+ this.f0 = components;
+ }
+ }
+
+ // A Vector used as a gradient; it only adds constructors.
+ public static class Gradient extends Vector {
+ public Gradient() {
+ // default constructor needed for instantiation during serialization
+ }
+
+ public Gradient(int size) {
+ super(size);
+ }
+ }
+
+ // A 2-tuple of (label, features), with named accessors for both fields.
+ public static class PointWithLabel extends Tuple2<Integer, double[]> {
+ public double[] getFeatures() {
+ return this.f1;
+ }
+
+ public double getFeature(int i) {
+ return this.f1[i];
+ }
+
+ public void setFeatures(double[] features) {
+ this.f1 = features;
+ }
+
+ public Integer getLabel() {
+ return this.f0;
+ }
+
+ public void setLabel(Integer label) {
+ this.f0 = label;
+ }
+ }
+
+ // Adds gradient2 element-wise into gradient1 and returns gradient1, modified in place. Assumes
+ // gradient2 has at least as many components as gradient1. No forwarded-fields annotation: the
+ // contents of f0's array change even though the same array object is returned.
+ public static class SumGradient implements ReduceFunction<Gradient> {
+ @Override
+ public Gradient reduce(Gradient gradient1, Gradient gradient2) throws Exception {
+ // gradient1(i) += gradient2(i)
+ for (int i = 0; i < gradient1.getComponents().length; i++) {
+ gradient1.setComponent(i, gradient1.getComponent(i) + gradient2.getComponent(i));
+ }
+
+ return gradient1;
+ }
+ }
+
+ @Test
+ public void testLogisticRegressionExamplesSumGradient() {
+ // Gradients are described as single-field tuples of a double array.
+ final String[] reduceKeys = { "0" };
+ final String gradientType = "Tuple1<double[]>";
+ compareAnalyzerResultWithAnnotationsSingleInputWithKeys(ReduceFunction.class, SumGradient.class,
+ gradientType, gradientType, reduceKeys);
+ }
+
+ // Parses a comma-separated line into a labelled point. The token at index 41 holds the label (first
+ // character of the trimmed token); every other non-blank token is parsed as a double feature, filling
+ // at most 42 feature slots. No forwarded-fields annotation: the input is a raw String.
+ public static class PointParser implements MapFunction<String, PointWithLabel> {
+ @Override
+ public PointWithLabel map(String value) throws Exception {
+ PointWithLabel p = new PointWithLabel();
+
+ String[] split = value.split(",");
+ double[] features = new double[42];
+
+ int a = 0;
+ for (int i = 0; i < split.length; i++) {
+
+ if (i == 42 - 1) {
+ p.setLabel(Integer.valueOf(split[i].trim().substring(0, 1)));
+ } else {
+ // Compare by content: "!=" on Strings tests reference identity, so blank tokens were not
+ // reliably skipped and reached Double.parseDouble("").
+ if (a < 42 && !split[i].trim().isEmpty()) {
+ features[a++] = Double.parseDouble(split[i].trim());
+ }
+ }
+ }
+
+ p.setFeatures(features);
+ return p;
+ }
+ }
+
+ @Test
+ public void testLogisticRegressionExamplesPointParser() {
+ // A text line is parsed into (label, features).
+ final String lineType = "String";
+ final String labelledPointType = "Tuple2<Integer, double[]>";
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, PointParser.class,
+ lineType, labelledPointType);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Canopy
+ // --------------------------------------------------------------------------------------------
+
+ // A 5-tuple of (docId, isCenter, isInSomeT2, canopyCenters, words), following the constructor's
+ // parameter names.
+ public static class Document extends Tuple5<Integer, Boolean, Boolean, String, String> {
+ public Document() {
+ // default constructor needed for instantiation during serialization
+ }
+
+ public Document(Integer docId, Boolean isCenter, Boolean isInSomeT2, String canopyCenters, String words) {
+ super(docId, isCenter, isInSomeT2, canopyCenters, words);
+ }
+
+ // Only the id is set; all other fields are null.
+ public Document(Integer docId) {
+ super(docId, null, null, null, null);
+ }
+ }
+
+ // Turns a space-separated "docId word ..." line into (docId, word). Lines with fewer than two tokens
+ // are dropped; a non-numeric first token makes Integer.valueOf throw NumberFormatException.
+ public static class MessageBOW implements FlatMapFunction<String, Tuple2<Integer, String>> {
+ @Override
+ public void flatMap(String value, Collector<Tuple2<Integer, String>> out) throws Exception {
+ String[] splits = value.split(" ");
+ if (splits.length < 2) {
+ return;
+ }
+ out.collect(new Tuple2<Integer, String>(Integer.valueOf(splits[0]), splits[1]));
+ }
+ }
+
+ @Test
+ public void testCanopyExamplesMassageBOW() {
+ // Analyze MessageBOW itself, a FlatMapFunction<String, Tuple2<Integer, String>>. The earlier version
+ // analyzed PointParser as a MapFunction by mistake, which did not match the declared output type.
+ compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, MessageBOW.class,
+ "String",
+ "Tuple2<Integer, String>");
+ }
+
+ // Joins the f1 strings of a group with "-" into a Document whose id is the group's f0 (the grouping
+ // key in the test, hence "0"); the remaining fields are constants. it.next() is called unconditionally,
+ // so the group must be non-empty.
+ @ForwardedFields("0")
+ public static class DocumentReducer implements GroupReduceFunction<Tuple2<Integer, String>, Document> {
+ @Override
+ public void reduce(Iterable<Tuple2<Integer, String>> values, Collector<Document> out) throws Exception {
+ Iterator<Tuple2<Integer, String>> it = values.iterator();
+ Tuple2<Integer, String> first = it.next();
+ Integer docId = first.f0;
+ StringBuilder builder = new StringBuilder(first.f1);
+ while (it.hasNext()) {
+ builder.append("-").append(it.next().f1);
+ }
+ out.collect(new Document(docId, false, false, "", builder.toString()));
+ }
+ }
+
+ @Test
+ public void testCanopyExamplesDocumentReducer() {
+ // (docId, word) records grouped on docId become one Document.
+ final String[] groupKeys = { "0" };
+ final String wordType = "Tuple2<Integer, String>";
+ final String documentType = "Tuple5<Integer, Boolean, Boolean, String, String>";
+ compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, DocumentReducer.class,
+ wordType, documentType, groupKeys);
+ }
+
+ // Marks a document relative to a canopy center by updating f1, f2 and f3 in place. Fields 0 and 4 are
+ // never written, hence "0;4".
+ // NOTE(review): the private field `center` is never assigned in this class, so it is always null and
+ // map() returns its input unchanged. `similarity` is the constant 42f, so `similarity > 42` is always
+ // false. Both presumably stand in for logic removed when the example was simplified.
+ @ForwardedFields("0;4")
+ public static class MapToCenter implements MapFunction<Document, Document> {
+ private Document center;
+
+ @Override
+ public Document map(Document value) throws Exception {
+ if (center != null) {
+ final float similarity = 42f;
+ final boolean isEqual = value.f0.equals(center.f0);
+ value.f1 = isEqual;
+ value.f2 = isEqual || similarity > 42;
+ if (!value.f3.contains(center.f0.toString() + ";") && (similarity > 42 || isEqual)) {
+ value.f3 += center.f0.toString() + ";";
+ }
+ }
+ return value;
+ }
+ }
+
+ @Test
+ public void testCanopyExamplesMapToCenter() {
+ // Documents in, documents out.
+ final String documentType = "Tuple5<Integer, Boolean, Boolean, String, String>";
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, MapToCenter.class,
+ documentType, documentType);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // K-Meanspp
+ // --------------------------------------------------------------------------------------------
+
+ // A document id plus a word -> frequency map. The K-Meanspp test describes it as
+ // DocumentWithFreq<id=Integer,wordFreq=java.util.HashMap>. The no-arg constructor sets id to -1.
+ public static class DocumentWithFreq implements Serializable {
+ private static final long serialVersionUID = -8646398807053061675L;
+
+ public Map<String, Double> wordFreq = new HashMap<String, Double>();
+
+ public Integer id;
+
+ public DocumentWithFreq() {
+ id = -1;
+ }
+
+ public DocumentWithFreq(Integer id) {
+ this.id = id;
+ }
+
+ @Override
+ public String toString() {
+ return Integer.toString(id);
+ }
+ }
+
+ // Builds one DocumentWithFreq per group. The id comes from the first record's f0 (the grouping key
+ // in the test, hence "0->id"), and each record contributes (f1 as string -> f2) to the word
+ // frequencies. Nothing is emitted for an empty group.
+ @ForwardedFields("0->id")
+ public static final class RecordToDocConverter implements GroupReduceFunction<Tuple3<Integer, Integer, Double>, DocumentWithFreq> {
+ private static final long serialVersionUID = -8476366121490468956L;
+
+ @Override
+ public void reduce(Iterable<Tuple3<Integer, Integer, Double>> values, Collector<DocumentWithFreq> out) throws Exception {
+ Iterator<Tuple3<Integer, Integer, Double>> it = values.iterator();
+ if (it.hasNext()) {
+ Tuple3<Integer, Integer, Double> elem = it.next();
+ DocumentWithFreq doc = new DocumentWithFreq(elem.f0);
+ doc.wordFreq.put(elem.f1.toString(), elem.f2);
+
+ while (it.hasNext()) {
+ elem = it.next();
+ doc.wordFreq.put(elem.f1.toString(), elem.f2);
+ }
+ out.collect(doc);
+ }
+ }
+ }
+
+ @Test
+ public void testKMeansppExamplesRecordToDocConverter() {
+ // (docId, wordId, frequency) records grouped on docId become a DocumentWithFreq POJO.
+ final String[] groupKeys = { "0" };
+ final String recordType = "Tuple3<Integer, Integer, Double>";
+ final String docType = "org.apache.flink.api.java.sca.UdfAnalyzerExamplesTest$DocumentWithFreq<id=Integer,wordFreq=java.util.HashMap>";
+ compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, RecordToDocConverter.class,
+ recordType, docType, groupKeys);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
new file mode 100644
index 0000000..8d8f801
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
@@ -0,0 +1,1353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.sca;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
+import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.annotation.Annotation;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+@SuppressWarnings("serial")
+public class UdfAnalyzerTest {
+
+ // Returns the input's f0 as the whole output record: "f0->*".
+ @ForwardedFields("f0->*")
+ public static class Map1 implements MapFunction<Tuple2<String, Integer>, String> {
+ public String map(Tuple2<String, Integer> value) throws Exception {
+ return value.f0;
+ }
+ }
+
+ @Test
+ public void testSingleFieldExtract() {
+ // Map1 reduces a (String, Integer) pair to its String component.
+ final String pairType = "Tuple2<String,Integer>";
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map1.class, pairType, "String");
+ }
+
+ // Copies the input's f0 into both fields of a new output tuple.
+ @ForwardedFields("f0->f0;f0->f1")
+ public static class Map2 implements MapFunction<Tuple2<String, Integer>, Tuple2<String, String>> {
+ public Tuple2<String, String> map(Tuple2<String, Integer> value) throws Exception {
+ return new Tuple2<String, String>(value.f0, value.f0);
+ }
+ }
+
+ @Test
+ public void testForwardIntoTuple() {
+ final String inputType = "Tuple2<String,Integer>";
+ final String outputType = "Tuple2<String,String>";
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map2.class, inputType, outputType);
+ }
+
+ // Returns only the array length. No input field is copied to the output, so no forwarded-fields
+ // annotation is declared.
+ public static class Map3 implements MapFunction<String[], Integer> {
+ @Override
+ public Integer map(String[] value) throws Exception {
+ return value.length;
+ }
+
+ }
+
+ @Test
+ public void testForwardWithArrayAttrAccess() {
+ // Map3 only reads the length of the input array.
+ final String arrayType = "String[]";
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map3.class,
+ arrayType, "Integer");
+ }
+
+ // Reads the public field field2 of MyPojo (declared elsewhere in this file). No annotation: the test
+ // describes MyPojo without fields, i.e. as a generic type (see the test name), so presumably no
+ // forwarded fields are expected.
+ public static class Map4 implements MapFunction<MyPojo, String> {
+ @Override
+ public String map(MyPojo value) throws Exception {
+ return value.field2;
+ }
+ }
+
+ @Test
+ public void testForwardWithGenericTypePublicAttrAccess() {
+ // MyPojo is described without its fields here, i.e. as a generic type.
+ final String genericPojoType = "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo";
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map4.class,
+ genericPojoType, "String");
+ }
+
+ // Same access as Map4, but the test describes MyPojo as a POJO with fields field and field2, so
+ // field2 is declared as forwarded into the whole output.
+ @ForwardedFields("field2->*")
+ public static class Map5 implements MapFunction<MyPojo, String> {
+ @Override
+ public String map(MyPojo value) throws Exception {
+ return value.field2;
+ }
+ }
+
+ @Test
+ public void testForwardWithPojoPublicAttrAccess() {
+ // MyPojo is described as a POJO with its two String fields.
+ final String pojoType = "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>";
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map5.class,
+ pojoType, "String");
+ }
+
+ // Reads MyPojo.field. Per the test name this is a private attribute (MyPojo is declared outside this
+ // view); it is reachable here because both classes are nested in UdfAnalyzerTest.
+ @ForwardedFields("field->*")
+ public static class Map6 implements MapFunction<MyPojo, String> {
+ @Override
+ public String map(MyPojo value) throws Exception {
+ return value.field;
+ }
+ }
+
+ @Test
+ public void testForwardWithPojoPrivateAttrAccess() {
+ // Same POJO description as in the public-attribute case, but Map6 reads `field`.
+ final String pojoType = "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>";
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map6.class,
+ pojoType, "String");
+ }
+
+ // Output f1 always receives input f0. Output f0 receives either input f0 or the literal "hello",
+ // depending on the branch. Only "f0->f1" therefore holds on every path.
+ @ForwardedFields("f0->f1")
+ public static class Map7 implements MapFunction<Tuple2<String, Integer>, Tuple2<String, String>> {
+ public Tuple2<String, String> map(Tuple2<String, Integer> value) throws Exception {
+ if (value.f0.equals("whatever")) {
+ return new Tuple2<String, String>(value.f0, value.f0);
+ } else {
+ return new Tuple2<String, String>("hello", value.f0);
+ }
+ }
+ }
+
+ @Test
+ public void testForwardIntoTupleWithCondition() {
+ final String inputType = "Tuple2<String,Integer>";
+ final String outputType = "Tuple2<String,String>";
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map7.class, inputType, outputType);
+ }
+
+ // Returns f0 or f1 depending on a runtime condition. Neither field is returned on every path, so
+ // nothing is declared as forwarded.
+ public static class Map8 implements MapFunction<Tuple2<String, String>, String> {
+ public String map(Tuple2<String, String> value) throws Exception {
+ if (value.f0.equals("whatever")) {
+ return value.f0;
+ } else {
+ return value.f1;
+ }
+ }
+ }
+
+ @Test
+ public void testSingleFieldExtractWithCondition() {
+ final String pairType = "Tuple2<String,String>";
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map8.class, pairType, "String");
+ }
+
+ // Writes the input into f0 of an instance-field tuple and returns it: "*->f0". The same tuple object
+ // is reused and returned on every call.
+ @ForwardedFields("*->f0")
+ public static class Map9 implements MapFunction<String, Tuple1<String>> {
+ private Tuple1<String> tuple = new Tuple1<String>();
+
+ public Tuple1<String> map(String value) throws Exception {
+ tuple.f0 = value;
+ return tuple;
+ }
+ }
+
+ @Test
+ public void testForwardIntoTupleWithInstanceVar() {
+ final String outputType = "Tuple1<String>";
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map9.class, "String", outputType);
+ }
+
+ // Writes the input into the nested field f0.f0 of an instance-field tuple: "*->f0.f0".
+ // NOTE(review): `tuple` is built with Tuple1's no-arg constructor, which presumably leaves tuple.f0
+ // null. Actually executing map() would then throw a NullPointerException. The class appears to serve
+ // only as input for the static analysis -- confirm before reusing it at runtime.
+ @ForwardedFields("*->f0.f0")
+ public static class Map10 implements MapFunction<String, Tuple1<Tuple1<String>>> {
+ private Tuple1<Tuple1<String>> tuple = new Tuple1<Tuple1<String>>();
+
+ public Tuple1<Tuple1<String>> map(String value) throws Exception {
+ tuple.f0.f0 = value;
+ return tuple;
+ }
+ }
+
+ @Test
+ public void testForwardIntoTupleWithInstanceVar2() {
+ // The output nests a Tuple1 inside a Tuple1.
+ final String nestedType = "Tuple1<Tuple1<String>>";
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map10.class, "String",
+ nestedType);
+ }
+
+ // Writes the input into f0 and f1 of an instance-field tuple, but modify() resets f0 to null in
+ // between. Only "*->f1" therefore holds for the returned tuple.
+ @ForwardedFields("*->f1")
+ public static class Map11 implements MapFunction<String, Tuple2<String, String>> {
+ private Tuple2<String, String> tuple = new Tuple2<String, String>();
+
+ public Tuple2<String, String> map(String value) throws Exception {
+ tuple.f0 = value;
+ modify();
+ tuple.f1 = value;
+ return tuple;
+ }
+
+ // Overwrites the f0 value that map() has just assigned.
+ private void modify() {
+ tuple.f0 = null;
+ }
+ }
+
+ @Test
+ public void testForwardIntoTupleWithInstanceVarChangedByOtherMethod() {
+ final String outputType = "Tuple2<String, String>";
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map11.class, "String",
+ outputType);
+ }
+
+ // Wraps the input's f0 in two new Tuple1s placed in output f0 and f1: "f0->f0.f0;f0->f1.f0".
+ @ForwardedFields("f0->f0.f0;f0->f1.f0")
+ public static class Map12 implements MapFunction<Tuple2<String, Integer>, Tuple2<Tuple1<String>, Tuple1<String>>> {
+ public Tuple2<Tuple1<String>, Tuple1<String>> map(Tuple2<String, Integer> value) throws Exception {
+ return new Tuple2<Tuple1<String>, Tuple1<String>>(new Tuple1<String>(value.f0), new Tuple1<String>(
+ value.f0));
+ }
+ }
+
+ @Test
+ public void testForwardIntoNestedTuple() {
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map12.class, "Tuple2<String,Integer>",
+ "Tuple2<Tuple1<String>,Tuple1<String>>");
+ }
+
+ // Fixture (analyzed as-is): builds a nested output holding input f0 twice through a wildcard-typed
+ // local, then clears t.f0 before returning.
+ // Expected result (the annotation): only f0 -> f1.f0 survives.
+ @ForwardedFields("f0->f1.f0")
+ public static class Map13 implements MapFunction<Tuple2<String, Integer>, Tuple2<Tuple1<String>, Tuple1<String>>> {
+ @SuppressWarnings("unchecked")
+ public Tuple2<Tuple1<String>, Tuple1<String>> map(Tuple2<String, Integer> value) throws Exception {
+ Tuple2<?, ?> t = new Tuple2<Tuple1<String>, Tuple1<String>>(new Tuple1<String>(value.f0),
+ new Tuple1<String>(value.f0));
+ t.f0 = null;
+ return (Tuple2<Tuple1<String>, Tuple1<String>>) t;
+ }
+ }
+
+ @Test
+ public void testForwardIntoNestedTupleWithVarAndModification() {
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map13.class, "Tuple2<String,Integer>",
+ "Tuple2<Tuple1<String>,Tuple1<String>>");
+ }
+
+ // Fixture (analyzed as-is): assigns input f0 to f0 of a fresh tuple; output f1 is never set.
+ // Expected result (the annotation): f0 forwarded to f0.
+ @ForwardedFields("f0")
+ public static class Map14 implements MapFunction<Tuple2<String, Integer>, Tuple2<String, String>> {
+ public Tuple2<String, String> map(Tuple2<String, Integer> value) throws Exception {
+ Tuple2<String, String> t = new Tuple2<String, String>();
+ t.f0 = value.f0;
+ return t;
+ }
+ }
+
+ @Test
+ public void testForwardIntoTupleWithAssignment() {
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map14.class, "Tuple2<String,Integer>",
+ "Tuple2<String,String>");
+ }
+ }
+
+ // Fixture (analyzed as-is): reads the nested input path f0.f0 into f0 of a fresh tuple.
+ // Expected result (the annotation): f0.f0 forwarded to f0.
+ @ForwardedFields("f0.f0->f0")
+ public static class Map15 implements MapFunction<Tuple2<Tuple1<String>, Integer>, Tuple2<String, String>> {
+ public Tuple2<String, String> map(Tuple2<Tuple1<String>, Integer> value) throws Exception {
+ Tuple2<String, String> t = new Tuple2<String, String>();
+ t.f0 = value.f0.f0;
+ return t;
+ }
+ }
+
+ @Test
+ public void testForwardIntoTupleWithInputPath() {
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map15.class,
+ "Tuple2<Tuple1<String>,Integer>", "Tuple2<String,String>");
+ }
+
+ // Fixture (analyzed as-is): swaps the two POJO fields of a new MyPojo using only getters/setters.
+ // Expected result (the annotation): field -> field2 and field2 -> field.
+ @ForwardedFields("field->field2;field2->field")
+ public static class Map16 implements MapFunction<MyPojo, MyPojo> {
+ public MyPojo map(MyPojo value) throws Exception {
+ MyPojo p = new MyPojo();
+ p.setField(value.getField2());
+ p.setField2(value.getField());
+ return p;
+ }
+ }
+
+ @Test
+ public void testForwardIntoPojoByGettersAndSetters() {
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map16.class,
+ "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
+ "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>");
+ }
+
+ // Fixture (analyzed as-is): output f0 is either the constant "empty" or the input, depending on the
+ // current state of an instance-field tuple.
+ // No annotation: nothing is expected to be forwarded.
+ // NOTE(review): tuple.f0 is presumably null on the first call, so this would NPE if executed.
+ public static class Map17 implements MapFunction<String, Tuple1<String>> {
+ private Tuple1<String> tuple = new Tuple1<String>();
+
+ public Tuple1<String> map(String value) throws Exception {
+ if (!tuple.f0.equals("")) {
+ tuple.f0 = "empty";
+ } else {
+ tuple.f0 = value;
+ }
+ return tuple;
+ }
+ }
+
+ @Test
+ public void testForwardIntoTupleWithInstanceVarAndCondition() {
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map17.class, "String", "Tuple1<String>");
+ }
+
+ // Fixture (analyzed as-is): appends input f0 to an instance-field ArrayList and returns the list.
+ // The output type is a plain java.util.ArrayList. No annotation: nothing is expected to be forwarded.
+ public static class Map18 implements MapFunction<Tuple1<String>, ArrayList<String>> {
+ private ArrayList<String> list = new ArrayList<String>();
+
+ public ArrayList<String> map(Tuple1<String> value) throws Exception {
+ list.add(value.f0);
+ return list;
+ }
+ }
+
+ @Test
+ public void testForwardIntoUnsupportedObject() {
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map18.class, "Tuple1<String>",
+ "java.util.ArrayList");
+ }
+
+ // Fixture (analyzed as-is): the input is routed through an intermediate tuple before landing in f0
+ // of the returned tuple.
+ // Expected result (the annotation): input forwarded to f0.
+ @ForwardedFields("*->f0")
+ public static class Map19 implements MapFunction<Integer, Tuple1<Integer>> {
+ @Override
+ public Tuple1<Integer> map(Integer value) throws Exception {
+ Tuple1<Integer> tuple = new Tuple1<Integer>();
+ tuple.f0 = value;
+ Tuple1<Integer> tuple2 = new Tuple1<Integer>();
+ tuple2.f0 = tuple.f0;
+ return tuple2;
+ }
+ }
+
+ @Test
+ public void testForwardWithNewTupleToNewTupleAssignment() {
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map19.class, "Integer", "Tuple1<Integer>");
+ }
+
+ // Fixture (analyzed as-is): copies f0 and f1 via Tuple.getField(int); the second index is the
+ // constant expression (int) 1L.
+ // Expected result (the annotation): f0 and f1 forwarded.
+ @ForwardedFields("f0;f1")
+ public static class Map20 implements
+ MapFunction<Tuple4<Integer, Integer, Integer, Integer>, Tuple4<Integer, Integer, Integer, Integer>> {
+
+ @Override
+ public Tuple4<Integer, Integer, Integer, Integer> map(Tuple4<Integer, Integer, Integer, Integer> value)
+ throws Exception {
+ Tuple4<Integer, Integer, Integer, Integer> t = new Tuple4<Integer, Integer, Integer, Integer>();
+ t.f0 = value.getField(0);
+ t.f1 = value.getField((int) 1L);
+ return t;
+ }
+ }
+
+ @Test
+ public void testForwardWithGetMethod() {
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map20.class,
+ "Tuple4<Integer, Integer, Integer, Integer>", "Tuple4<Integer, Integer, Integer, Integer>");
+ }
+
+ // Fixture (analyzed as-is): swaps f0 and f1 of the input in place via setField, using local i to
+ // hold the old f0.
+ // Expected result (the annotation): f0 -> f1 and f1 -> f0.
+ @ForwardedFields("f0->f1;f1->f0")
+ public static class Map21 implements MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
+ @Override
+ public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
+ Integer i = value.f0;
+ value.setField(value.f1, 0);
+ value.setField(i, 1);
+ return value;
+ }
+ }
+
+ @Test
+ public void testForwardWithSetMethod() {
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map21.class, "Tuple2<Integer, Integer>",
+ "Tuple2<Integer, Integer>");
+ }
+
+ // Fixture (analyzed as-is): writes input f1 into field 0 and input f0 (read via getField) into
+ // field 1 of a new tuple.
+ // Expected result (the annotation): f0 -> f1 and f1 -> f0.
+ @ForwardedFields("f0->f1;f1->f0")
+ public static class Map22 implements MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
+ @Override
+ public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
+ Tuple2<Integer, Integer> t = new Tuple2<Integer, Integer>();
+ t.setField(value.f1, 0);
+ t.setField(value.getField(0), 1);
+ return t;
+ }
+ }
+
+ @Test
+ public void testForwardIntoNewTupleWithSetMethod() {
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map22.class, "Tuple2<Integer, Integer>",
+ "Tuple2<Integer, Integer>");
+ }
+
+ // Fixture (analyzed as-is): every branch returns a Tuple1 whose f0 is the input f0. It is copied via
+ // getField, routed via an intermediate tuple, or the input itself is returned.
+ // Expected result (the annotation): "*" forwarded.
+ @ForwardedFields("*")
+ public static class Map23 implements MapFunction<Tuple1<Integer>, Tuple1<Integer>> {
+ @Override
+ public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
+ if (value.f0.equals(23)) {
+ return new Tuple1<Integer>(value.<Integer> getField(0));
+ } else if (value.f0.equals(22)) {
+ Tuple1<Integer> inputContainer = new Tuple1<Integer>();
+ inputContainer.f0 = value.f0;
+ return new Tuple1<Integer>(inputContainer.<Integer> getField(0));
+ } else {
+ return value;
+ }
+ }
+ }
+
+ @Test
+ public void testForwardWithGetMethod2() {
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map23.class, "Tuple1<Integer>",
+ "Tuple1<Integer>");
+ }
+
+ // Fixture (analyzed as-is): overwrites both input fields with computed constants via setField.
+ // No annotation: nothing is expected to be forwarded.
+ public static class Map24 implements MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
+ @Override
+ public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
+ value.setField(2, 0);
+ int i = 5;
+ value.setField(i * i + 2, 1);
+ return value;
+ }
+ }
+
+ @Test
+ public void testForwardWithSetMethod2() {
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map24.class, "Tuple2<Integer, Integer>",
+ "Tuple2<Integer, Integer>");
+ }
+
+ // Fixture (analyzed as-is): copies input f1 into f0 of the same (returned) input tuple.
+ // Expected result (the annotation): f1 -> f0, and f1 stays in place.
+ @ForwardedFields("f1->f0;f1")
+ public static class Map25 implements MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
+ @Override
+ public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
+ value.f0 = value.f1;
+ return value;
+ }
+ }
+
+ @Test
+ public void testForwardWithModifiedInput() {
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map25.class, "Tuple2<Integer, Integer>",
+ "Tuple2<Integer, Integer>");
+ }
+
+ // Fixture (analyzed as-is): f0 is set either to a constant or to the input depending on branches,
+ // while f1 always receives the input.
+ // Expected result (the annotation): input forwarded to field 1.
+ @ForwardedFields("*->1")
+ public static class Map26 implements MapFunction<Integer, Tuple2<Integer, Integer>> {
+
+ @Override
+ public Tuple2<Integer, Integer> map(Integer value) throws Exception {
+ Tuple2<Integer, Integer> tuple = new Tuple2<Integer, Integer>();
+ // non-input content
+ if (tuple.equals(new Tuple2<Integer, Integer>())) {
+ tuple.f0 = 123456;
+ }
+ if (tuple.equals(new Tuple2<Integer, Integer>())) {
+ tuple.f0 = value;
+ }
+ // forwarding
+ tuple.f1 = value;
+ return tuple;
+ }
+ }
+
+ @Test
+ public void testForwardWithTuplesGetSetFieldMethods() {
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map26.class, "Integer",
+ "Tuple2<Integer, Integer>");
+ }
+
+ // Fixture (analyzed as-is): field 0 depends on a branch (a constant or input f0). Input f2 and f3
+ // are written to fields 3 and 7 via setField.
+ // Expected result (the annotation): 2 -> 3 and 3 -> 7.
+ @ForwardedFields("2->3;3->7")
+ public static class Map27
+ implements
+ MapFunction<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>,
+ Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> {
+ @Override
+ public Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer> map(
+ Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer> value)
+ throws Exception {
+ Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer> tuple
+ = new Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>();
+ // non-input content
+ if (tuple.f0 == null) {
+ tuple.setField(123456, 0);
+ } else {
+ tuple.setField(value.f0, 0);
+ }
+ // forwarding
+ tuple.setField(value.f2, 3);
+ tuple.setField(value.f3, 7);
+ // TODO multiple mapping is unsupported yet
+ //tuple.setField(value.f1, 3);
+ return tuple;
+ }
+ }
+
+ @Test
+ public void testForwardWithTuplesGetSetFieldMethods2() {
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map27.class,
+ "Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>",
+ "Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>");
+ }
+
+ // Fixture (analyzed as-is): returns the input, or 123 when the input is null.
+ // No annotation: nothing is expected to be forwarded.
+ public static class Map28 implements MapFunction<Integer, Integer> {
+ @Override
+ public Integer map(Integer value) throws Exception {
+ if (value == null) {
+ value = 123;
+ }
+ return value;
+ }
+ }
+
+ @Test
+ public void testForwardWithBranching1() {
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map28.class, "Integer", "Integer");
+ }
+
+ // Fixture (analyzed as-is): saves input f0 in tmp, then the loop overwrites input fields 0 and 1 with
+ // "Test". Both branches emit tmp as f0.
+ // Expected result (the annotation): field 0 forwarded.
+ @ForwardedFields("0")
+ public static class Map29 implements MapFunction<Tuple3<String, String, String>, Tuple3<String, String, String>> {
+ @Override
+ public Tuple3<String, String, String> map(Tuple3<String, String, String> value) throws Exception {
+ String tmp = value.f0;
+ for (int i = 0; i < 2; i++) {
+ value.setField("Test", i);
+ }
+ Tuple3<String, String, String> tuple;
+ if (value.f0.equals("x")) {
+ tuple = new Tuple3<String, String, String>(tmp, value.f0, null);
+ } else {
+ tuple = new Tuple3<String, String, String>(tmp, value.f0, "");
+ }
+ return tuple;
+ }
+ }
+
+ @Test
+ public void testForwardWithBranching2() {
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map29.class,
+ "Tuple3<String, String, String>", "Tuple3<String, String, String>");
+ }
+
+ // Fixture (analyzed as-is): returns f0 or f1 depending on a condition on f0.
+ // No annotation: nothing is expected to be forwarded.
+ public static class Map30 implements MapFunction<Tuple2<String, String>, String> {
+ @Override
+ public String map(Tuple2<String, String> value) throws Exception {
+ String tmp;
+ if (value.f0.equals("")) {
+ tmp = value.f0;
+ } else {
+ tmp = value.f1;
+ }
+ return tmp;
+ }
+ }
+
+ @Test
+ public void testForwardWithBranching3() {
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map30.class, "Tuple2<String,String>",
+ "String");
+ }
+
+ // Fixture (analyzed as-is): uses ExtendingTuple, a Tuple2 subclass. f1 is copied from the input,
+ // setFirstField() sets f0 to "Hello", then f0 is overwritten with getSecondField(), which reads f1.
+ // Expected result (the annotation): 1 -> 1 and 1 -> 0.
+ @ForwardedFields("1->1;1->0")
+ public static class Map31 implements MapFunction<Tuple2<String, String>, ExtendingTuple> {
+ @Override
+ public ExtendingTuple map(Tuple2<String, String> value) throws Exception {
+ ExtendingTuple t = new ExtendingTuple();
+ t.f1 = value.f1;
+ t.setFirstField();
+ t.f0 = t.getSecondField();
+ return t;
+ }
+ }
+
+ @Test
+ public void testForwardWithInheritance() {
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map31.class, "Tuple2<String,String>",
+ "Tuple2<String,String>");
+ }
+
+ // Fixture (analyzed as-is): unboxes all eight boxed fields into primitive locals and autoboxes them
+ // back into a new tuple in the same order.
+ // Expected result (the annotation): "*" forwarded.
+ @ForwardedFields("*")
+ public static class Map32
+ implements
+ MapFunction<Tuple8<Boolean, Character, Byte, Short, Integer, Long, Float, Double>,
+ Tuple8<Boolean, Character, Byte, Short, Integer, Long, Float, Double>> {
+
+ @Override
+ public Tuple8<Boolean, Character, Byte, Short, Integer, Long, Float, Double> map(
+ Tuple8<Boolean, Character, Byte, Short, Integer, Long, Float, Double> value) throws Exception {
+ boolean f0 = value.f0;
+ char f1 = value.f1;
+ byte f2 = value.f2;
+ short f3 = value.f3;
+ int f4 = value.f4;
+ long f5 = value.f5;
+ float f6 = value.f6;
+ double f7 = value.f7;
+ return new Tuple8<Boolean, Character, Byte, Short, Integer, Long, Float, Double>(
+ f0, f1, f2, f3, f4, f5, f6, f7);
+ }
+ }
+
+ @Test
+ public void testForwardWithUnboxingAndBoxing() {
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map32.class,
+ "Tuple8<Boolean, Character, Byte, Short, Integer, Long, Float, Double>",
+ "Tuple8<Boolean, Character, Byte, Short, Integer, Long, Float, Double>");
+ }
+
+ // Fixture (analyzed as-is): t.f0 receives input f0 if it is non-null, otherwise input f1.
+ // No annotation: nothing is expected to be forwarded.
+ public static class Map33 implements MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ @Override
+ public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
+ Tuple2<Long, Long> t = new Tuple2<Long, Long>();
+ if (value.f0 != null) {
+ t.f0 = value.f0;
+ }
+ else {
+ t.f0 = value.f1;
+ }
+ return t;
+ }
+ }
+
+ @Test
+ public void testForwardWithBranching4() {
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map33.class, "Tuple2<Long, Long>",
+ "Tuple2<Long, Long>");
+ }
+
+ // Fixture (analyzed as-is): three return paths:
+ //   (1) the input itself,
+ //   (2) the input (via an instance field) with f0 replaced by 23L,
+ //   (3) a copy of the input.
+ // Only f1 is preserved on every path.
+ // Expected result (the annotation): field 1 forwarded.
+ // Note: value != new Object() is always true at runtime, so only the first path would ever execute.
+ @ForwardedFields("1")
+ public static class Map34 implements MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ private Tuple2<Long, Long> t;
+ @Override
+ public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
+ if (value != new Object()) {
+ return value;
+ }
+ else if (value.f0 == 1L && value.f1 == 2L) {
+ t = value;
+ t.f0 = 23L;
+ return t;
+ }
+ return new Tuple2<Long, Long>(value.f0, value.f1);
+ }
+ }
+
+ @Test
+ public void testForwardWithBranching5() {
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map34.class, "Tuple2<Long, Long>",
+ "Tuple2<Long, Long>");
+ }
+
+ // Fixture (analyzed as-is): writes into element 0 of the input array and returns the same array in
+ // both output fields (tmp aliases value).
+ // No annotation: nothing is expected to be forwarded.
+ public static class Map35 implements MapFunction<String[], Tuple2<String[], String[]>> {
+ @Override
+ public Tuple2<String[], String[]> map(String[] value) throws Exception {
+ String[] tmp = value;
+ value[0] = "Hello";
+ return new Tuple2<String[], String[]>(value, tmp);
+ }
+ }
+
+ @Test
+ public void testForwardWithArrayModification() {
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map35.class, "String[]",
+ "Tuple2<String[], String[]>");
+ }
+
+ // Fixture (analyzed as-is): a do-while loop calls setField with a loop-variable index.
+ // No annotation: nothing is expected to be forwarded.
+ // NOTE(review): i is 1 after the first pass, so "i >= 2" ends the loop after a single iteration (only
+ // field 0 would be written if executed). Confirm whether "i <= 2" was intended.
+ public static class Map36 implements MapFunction<Tuple3<String, String, String>, Tuple3<String, String, String>> {
+ @Override
+ public Tuple3<String, String, String> map(Tuple3<String, String, String> value) throws Exception {
+ int i = 0;
+ do {
+ value.setField("", i);
+ i++;
+ } while (i >= 2);
+ return value;
+ }
+ }
+
+ @Test
+ public void testForwardWithBranching6() {
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map36.class, "Tuple3<String, String, String>",
+ "Tuple3<String, String, String>");
+ }
+
+ // Fixture (analyzed as-is): modifies f0 of the nested tuple obtained via getField with a
+ // non-constant index.
+ // No annotation: nothing is expected to be forwarded.
+ // Note: Integer.valueOf("2.") would throw NumberFormatException if this method were executed.
+ public static class Map37 implements MapFunction<Tuple1<Tuple1<String>>, Tuple1<Tuple1<String>>> {
+ @SuppressWarnings("unchecked")
+ @Override
+ public Tuple1<Tuple1<String>> map(Tuple1<Tuple1<String>> value) throws Exception {
+ ((Tuple1<String>) value.getField(Integer.valueOf("2."))).f0 = "Hello";
+ return value;
+ }
+ }
+
+ @Test
+ public void testForwardWithGetAndModification() {
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map37.class, "Tuple1<Tuple1<String>>",
+ "Tuple1<Tuple1<String>>");
+ }
+
+ // Fixture (analyzed as-is): MyPojo2 inherits its fields and setters from MyPojo; only field2 is
+ // modified.
+ // Expected result (the annotation): field forwarded.
+ @ForwardedFields("field")
+ public static class Map38 implements MapFunction<MyPojo2, MyPojo2> {
+ @Override
+ public MyPojo2 map(MyPojo2 value) throws Exception {
+ value.setField2("test");
+ return value;
+ }
+ }
+
+ @Test
+ public void testForwardWithInheritance2() {
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map38.class,
+ "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo2<field=String,field2=String>",
+ "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo2<field=String,field2=String>");
+ }
+
+ // Fixture (analyzed as-is): copies field2 into field of a new POJO. The test declares the output type
+ // without field information ("...$MyPojo").
+ // No annotation: nothing is expected to be forwarded.
+ public static class Map39 implements MapFunction<MyPojo, MyPojo> {
+ @Override
+ public MyPojo map(MyPojo value) throws Exception {
+ MyPojo mp = new MyPojo();
+ mp.field = value.field2;
+ return mp;
+ }
+ }
+
+ @Test
+ public void testForwardWithGenericTypeOutput() {
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map39.class,
+ "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
+ "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo");
+ }
+
+ // Fixture (analyzed as-is): a recursive helper may rewrite field; field2 is never touched.
+ // Expected result (the annotation): field2 forwarded.
+ // Note: value.field == "xyz" compares references, not string contents.
+ @ForwardedFields("field2")
+ public static class Map40 implements MapFunction<MyPojo, MyPojo> {
+ @Override
+ public MyPojo map(MyPojo value) throws Exception {
+ return recursiveFunction(value);
+ }
+
+ private MyPojo recursiveFunction(MyPojo value) {
+ if (value.field == "xyz") {
+ value.field = value.field + "x";
+ return recursiveFunction(value);
+ }
+ return value;
+ }
+ }
+
+ @Test
+ public void testForwardWithRecursion() {
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map40.class,
+ "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
+ "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>");
+ }
+
+ // Fixture (analyzed as-is): a RichMapFunction that stores the input in an instance field, calls
+ // getRuntimeContext(), and returns the stored input.
+ // Expected result (the annotation): field and field2 forwarded.
+ @ForwardedFields("field;field2")
+ public static class Map41 extends RichMapFunction<MyPojo, MyPojo> {
+ private MyPojo field;
+ @Override
+ public MyPojo map(MyPojo value) throws Exception {
+ field = value;
+ getRuntimeContext().getIntCounter("test").getLocalValue();
+ return field;
+ }
+ }
+
+ @Test
+ public void testForwardWithGetRuntimeContext() {
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map41.class,
+ "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
+ "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>");
+ }
+
+ // Fixture (analyzed as-is): emits the input unchanged through the collector.
+ // Expected result (the annotation): "*" forwarded.
+ @ForwardedFields("*")
+ public static class FlatMap1 implements FlatMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
+ @Override
+ public void flatMap(Tuple1<Integer> value, Collector<Tuple1<Integer>> out) throws Exception {
+ out.collect(value);
+ }
+ }
+
+ @Test
+ public void testForwardWithCollector() {
+ compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, FlatMap1.class, "Tuple1<Integer>",
+ "Tuple1<Integer>");
+ }
+
+ // Fixture (analyzed as-is): fills an instance-field tuple with f0/f1 swapped and emits it twice.
+ // Expected result (the annotation): 0 -> 1 and 1 -> 0.
+ @ForwardedFields("0->1;1->0")
+ public static class FlatMap2 implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();
+
+ @Override
+ public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
+ invertedEdge.f0 = edge.f1;
+ invertedEdge.f1 = edge.f0;
+ out.collect(invertedEdge);
+ out.collect(invertedEdge);
+ }
+ }
+
+ @Test
+ public void testForwardWith2Collectors() {
+ compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, FlatMap2.class, "Tuple2<Long, Long>",
+ "Tuple2<Long, Long>");
+ }
+
+ // Fixture (analyzed as-is): a private helper emits an empty Tuple1 through the passed collector
+ // before the input itself is emitted.
+ // No annotation: nothing is expected to be forwarded.
+ public static class FlatMap3 implements FlatMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
+ @Override
+ public void flatMap(Tuple1<Integer> value, Collector<Tuple1<Integer>> out) throws Exception {
+ addToCollector(out);
+ out.collect(value);
+ }
+
+ private void addToCollector(Collector<Tuple1<Integer>> out) {
+ out.collect(new Tuple1<Integer>());
+ }
+ }
+
+ @Test
+ public void testForwardWithCollectorPassing() {
+ compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, FlatMap3.class, "Tuple1<Integer>",
+ "Tuple1<Integer>");
+ }
+
+ // Fixture (analyzed as-is): the output is (edge.f1, vertexWithComponent.f1).
+ // Expected result (the annotations): first input f1 -> f1, second input f1 -> f0.
+ @ForwardedFieldsFirst("f1->f1")
+ @ForwardedFieldsSecond("f1->f0")
+ public static class Join1 implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ @Override
+ public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
+ return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
+ }
+ }
+
+ @Test
+ public void testForwardWithDualInput() {
+ compareAnalyzerResultWithAnnotationsDualInput(JoinFunction.class, Join1.class, "Tuple2<Long, Long>",
+ "Tuple2<Long, Long>", "Tuple2<Long, Long>");
+ }
+
+ // Fixture (analyzed as-is): conditionally emits the first input unchanged.
+ // Expected result (the annotation): the whole first input ("*") forwarded. There is no annotation
+ // for the second input.
+ @ForwardedFieldsFirst("*")
+ public static class Join2 implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ @Override
+ public void join(Tuple2<Long, Long> candidate, Tuple2<Long, Long> old, Collector<Tuple2<Long, Long>> out) {
+ if (candidate.f1 < old.f1) {
+ out.collect(candidate);
+ }
+ }
+ }
+
+ @Test
+ public void testForwardWithDualInputAndCollector() {
+ compareAnalyzerResultWithAnnotationsDualInput(FlatJoinFunction.class, Join2.class, "Tuple2<Long, Long>",
+ "Tuple2<Long, Long>", "Tuple2<Long, Long>");
+ }
+
+ // Fixture (analyzed as-is): emits the first element of the group. The test passes key "0".
+ // Expected result (the annotation): field 0 forwarded.
+ @ForwardedFields("0")
+ public static class GroupReduce1 implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ @Override
+ public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
+ out.collect(values.iterator().next());
+ }
+ }
+
+ @Test
+ public void testForwardWithIterable() {
+ compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce1.class,
+ "Tuple2<Long, Long>", "Tuple2<Long, Long>", new String[] { "0" });
+ }
+
+ // Fixture (analyzed as-is): output f0 is the first element's f1. Output f1 starts as the first
+ // element's f0 and may be incremented while iterating the remaining elements. The test passes keys
+ // "0" and "1".
+ // Expected result (the annotation): 1 -> 0.
+ @ForwardedFields("1->0")
+ public static class GroupReduce2 implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ @Override
+ public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
+ final Iterator<Tuple2<Long, Long>> it = values.iterator();
+ Tuple2<Long, Long> outTuple = new Tuple2<Long, Long>();
+ Tuple2<Long, Long> first = it.next();
+ outTuple.f0 = first.f1;
+ outTuple.f1 = first.f0;
+
+ while (it.hasNext()) {
+ Tuple2<Long, Long> t = it.next();
+ if (t.f0 == 42) {
+ outTuple.f1 += t.f0;
+ }
+ }
+ out.collect(outTuple);
+ }
+ }
+
+ @Test
+ public void testForwardWithIterable2() {
+ compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce2.class,
+ "Tuple2<Long, Long>", "Tuple2<Long, Long>", new String[] { "0", "1" });
+ }
+
+ // Fixture (analyzed as-is): emits every element of the group unchanged. The test passes key "field2".
+ // Expected result (the annotation): field2 forwarded.
+ @ForwardedFields("field2")
+ public static class GroupReduce3 implements GroupReduceFunction<MyPojo, MyPojo> {
+ @Override
+ public void reduce(Iterable<MyPojo> values, Collector<MyPojo> out) throws Exception {
+ for (MyPojo value : values) {
+ out.collect(value);
+ }
+ }
+ }
+
+ @Test
+ public void testForwardWithIterable3() {
+ compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce3.class,
+ "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
+ "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>", new String[] { "field2" });
+ }
+
+ // Fixture (analyzed as-is): emits f0 of the last element. The initial 0L would only survive for an
+ // empty group; per the test name, the analyzer assumes at least one iteration. The test passes key "f0".
+ // Expected result (the annotation): f0 forwarded to the whole output.
+ @ForwardedFields("f0->*")
+ public static class GroupReduce4 implements GroupReduceFunction<Tuple2<Long, Long>, Long> {
+ @Override
+ public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Long> out) throws Exception {
+ Long id = 0L;
+ for (Tuple2<Long, Long> value : values) {
+ id = value.f0;
+ }
+ out.collect(id);
+ }
+ }
+
+ @Test
+ public void testForwardWithAtLeastOneIterationAssumption() {
+ compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce4.class,
+ "Tuple2<Long, Long>", "Long", new String[] { "f0" });
+ }
+
+ // Fixture (analyzed as-is): hand-written if/else variant of the iterate-and-take-f0 pattern using a
+ // raw Iterator. The test passes key "f0".
+ // Expected result (the annotation): f0 forwarded to the whole output.
+ @ForwardedFields("f0->*")
+ public static class GroupReduce4_Javac implements GroupReduceFunction<Tuple2<Long, Long>, Long> {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Long> out) throws Exception {
+ Long id = 0L;
+ @SuppressWarnings("rawtypes")
+ Iterator it = values.iterator();
+ if (it.hasNext()) {
+ id = ((Tuple2<Long, Long>) it.next()).f0;
+ }
+ else {
+ System.out.println("hello world");
+ }
+ out.collect(id);
+ }
+ }
+
+ @Test
+ public void testForwardWithAtLeastOneIterationAssumptionForJavac() {
+ // this test simulates javac behaviour in Eclipse IDE
+ // NOTE(review): Eclipse compiles with its own compiler (ECJ) rather than javac; confirm which
+ // compiler's output this fixture is meant to mimic.
+ compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce4_Javac.class,
+ "Tuple2<Long, Long>", "Long", new String[] { "f0" });
+ }
+
+ // Fixture (analyzed as-is): inside the loop, id is set to f0 and then, when value != null, to f1.
+ // The test passes key "f1".
+ // No annotation: nothing is expected to be forwarded.
+ public static class GroupReduce5 implements GroupReduceFunction<Tuple2<Long, Long>, Long> {
+ @Override
+ public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Long> out) throws Exception {
+ Long id = 0L;
+ for (Tuple2<Long, Long> value : values) {
+ id = value.f0;
+ if (value != null) {
+ id = value.f1;
+ }
+ }
+ out.collect(id);
+ }
+ }
+
+ @Test
+ public void testForwardWithAtLeastOneIterationAssumption2() {
+ compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce5.class,
+ "Tuple2<Long, Long>", "Long", new String[] { "f1" });
+ }
+
+ // Fixture (analyzed as-is): id is reset to 0L after the loop, so the emitted value never comes from
+ // the input. The test passes key "f0".
+ // No annotation: nothing is expected to be forwarded.
+ public static class GroupReduce6 implements GroupReduceFunction<Tuple2<Long, Long>, Long> {
+ @Override
+ public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Long> out) throws Exception {
+ Long id = 0L;
+ for (Tuple2<Long, Long> value : values) {
+ id = value.f0;
+ }
+ id = 0L;
+ out.collect(id);
+ }
+ }
+
+ @Test
+ public void testForwardWithAtLeastOneIterationAssumption3() {
+ compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce6.class,
+ "Tuple2<Long, Long>", "Long", new String[] { "f0" });
+ }
+
+ // Fixture (analyzed as-is): id is reset to 0L after the loop, so the emitted value never comes from
+ // the input. The test passes key "f0".
+ // No annotation: nothing is expected to be forwarded.
+ // NOTE(review): this class and its test are identical to GroupReduce6 and
+ // testForwardWithAtLeastOneIterationAssumption3, so this case adds no coverage. Confirm whether a
+ // different variant was intended here.
+ public static class GroupReduce7 implements GroupReduceFunction<Tuple2<Long, Long>, Long> {
+ @Override
+ public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Long> out) throws Exception {
+ Long id = 0L;
+ for (Tuple2<Long, Long> value : values) {
+ id = value.f0;
+ }
+ id = 0L;
+ out.collect(id);
+ }
+ }
+
+ @Test
+ public void testForwardWithAtLeastOneIterationAssumption4() {
+ compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce7.class,
+ "Tuple2<Long, Long>", "Long", new String[] { "f0" });
+ }
+
+ // Fixture (analyzed as-is): a while loop over an explicit Iterator emits f0 of the last element.
+ // The test passes key "f0".
+ // Expected result (the annotation): f0 forwarded to the whole output.
+ @ForwardedFields("f0->*")
+ public static class GroupReduce8 implements GroupReduceFunction<Tuple2<Long, Long>, Long> {
+ @Override
+ public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Long> out) throws Exception {
+ Long id = 0L;
+ Iterator<Tuple2<Long, Long>> it = values.iterator();
+ while (it.hasNext()) {
+ id = it.next().f0;
+ }
+ out.collect(id);
+ }
+ }
+
+ @Test
+ public void testForwardWithAtLeastOneIterationAssumption5() {
+ compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce8.class,
+ "Tuple2<Long, Long>", "Long", new String[] { "f0" });
+ }
+
+ // Fixture (analyzed as-is): emits the last element of the group (rv starts as null). The test passes
+ // key "f0".
+ // Expected result (the annotation): f0 forwarded.
+ @ForwardedFields("f0")
+ public static class GroupReduce9 implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ @Override
+ public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
+ Tuple2<Long, Long> rv = null;
+ Iterator<Tuple2<Long, Long>> it = values.iterator();
+ while (it.hasNext()) {
+ rv = it.next();
+ }
+ out.collect(rv);
+ }
+ }
+
+ @Test
+ public void testForwardWithAtLeastOneIterationAssumption6() {
+ compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce9.class,
+ "Tuple2<Long, Long>", "Tuple2<Long, Long>", new String[] { "f0" });
+ }
+
+ // Fixture (analyzed as-is): emits whether the group iterator has a next element; the two ifs branch
+ // on that flag.
+ // No annotation: nothing is expected to be forwarded.
+ public static class GroupReduce10 implements GroupReduceFunction<Tuple2<Long, Long>, Boolean> {
+ @Override
+ public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Boolean> out) throws Exception {
+ Iterator<Tuple2<Long, Long>> it = values.iterator();
+ boolean f = it.hasNext();
+ if (!f) {
+ System.out.println();
+ }
+ if (f) {
+ System.out.println();
+ }
+ out.collect(f);
+ }
+ }
+
+ @Test
+ public void testForwardWithAtLeastOneIterationAssumption7() {
+ compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce10.class,
+ "Tuple2<Long, Long>", "Boolean", new String[] { "f0" });
+ }
+
+ // Fixture (analyzed as-is): builds a new POJO from value1's field and value2's field2. The test passes
+ // key "field".
+ // Expected result (the annotation): field forwarded.
+ @ForwardedFields("field")
+ public static class Reduce1 implements ReduceFunction<MyPojo> {
+ @Override
+ public MyPojo reduce(MyPojo value1, MyPojo value2) throws Exception {
+ return new MyPojo(value1.getField(), value2.getField2());
+ }
+ }
+
+ @Test
+ public void testForwardWithReduce() {
+ compareAnalyzerResultWithAnnotationsSingleInputWithKeys(ReduceFunction.class, Reduce1.class,
+ "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
+ "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
+ new String[] { "field" });
+ }
+
+ // Fixture (analyzed as-is): returns value2 or value1 depending on value1.field. The test passes key
+ // "field".
+ // Expected result (the annotation): field forwarded.
+ // Note: value1.field == "" compares references, not string contents.
+ @ForwardedFields("field")
+ public static class Reduce2 implements ReduceFunction<MyPojo> {
+ @Override
+ public MyPojo reduce(MyPojo value1, MyPojo value2) throws Exception {
+ if (value1.field == "") {
+ return value2;
+ }
+ return value1;
+ }
+ }
+
+ @Test
+ public void testForwardWithBranchingReduce() {
+ compareAnalyzerResultWithAnnotationsSingleInputWithKeys(ReduceFunction.class, Reduce2.class,
+ "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
+ "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
+ new String[] { "field" });
+ }
+
+ public static class NullReturnMapper1 implements MapFunction<String, String> {
+ @Override
+ public String map(String value) throws Exception {
+ return null;
+ }
+ }
+
+ public static class NullReturnMapper2 implements MapFunction<String, String> {
+ @Override
+ public String map(String value) throws Exception {
+ if (value.equals("test")) {
+ return null;
+ }
+ return "";
+ }
+ }
+
+ public static class NullReturnFlatMapper implements FlatMapFunction<String, String> {
+ @Override
+ public void flatMap(String value, Collector<String> out) throws Exception {
+ String s = null;
+ if ("dd".equals("")) {
+ s = "";
+ }
+ out.collect(s);
+ }
+ }
+
+ @Test
+ public void testNullReturnException() {
+ try {
+ final UdfAnalyzer ua = new UdfAnalyzer(MapFunction.class, NullReturnMapper1.class, "operator",
+ BasicTypeInfo.STRING_TYPE_INFO, null, BasicTypeInfo.STRING_TYPE_INFO, null, null, true);
+ ua.analyze();
+ Assert.fail();
+ }
+ catch (CodeErrorException e) {
+ // ok
+ }
+ try {
+ final UdfAnalyzer ua = new UdfAnalyzer(MapFunction.class, NullReturnMapper2.class, "operator",
+ BasicTypeInfo.STRING_TYPE_INFO, null, BasicTypeInfo.STRING_TYPE_INFO, null, null, true);
+ ua.analyze();
+ Assert.fail();
+ }
+ catch (CodeErrorException e) {
+ // ok
+ }
+ try {
+ final UdfAnalyzer ua = new UdfAnalyzer(FlatMapFunction.class, NullReturnFlatMapper.class, "operator",
+ BasicTypeInfo.STRING_TYPE_INFO, null, BasicTypeInfo.STRING_TYPE_INFO, null, null, true);
+ ua.analyze();
+ Assert.fail();
+ }
+ catch (CodeErrorException e) {
+ // ok
+ }
+ }
+
+
+ // Fixture (analyzed as-is): map() assigns the public static field "test".
+ // The test below expects analysis to throw CodeErrorException for this UDF.
+ public static class PutStaticMapper implements MapFunction<String, String> {
+ public static String test = "";
+
+ @Override
+ public String map(String value) throws Exception {
+ test = "test";
+ return "";
+ }
+ }
+
+ @Test
+ public void testPutStaticException() {
+ try {
+ final UdfAnalyzer ua = new UdfAnalyzer(MapFunction.class, PutStaticMapper.class, "operator",
+ BasicTypeInfo.STRING_TYPE_INFO, null, BasicTypeInfo.STRING_TYPE_INFO, null, null, true);
+ ua.analyze();
+ Assert.fail();
+ }
+ catch (CodeErrorException e) {
+ // ok
+ }
+ }
+
+ // Fixture (analyzed as-is): filter() overwrites f0 of its input with f1.
+ // The test below expects analysis to throw CodeErrorException for this UDF.
+ public static class FilterMod1 implements FilterFunction<Tuple2<String, String>> {
+
+ @Override
+ public boolean filter(Tuple2<String, String> value) throws Exception {
+ value.f0 = value.f1;
+ return false;
+ }
+ }
+
+ @Test
+ public void testFilterModificationException1() {
+ try {
+ final UdfAnalyzer ua = new UdfAnalyzer(FilterFunction.class, FilterMod1.class, "operator",
+ TypeInfoParser.parse("Tuple2<String, String>"), null, null, null, null, true);
+ ua.analyze();
+ Assert.fail();
+ }
+ catch (CodeErrorException e) {
+ // ok
+ }
+ }
+
+ // Fixture (analyzed as-is): filter() overwrites f0 of its input with a constant.
+ // The test below expects analysis to throw CodeErrorException for this UDF.
+ public static class FilterMod2 implements FilterFunction<Tuple2<String, String>> {
+
+ @Override
+ public boolean filter(Tuple2<String, String> value) throws Exception {
+ value.f0 = "";
+ return false;
+ }
+ }
+
+ @Test
+ public void testFilterModificationException2() {
+ try {
+ final UdfAnalyzer ua = new UdfAnalyzer(FilterFunction.class, FilterMod2.class, "operator",
+ TypeInfoParser.parse("Tuple2<String, String>"), null, null, null, null, true);
+ ua.analyze();
+ Assert.fail();
+ }
+ catch (CodeErrorException e) {
+ // ok
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Utils
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * POJO fixture with one private field (reachable only through its getter/setter) and one public
+ * field (also exposed through a getter/setter). Left unchanged because UDFs analyzed in this test
+ * read and write these members, so their bytecode is part of the analyzer's input.
+ */
+ public static class MyPojo {
+ private String field;
+ public String field2;
+
+ public MyPojo() {
+ // default constructor
+ }
+
+ public MyPojo(String field, String field2) {
+ this.field = field;
+ this.field2 = field2;
+ }
+
+ public String getField() {
+ return field;
+ }
+
+ public void setField(String field) {
+ this.field = field;
+ }
+
+ public String getField2() {
+ return field2;
+ }
+
+ public void setField2(String field2) {
+ this.field2 = field2;
+ }
+ }
+
+ /**
+ * Fixture: a subclass of MyPojo that declares no additional fields, only an explicit
+ * default constructor.
+ */
+ public static class MyPojo2 extends MyPojo {
+
+ public MyPojo2() {
+ // default constructor
+ }
+ }
+
+ /**
+ * Fixture: a Tuple2 subclass that accesses its fields only through the inherited
+ * setField/getField methods. Presumably used to exercise the analyzer's lookup of the tuple
+ * class that actually implements these accessors. Left unchanged because its bytecode is
+ * analyzer input.
+ */
+ public static class ExtendingTuple extends Tuple2<String, String> {
+ // writes f0 (position 0) via the inherited setter
+ public void setFirstField() {
+ setField("Hello", 0);
+ }
+
+ // reads f1 (position 1) via the inherited getter
+ public String getSecondField() {
+ return getField(1);
+ }
+ }
+
+ /**
+ * Asserts that the semantic properties inferred by the {@link UdfAnalyzer} for a single-input UDF
+ * equal those declared by its forwarded-fields annotations, without any key expressions.
+ */
+ public static void compareAnalyzerResultWithAnnotationsSingleInput(Class<?> baseClass, Class<?> clazz, String in,
+ String out) {
+ final String[] noKeys = null;
+ compareAnalyzerResultWithAnnotationsSingleInputWithKeys(baseClass, clazz, in, out, noKeys);
+ }
+
+ /**
+ * Analyzes {@code clazz} with the {@link UdfAnalyzer} and asserts that the inferred single-input
+ * semantic properties match those derived from the class's forwarded-fields annotations.
+ *
+ * @param baseClass the function interface implemented by {@code clazz}
+ * @param clazz the UDF class under test
+ * @param in input type string, parsed with {@link TypeInfoParser}
+ * @param out output type string, parsed with {@link TypeInfoParser}
+ * @param keys key expressions on the input, or {@code null} for none
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static void compareAnalyzerResultWithAnnotationsSingleInputWithKeys(Class<?> baseClass, Class<?> clazz,
+ String in, String out, String[] keys) {
+ final TypeInformation<?> inType = TypeInfoParser.parse(in);
+ final TypeInformation<?> outType = TypeInfoParser.parse(out);
+
+ // expected: derived from the annotations; a missing result means "nothing forwarded"
+ final SingleInputSemanticProperties fromAnnotations = SemanticPropUtil.getSemanticPropsSingle(
+ FunctionAnnotation.readSingleForwardAnnotations(clazz), inType, outType);
+ final SingleInputSemanticProperties expected = (fromAnnotations != null)
+ ? fromAnnotations
+ : new SingleInputSemanticProperties();
+
+ // actual: inferred by static code analysis
+ final Keys.ExpressionKeys inputKeys = (keys != null) ? new Keys.ExpressionKeys(keys, inType) : null;
+ final UdfAnalyzer analyzer = new UdfAnalyzer(baseClass, clazz, "operator", inType, null, outType,
+ inputKeys, null, true);
+ analyzer.analyze();
+ final SingleInputSemanticProperties actual = (SingleInputSemanticProperties) analyzer.getSemanticProperties();
+
+ assertEquals(expected.toString(), actual.toString());
+ }
+
+ /**
+ * Asserts that the semantic properties inferred by the {@link UdfAnalyzer} for a two-input UDF
+ * equal those declared by its forwarded-fields annotations, without any key expressions.
+ */
+ public static void compareAnalyzerResultWithAnnotationsDualInput(Class<?> baseClass, Class<?> clazz, String in1,
+ String in2, String out) {
+ final String[] noKeys1 = null;
+ final String[] noKeys2 = null;
+ compareAnalyzerResultWithAnnotationsDualInputWithKeys(baseClass, clazz, in1, in2, out, noKeys1, noKeys2);
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static void compareAnalyzerResultWithAnnotationsDualInputWithKeys(Class<?> baseClass, Class<?> clazz,
+ String in1, String in2, String out, String[] keys1, String[] keys2) {
+ final TypeInformation<?> in1Type = TypeInfoParser.parse(in1);
+ final TypeInformation<?> in2Type = TypeInfoParser.parse(in2);
+ final TypeInformation<?> outType = TypeInfoParser.parse(out);
+
+ // expected
+ final Set<Annotation> annotations = FunctionAnnotation.readDualForwardAnnotations(clazz);
+ final DualInputSemanticProperties expected = SemanticPropUtil.getSemanticPropsDual(annotations, in1Type,
+ in2Type, outType);
+
+ // actual
+ final UdfAnalyzer ua = new UdfAnalyzer(baseClass, clazz, "operator", in1Type, in2Type, outType, (keys1 == null) ? null
+ : new Keys.ExpressionKeys(keys1, in1Type), (keys2 == null) ? null : new Keys.ExpressionKeys(
+ keys2, in2Type), true);
+ ua.analyze();
+ final DualInputSemanticProperties actual = (DualInputSemanticProperties) ua.getSemanticProperties();
+
+ assertEquals(expected.toString(), actual.toString());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index cd10cc6..80df0f8 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -20,6 +20,7 @@ package org.apache.flink.test.util;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.CodeAnalysisMode;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.optimizer.DataStatistics;
@@ -29,7 +30,6 @@ import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.client.SerializedJobExecutionResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
-
import org.junit.Assert;
public class TestEnvironment extends ExecutionEnvironment {
@@ -39,6 +39,8 @@ public class TestEnvironment extends ExecutionEnvironment {
public TestEnvironment(ForkableFlinkMiniCluster executor, int parallelism) {
this.executor = executor;
setParallelism(parallelism);
+ // static code analysis of user functions is disabled in test environments to improve build time
+ getConfig().setCodeAnalysisMode(CodeAnalysisMode.DISABLE);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 11a9ed1..f1e0231 100644
--- a/pom.xml
+++ b/pom.xml
@@ -85,8 +85,8 @@ under the License.
<kryoserialization.version>0.3.2</kryoserialization.version>
<protobuf.version>2.5.0</protobuf.version>
<chill.version>0.5.2</chill.version>
- <asm.version>5.0.3</asm.version>
- <tez.version>0.6.1</tez.version>
+ <asm.version>5.0.4</asm.version>
+ <tez.version>0.6.1</tez.version>
</properties>
<dependencies>
[2/3] flink git commit: [FLINK-1319] [core] Add static code analysis
for user code
Posted by uc...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/sca/NestedMethodAnalyzer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/NestedMethodAnalyzer.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/NestedMethodAnalyzer.java
new file mode 100644
index 0000000..687e249
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/NestedMethodAnalyzer.java
@@ -0,0 +1,730 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.sca;
+
+import static org.apache.flink.api.java.sca.UdfAnalyzerUtils.findMethodNode;
+import static org.apache.flink.api.java.sca.UdfAnalyzerUtils.hasImportantDependencies;
+import static org.apache.flink.api.java.sca.UdfAnalyzerUtils.isTagged;
+import static org.apache.flink.api.java.sca.UdfAnalyzerUtils.mergeReturnValues;
+import static org.apache.flink.api.java.sca.UdfAnalyzerUtils.removeUngroupedInputs;
+import static org.apache.flink.api.java.sca.UdfAnalyzerUtils.tagged;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.java.sca.TaggedValue.Tag;
+import org.objectweb.asm.Type;
+import org.objectweb.asm.tree.AbstractInsnNode;
+import org.objectweb.asm.tree.FieldInsnNode;
+import org.objectweb.asm.tree.IntInsnNode;
+import org.objectweb.asm.tree.LdcInsnNode;
+import org.objectweb.asm.tree.MethodInsnNode;
+import org.objectweb.asm.tree.MethodNode;
+import org.objectweb.asm.tree.TypeInsnNode;
+import org.objectweb.asm.tree.analysis.AnalyzerException;
+import org.objectweb.asm.tree.analysis.BasicInterpreter;
+import org.objectweb.asm.tree.analysis.BasicValue;
+
+/**
+ * Extends ASM's BasicInterpreter. Instead of ASM's BasicValues, it introduces
+ * TaggedValues which extend BasicValue and allows for appending interesting
+ * information to values. The NestedMethodAnalyzer follows interesting values
+ * and analyzes nested methods (control/method flow) if necessary.
+ */
+public class NestedMethodAnalyzer extends BasicInterpreter {
+
+ // reference to current UDF analysis context
+ private final UdfAnalyzer analyzer;
+
+ // flag that indicates if current method is still the Function's SAM method
+ private final boolean topLevelMethod;
+
+ // method description and arguments
+ private final String owner;
+ private final MethodNode methodNode;
+ private final List<BasicValue> argumentValues;
+
+ // remaining nesting level
+ private final int remainingNesting;
+
+ // ASM analyzer which analyzes this methodNode
+ private ModifiedASMAnalyzer modifiedAsmAnalyzer;
+
+ /**
+ * Creates an interpreter for a single method reached during UDF analysis.
+ *
+ * @param analyzer the enclosing UDF analysis context
+ * @param owner internal (slash-separated) name of the class declaring {@code methodNode}
+ * @param methodNode the method to analyze
+ * @param argumentValues operand values of a nested invocation (as collected in naryOperation),
+ * handed out by newValue once capturing has ended; may be null
+ * @param remainingNesting how many further levels of nested calls may still be followed
+ * @param topLevelMethod whether this method still counts as the Function's SAM method
+ * @throws CodeAnalyzerException if {@code remainingNesting} is negative
+ */
+ public NestedMethodAnalyzer(UdfAnalyzer analyzer, String owner, MethodNode methodNode,
+ List<BasicValue> argumentValues, int remainingNesting, boolean topLevelMethod) {
+ this.analyzer = analyzer;
+ this.topLevelMethod = topLevelMethod;
+ this.owner = owner;
+ this.methodNode = methodNode;
+ this.argumentValues = argumentValues;
+
+ this.remainingNesting = remainingNesting;
+ if (remainingNesting < 0) {
+ throw new CodeAnalyzerException("Maximum nesting level reached.");
+ }
+ }
+
+ /**
+ * Runs a {@link ModifiedASMAnalyzer} over the method, with this object as its interpreter,
+ * then merges all values recorded by {@link #returnOperation} via mergeReturnValues.
+ *
+ * @return the merged return value of the method
+ * @throws AnalyzerException if the ASM analysis fails
+ */
+ public TaggedValue analyze() throws AnalyzerException {
+ modifiedAsmAnalyzer = new ModifiedASMAnalyzer(this);
+
+ // FOR DEBUGGING
+ // final Printer printer = new Textifier();
+ // final TraceMethodVisitor mp = new TraceMethodVisitor(printer);
+ // System.out.println(methodNode.name + " " + methodNode.desc);
+ // Iterator<AbstractInsnNode> it = methodNode.instructions.iterator();
+ // while (it.hasNext()) {
+ // it.next().accept(mp);
+ // StringWriter sw = new StringWriter();
+ // printer.print(new PrintWriter(sw));
+ // printer.getText().clear();
+ // System.out.println(sw.toString());
+ // }
+ modifiedAsmAnalyzer.analyze(owner, methodNode);
+ return mergeReturnValues(returnValues);
+ }
+
+ /**
+ * Analyzes the method called by {@code methodInsn} with a new NestedMethodAnalyzer that has one
+ * less nesting level remaining, passing {@code values} as its argument values. The callee is
+ * resolved via findMethodNode; its result holds the MethodNode at index 0 and the declaring
+ * class name at index 1.
+ *
+ * NOTE(review): the recursion check compares only name and descriptor, not the owning class, so
+ * a call to an identically named and typed method of another class is treated as recursion.
+ */
+ @SuppressWarnings("unchecked")
+ private TaggedValue invokeNestedMethod(List<? extends BasicValue> values,
+ final MethodInsnNode methodInsn) throws AnalyzerException {
+ final Object[] mn = findMethodNode(methodInsn.owner, methodInsn.name, methodInsn.desc);
+ MethodNode methodNode = (MethodNode) mn[0];
+ // recursion
+ if (methodNode.name.equals(this.methodNode.name)
+ && methodNode.desc.equals(this.methodNode.desc)) {
+ // TODO recursion are not supported perfectly yet
+ // recursion only work if the nested call follows at least one return statement
+ // return the values that are present so far
+ return mergeReturnValues(returnValues);
+ }
+ // the callee only keeps the top-level (SAM) status if the current method is a bridge method
+ final NestedMethodAnalyzer nma = new NestedMethodAnalyzer(analyzer, (String) mn[1],
+ (MethodNode) mn[0],
+ (List<BasicValue>) values, remainingNesting -1,
+ topLevelMethod && isBridgeMethod());
+ return nma.analyze();
+ }
+
+ // true if the analyzed method carries the ACC_BRIDGE access flag
+ private boolean isBridgeMethod() {
+ return (methodNode.access & ACC_BRIDGE) == ACC_BRIDGE;
+ }
+
+ // true for getRuntimeContext() calls that resolve to AbstractRichFunction's implementation;
+ // such calls are not followed as nested methods
+ private boolean isGetRuntimeContext(MethodInsnNode methodInsnNode) {
+ return methodInsnNode.name.equals("getRuntimeContext")
+ && findMethodNode(methodInsnNode.owner, methodInsnNode.name, methodInsnNode.desc)[1]
+ .equals("org/apache/flink/api/common/functions/AbstractRichFunction");
+ }
+
+ /**
+ * Detects unboxing calls such as {@code Integer.intValue()} on a java/lang wrapper class.
+ * substring(10) strips the "java/lang/" prefix from the owner; the last 5 characters
+ * ("Value") are stripped from the method name.
+ *
+ * @return the primitive type produced by the unboxing call, or null if it is none
+ */
+ private Type checkForUnboxing(String name, String methodOwner) {
+ // for performance improvement
+ if (!methodOwner.startsWith("java/lang/")
+ || !name.endsWith("Value")) {
+ return null;
+ }
+
+ final String actualType = methodOwner.substring(10);
+ final String convertedType = name.substring(0, name.length() - 5);
+
+ if (convertedType.equals("byte") && actualType.equals("Byte")) {
+ return Type.BYTE_TYPE;
+ }
+ else if (convertedType.equals("short") && actualType.equals("Short")) {
+ return Type.SHORT_TYPE;
+ }
+ else if (convertedType.equals("int") && actualType.equals("Integer")) {
+ return Type.INT_TYPE;
+ }
+ else if (convertedType.equals("long") && actualType.equals("Long")) {
+ return Type.LONG_TYPE;
+ }
+ else if (convertedType.equals("boolean") && actualType.equals("Boolean")) {
+ return Type.BOOLEAN_TYPE;
+ }
+ else if (convertedType.equals("char") && actualType.equals("Character") ) {
+ return Type.CHAR_TYPE;
+ }
+ else if (convertedType.equals("float") && actualType.equals("Float")) {
+ return Type.FLOAT_TYPE;
+ }
+ else if (convertedType.equals("double") && actualType.equals("Double")) {
+ return Type.DOUBLE_TYPE;
+ }
+ return null;
+ }
+
+ /**
+ * Detects static boxing calls {@code Wrapper.valueOf(primitive)} on a java/lang wrapper class,
+ * matching the exact method descriptor.
+ *
+ * @return the primitive type being boxed, or null if the call is not a boxing call
+ */
+ private Type checkForBoxing(String name, String desc, String methodOwner) {
+ // for performance improvement
+ if (!methodOwner.startsWith("java/lang/")
+ || !name.equals("valueOf")) {
+ return null;
+ }
+
+ final String convertedType = methodOwner.substring(10);
+
+ if (convertedType.equals("Byte") && desc.equals("(B)Ljava/lang/Byte;")) {
+ return Type.BYTE_TYPE;
+ }
+ else if (convertedType.equals("Short") && desc.equals("(S)Ljava/lang/Short;")) {
+ return Type.SHORT_TYPE;
+ }
+ else if (convertedType.equals("Integer") && desc.equals("(I)Ljava/lang/Integer;")) {
+ return Type.INT_TYPE;
+ }
+ else if (convertedType.equals("Long") && desc.equals("(J)Ljava/lang/Long;")) {
+ return Type.LONG_TYPE;
+ }
+ else if (convertedType.equals("Boolean") && desc.equals("(Z)Ljava/lang/Boolean;")) {
+ return Type.BOOLEAN_TYPE;
+ }
+ else if (convertedType.equals("Character") && desc.equals("(C)Ljava/lang/Character;")) {
+ return Type.CHAR_TYPE;
+ }
+ else if (convertedType.equals("Float") && desc.equals("(F)Ljava/lang/Float;")) {
+ return Type.FLOAT_TYPE;
+ }
+ else if (convertedType.equals("Double") && desc.equals("(D)Ljava/lang/Double;")) {
+ return Type.DOUBLE_TYPE;
+ }
+ return null;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Interpreter
+ // --------------------------------------------------------------------------------------------
+
+ // variable that maps the method's arguments to ASM values
+ // (-1 until newValue has been asked for the return-type value, then the next argument index)
+ private int curArgIndex = -1;
+ // values recorded by returnOperation; null entries stand for returns without input dependencies
+ private List<TaggedValue> returnValues = new ArrayList<TaggedValue>();
+
+ // see ModifiedASMFrame
+ // NOTE(review): neither field is assigned in this class; presumably set by
+ // ModifiedASMFrame/ModifiedASMAnalyzer during frame merges - confirm
+ ModifiedASMFrame currentFrame;
+ boolean rightMergePriority;
+
+ /**
+ * Creates the initial values of a frame. While the UDF's top-level signature is being captured,
+ * the analyzer's state machine tags, in order: the return value (skipped), "this", input 1,
+ * input 2 (binary and reduce UDFs) and the collector. Once capturing has ended, argument values
+ * of a nested invocation are handed out in order. This relies on ASM requesting the return-type
+ * value first, then "this", then each argument.
+ */
+ @Override
+ public BasicValue newValue(Type type) {
+ TaggedValue tv;
+ switch (analyzer.getState()) {
+ // skip "return"
+ case UdfAnalyzer.STATE_CAPTURE_RETURN:
+ analyzer.setState(UdfAnalyzer.STATE_CAPTURE_THIS);
+ curArgIndex++;
+ return super.newValue(type);
+ // tag "this"
+ case UdfAnalyzer.STATE_CAPTURE_THIS:
+ analyzer.setState(UdfAnalyzer.STATE_CAPTURE_INPUT1);
+ tv = new TaggedValue(type, Tag.THIS);
+ curArgIndex++;
+ return tv;
+ // tag input 1
+ case UdfAnalyzer.STATE_CAPTURE_INPUT1:
+ if (analyzer.isUdfBinary() || analyzer.isUdfReduceFunction()) {
+ analyzer.setState(UdfAnalyzer.STATE_CAPTURE_INPUT2);
+ } else if (analyzer.hasUdfCollector()) {
+ analyzer.setState(UdfAnalyzer.STATE_CAPTURE_COLLECTOR);
+ } else {
+ analyzer.setState(UdfAnalyzer.STATE_END_OF_CAPTURING);
+ }
+
+ // input is iterable
+ if (analyzer.hasUdfIterableInput()) {
+ tv = new TaggedValue(type, Tag.INPUT_1_ITERABLE);
+ }
+ else {
+ tv = analyzer.getInput1AsTaggedValue();
+ }
+ curArgIndex++;
+ return tv;
+ // tag input 2
+ case UdfAnalyzer.STATE_CAPTURE_INPUT2:
+ if (analyzer.hasUdfCollector()) {
+ analyzer.setState(UdfAnalyzer.STATE_CAPTURE_COLLECTOR);
+ } else {
+ analyzer.setState(UdfAnalyzer.STATE_END_OF_CAPTURING);
+ }
+
+ // input is iterable
+ if (analyzer.hasUdfIterableInput()) {
+ tv = new TaggedValue(type, Tag.INPUT_2_ITERABLE);
+ }
+ // special case: reduce function
+ else if (analyzer.isUdfReduceFunction()) {
+ tv = analyzer.getInput1AsTaggedValue();
+ }
+ else {
+ tv = analyzer.getInput2AsTaggedValue();
+ }
+ curArgIndex++;
+ return tv;
+ // tag collector
+ case UdfAnalyzer.STATE_CAPTURE_COLLECTOR:
+ analyzer.setState(UdfAnalyzer.STATE_END_OF_CAPTURING);
+ tv = new TaggedValue(type, Tag.COLLECTOR);
+ curArgIndex++;
+ return tv;
+ // if capturing has finished do not tag the arguments any more
+ // but copy values where necessary (for nested methods)
+ case UdfAnalyzer.STATE_END_OF_CAPTURING:
+ default:
+ // skip return type
+ if (curArgIndex < 0) {
+ curArgIndex++;
+ }
+ // return method's arguments
+ else if (argumentValues != null && curArgIndex < argumentValues.size()) {
+ return argumentValues.get(curArgIndex++);
+ }
+ return super.newValue(type);
+ }
+ }
+
+ /**
+ * Tags values produced without operands: NULL for ACONST_NULL, a taggable value for NEW (and
+ * counts the allocation), and int constants for BIPUSH/SIPUSH/LDC(Integer)/ICONST_*, which lets
+ * constant tuple field positions be resolved in getField/setField calls.
+ */
+ @Override
+ public BasicValue newOperation(AbstractInsnNode insn) throws AnalyzerException {
+ switch (insn.getOpcode()) {
+ case ACONST_NULL:
+ return new TaggedValue(Type.getObjectType("null"), Tag.NULL);
+ case NEW:
+ analyzer.incrNewOperationCounters(topLevelMethod);
+ // make new objects a tagged value to have possibility to tag an
+ // input container later
+ return new TaggedValue(Type.getObjectType(((TypeInsnNode) insn).desc));
+ // tag "int"-like constants
+ case BIPUSH:
+ case SIPUSH:
+ final IntInsnNode intInsn = (IntInsnNode) insn;
+ return new TaggedValue(intInsn.operand);
+ case LDC:
+ final Object cst = ((LdcInsnNode) insn).cst;
+ if (cst instanceof Integer) {
+ return new TaggedValue((Integer) cst);
+ }
+ return super.newOperation(insn);
+ case ICONST_M1:
+ return new TaggedValue(-1);
+ case ICONST_0:
+ return new TaggedValue(0);
+ case ICONST_1:
+ return new TaggedValue(1);
+ case ICONST_2:
+ return new TaggedValue(2);
+ case ICONST_3:
+ return new TaggedValue(3);
+ case ICONST_4:
+ return new TaggedValue(4);
+ case ICONST_5:
+ return new TaggedValue(5);
+ default:
+ return super.newOperation(insn);
+ }
+ }
+
+ /**
+ * Handles loads, stores and stack duplications. Loaded int constants lose their constant tag;
+ * inputs marked call-by-value are copied when stored or duplicated.
+ */
+ @Override
+ public BasicValue copyOperation(AbstractInsnNode insn, BasicValue value) throws AnalyzerException {
+ switch (insn.getOpcode()) {
+ case ILOAD:
+ // int constants are only supported if they are used in the subsequent operation
+ // otherwise we can not guarantee that it isn't modified elsewhere (e.g. do-while loop)
+ if (isTagged(value) && tagged(value).isIntConstant()) {
+ tagged(value).makeRegular();
+ }
+ return super.copyOperation(insn, value);
+ // if input is stored "call by value", it will be copied
+ case ISTORE:
+ case LSTORE:
+ case FSTORE:
+ case DSTORE:
+ case ASTORE:
+ case DUP:
+ case DUP_X1:
+ case DUP_X2:
+ case DUP2:
+ case DUP2_X1:
+ case DUP2_X2:
+ if (isTagged(value) && tagged(value).isInput() && tagged(value).isCallByValue()) {
+ return tagged(value).copy();
+ }
+ // otherwise deliberately falls through to the default handling
+ default:
+ return super.copyOperation(insn, value);
+ }
+ }
+
+ /**
+ * Handles single-operand instructions: requests loop modifications for hasNext() "true"
+ * assumptions, keeps tags across CHECKCAST, reports PUTSTATIC, resolves GETFIELD against known
+ * containers, and untags values modified by IINC.
+ */
+ @Override
+ public BasicValue unaryOperation(AbstractInsnNode insn, BasicValue value)
+ throws AnalyzerException {
+ switch (insn.getOpcode()) {
+ // modify jump instructions if we can assume that the hasNext operation will always
+ // be true at the first call
+ case IFEQ:
+ if (isTagged(value) && tagged(value).isIteratorTrueAssumption()) {
+ modifiedAsmAnalyzer.requestIFEQLoopModification();
+ }
+ return super.unaryOperation(insn, value);
+ case IFNE:
+ if (isTagged(value) && tagged(value).isIteratorTrueAssumption()) {
+ modifiedAsmAnalyzer.requestIFNELoopModification();
+ }
+ return super.unaryOperation(insn, value);
+
+ // a cast does not change the value, so its tag is kept
+ case CHECKCAST:
+ return value;
+ case PUTSTATIC:
+ analyzer.handlePutStatic();
+ return super.unaryOperation(insn, value);
+ case GETFIELD:
+ final FieldInsnNode field = (FieldInsnNode) insn;
+ // skip untagged values
+ if (!isTagged(value)) {
+ return super.unaryOperation(insn, value);
+ }
+ final TaggedValue taggedValue = (TaggedValue) value;
+
+ // inputs are atomic, a GETFIELD results in undefined state
+ if (taggedValue.isInput()) {
+ return super.unaryOperation(insn, value);
+ }
+ // access of input container field
+ // or access of a KNOWN UDF instance variable
+ else if (taggedValue.canContainFields()
+ && taggedValue.containerContains(field.name)) {
+ final TaggedValue tv = taggedValue.getContainerMapping().get(field.name);
+ if (tv != null) {
+ return tv;
+ }
+ }
+ // access of a yet UNKNOWN UDF instance variable
+ else if (taggedValue.isThis()
+ && !taggedValue.containerContains(field.name)) {
+ final TaggedValue tv = new TaggedValue(Type.getType(field.desc));
+ taggedValue.addContainerMapping(field.name, tv, currentFrame);
+ return tv;
+ }
+ // access of a yet unknown container, mark it as a container
+ else if (taggedValue.isRegular()) {
+ taggedValue.setTag(Tag.CONTAINER);
+ final TaggedValue tv = new TaggedValue(Type.getType(field.desc));
+ taggedValue.addContainerMapping(field.name, tv, currentFrame);
+ return tv;
+ }
+ return super.unaryOperation(insn, value);
+ case IINC:
+ // modification of a local variable or input
+ if (isTagged(value) && (tagged(value).isIntConstant() || tagged(value).isInput())) {
+ tagged(value).makeRegular();
+ }
+ return super.unaryOperation(insn, value);
+ default:
+ return super.unaryOperation(insn, value);
+ }
+ }
+
+ // three-operand instructions (array element stores): storing into an input array modifies
+ // the input, so it loses its input tag
+ @Override
+ public BasicValue ternaryOperation(AbstractInsnNode insn, BasicValue value1, BasicValue value2,
+ BasicValue value3) throws AnalyzerException {
+ // if array is an input, make it regular since the input is modified
+ if (isTagged(value1) && tagged(value1).isInput()) {
+ tagged(value1).makeRegular();
+ }
+ return super.ternaryOperation(insn, value1, value2, value3);
+ }
+
+ /**
+ * Tracks PUTFIELD: non-input objects become containers recording the stored value (or null when
+ * it has no important dependencies); a PUTFIELD on an input makes the input regular.
+ * Other two-operand instructions are delegated to BasicInterpreter.
+ */
+ @Override
+ public BasicValue binaryOperation(AbstractInsnNode insn, BasicValue value1,
+ BasicValue value2) throws AnalyzerException {
+ switch (insn.getOpcode()) {
+ case PUTFIELD: // put value2 into value1
+ // skip untagged values
+ if (!isTagged(value1)) {
+ return null;
+ }
+
+ final TaggedValue taggedValue = (TaggedValue) value1;
+ final FieldInsnNode field = (FieldInsnNode) insn;
+ final boolean value2HasInputDependency = hasImportantDependencies(value2);
+
+ // if value1 is not an input, make value1 a container and add value2 to it
+ // PUTFIELD on inputs is not allowed
+ if (!taggedValue.isInput() && value2HasInputDependency) {
+ if (!taggedValue.canContainFields()) {
+ taggedValue.setTag(Tag.CONTAINER);
+ }
+ taggedValue.addContainerMapping(field.name, tagged(value2), currentFrame);
+ }
+ // if value1 is filled with non-input, make it container and mark the field
+ // PUTFIELD on inputs is not allowed
+ else if (!taggedValue.isInput() && !value2HasInputDependency) {
+ if (!taggedValue.canContainFields()) {
+ taggedValue.setTag(Tag.CONTAINER);
+ }
+ taggedValue.addContainerMapping(field.name, null, currentFrame);
+ }
+ // PUTFIELD on input leads to input modification
+ // make input regular
+ else if (taggedValue.isInput()) {
+ taggedValue.makeRegular();
+ }
+ return null;
+ default:
+ return super.binaryOperation(insn, value1, value2);
+ }
+ }
+
+ /**
+ * Handles method invocations. It special-cases:
+ * - Collector.collect (records emitted values and reports null emissions);
+ * - Iterable.iterator, Iterator.hasNext and Iterator.next on inputs;
+ * - boxing and unboxing of inputs;
+ * - Tuple getField/setField with constant positions.
+ * Calls on "this" whose fields hold input dependencies, and calls with input-dependent
+ * arguments, are followed via invokeNestedMethod. INVOKEDYNAMIC is not supported.
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public BasicValue naryOperation(AbstractInsnNode insn, List rawValues) throws AnalyzerException {
+ final List<BasicValue> values = (List<BasicValue>) rawValues;
+ boolean isStatic = false;
+ switch (insn.getOpcode()) {
+ case INVOKESTATIC:
+ isStatic = true;
+ // deliberately falls through: static calls share the handling below
+ case INVOKESPECIAL:
+ case INVOKEVIRTUAL:
+ case INVOKEINTERFACE:
+ final MethodInsnNode method = (MethodInsnNode) insn;
+ String methodOwner = method.owner;
+
+ // special case: in case that class is extending tuple we need to find the class
+ // that contains the actual implementation to determine the tuple size
+ if (method.name.equals("getField") || method.name.equals("setField")) {
+ try {
+ final String newMethodOwner = (String) findMethodNode(methodOwner, method.name, method.desc)[1];
+ if (newMethodOwner.startsWith("org/apache/flink/api/java/tuple/Tuple")) {
+ methodOwner = newMethodOwner;
+ }
+ }
+ catch (IllegalStateException e) {
+ // proceed with the known method owner
+ }
+ }
+
+ // special case: collect method of Collector
+ if (method.name.equals("collect")
+ && methodOwner.equals("org/apache/flink/util/Collector")
+ && isTagged(values.get(0))
+ && tagged(values.get(0)).isCollector()) {
+ // check for invalid return value
+ if (isTagged(values.get(1)) && tagged(values.get(1)).isNull()) {
+ analyzer.handleNullReturn();
+ }
+ // valid return value with input dependencies
+ else if (hasImportantDependencies(values.get(1))){
+ // add a copy and a reference
+ // to capture the current state and future changes in alternative paths
+ analyzer.getCollectorValues().add(tagged(values.get(1)));
+ analyzer.getCollectorValues().add(tagged(values.get(1)).copy());
+ }
+ // valid return value without input dependencies
+ else {
+ analyzer.getCollectorValues().add(null);
+ }
+ }
+ // special case: iterator method of Iterable
+ else if (method.name.equals("iterator")
+ && methodOwner.equals("java/lang/Iterable")
+ && isTagged(values.get(0))
+ && tagged(values.get(0)).isInputIterable()) {
+ return new TaggedValue(Type.getObjectType("java/util/Iterator"),
+ (tagged(values.get(0)).isInput1Iterable()) ? Tag.INPUT_1_ITERATOR : Tag.INPUT_2_ITERATOR);
+ }
+ // special case: hasNext method of Iterator
+ else if (method.name.equals("hasNext")
+ && methodOwner.equals("java/util/Iterator")
+ && isTagged(values.get(0))
+ && tagged(values.get(0)).isInputIterator()
+ && !analyzer.isUdfBinary()
+ && !analyzer.isIteratorTrueAssumptionApplied()) {
+ return new TaggedValue(Type.BOOLEAN_TYPE, Tag.ITERATOR_TRUE_ASSUMPTION);
+ }
+ // special case: next method of Iterator
+ else if (method.name.equals("next")
+ && methodOwner.equals("java/util/Iterator")
+ && isTagged(values.get(0))
+ && tagged(values.get(0)).isInputIterator()) {
+ // after this call it is not possible to assume "TRUE" of "hasNext()" again
+ analyzer.applyIteratorTrueAssumption();
+ if (tagged(values.get(0)).isInput1Iterator()) {
+ return analyzer.getInput1AsTaggedValue();
+ }
+ else {
+ return analyzer.getInput2AsTaggedValue();
+ }
+ }
+ // if the UDF class contains instance variables that contain input,
+ // we need to analyze also methods without input-dependent arguments
+ // special case: do not follow the getRuntimeContext method of RichFunctions
+ else if (!isStatic
+ && isTagged(values.get(0))
+ && tagged(values.get(0)).isThis()
+ && hasImportantDependencies(values.get(0))
+ && !isGetRuntimeContext(method)) {
+ TaggedValue tv = invokeNestedMethod(values, method);
+ if (tv != null) {
+ return tv;
+ }
+ }
+ // the arguments have input dependencies ("THIS" does not count to the arguments)
+ // we can assume that method has at least one argument
+ else if ((!isStatic && isTagged(values.get(0)) && tagged(values.get(0)).isThis() && hasImportantDependencies(values, true))
+ || (!isStatic && (!isTagged(values.get(0)) || !tagged(values.get(0)).isThis()) && hasImportantDependencies(values, false))
+ || (isStatic && hasImportantDependencies(values, false))) {
+ // special case: Java unboxing/boxing methods on input
+ Type newType;
+ if (isTagged(values.get(0))
+ && tagged(values.get(0)).isInput()
+ && (!isStatic && (newType = checkForUnboxing(method.name, methodOwner)) != null
+ || (isStatic && (newType = checkForBoxing(method.name, method.desc, methodOwner))
+ != null))) {
+ return tagged(values.get(0)).copy(newType);
+ }
+ // special case: setField method of TupleXX
+ else if (method.name.equals("setField")
+ && methodOwner.startsWith("org/apache/flink/api/java/tuple/Tuple")
+ && isTagged(values.get(0))
+ ) {
+ final TaggedValue tuple =tagged(values.get(0));
+ tuple.setTag(Tag.CONTAINER);
+
+ // check if fieldPos is constant
+ // if not, we can not determine a state for the tuple
+ if (!isTagged(values.get(2)) || !tagged(values.get(2)).isIntConstant()) {
+ tuple.makeRegular();
+ }
+ else {
+ final int constant = tagged(values.get(2)).getIntConstant();
+
+ // the tuple arity is parsed from the class name, e.g. ".../tuple/Tuple3" -> 3
+ if (constant < 0 || Integer.valueOf(methodOwner.split("Tuple")[1]) <= constant ) {
+ analyzer.handleInvalidTupleAccess();
+ }
+
+ // if it is at least tagged, add it anyways
+ if (isTagged(values.get(1))) {
+ tuple.addContainerMapping("f" + constant, tagged(values.get(1)), currentFrame);
+ }
+ // mark the field as it has an undefined state
+ else {
+ tuple.addContainerMapping("f" + constant, null, currentFrame);
+ }
+ }
+ }
+ // special case: getField method of TupleXX
+ // NOTE(review): this branch is also reached when only the index argument is
+ // input-dependent, in which case values.get(0) is not checked to be tagged here
+ else if (method.name.equals("getField")
+ && methodOwner.startsWith("org/apache/flink/api/java/tuple/Tuple")) {
+ final TaggedValue tuple = tagged(values.get(0)); // we can assume that 0 is an input dependent tuple
+ // constant field index
+ if (isTagged(values.get(1)) // constant
+ && tagged(values.get(1)).isIntConstant()) {
+ final int constant = tagged(values.get(1)).getIntConstant();
+
+ if (constant < 0 || Integer.valueOf(methodOwner.split("Tuple")[1]) <= constant) {
+ analyzer.handleInvalidTupleAccess();
+ }
+
+ if (tuple.containerContains("f" + constant)) {
+ final TaggedValue tupleField = tuple.getContainerMapping().get("f" + constant);
+ if (tupleField != null) {
+ return tupleField;
+ }
+ }
+ }
+ // unknown field index
+ else {
+ // we need to make the tuple regular as we cannot track modifications of fields
+ tuple.makeRegular();
+ return new TaggedValue(Type.getObjectType("java/lang/Object"));
+ }
+ }
+ // nested method invocation
+ else {
+ TaggedValue tv = invokeNestedMethod(values, method);
+ if (tv != null) {
+ return tv;
+ }
+ }
+ }
+ return super.naryOperation(insn, values);
+ default:
+ // TODO support for INVOKEDYNAMIC instructions
+ return super.naryOperation(insn, values);
+ }
+ }
+
+ /**
+ * Records a returned value. Returning null from the top-level UDF method is reported to the
+ * analyzer. Input-dependent values are stored both as a reference and as a copy; other values
+ * are recorded as null.
+ */
+ @Override
+ public void returnOperation(AbstractInsnNode insn, BasicValue value,
+ BasicValue expected) throws AnalyzerException {
+ // return of a null value in the top level UDF method
+ if (isTagged(value) && tagged(value).isNull() && topLevelMethod) {
+ analyzer.handleNullReturn();
+ }
+ else if (hasImportantDependencies(value)){
+ // add a copy and a reference
+ // to capture the current state and future changes in alternative paths
+ returnValues.add(tagged(value));
+ returnValues.add(tagged(value).copy());
+ }
+ else {
+ returnValues.add(null);
+ }
+ }
+
+ /**
+ * Merges two values meeting at a control-flow join. Equal values are returned unchanged.
+ * Otherwise interesting TaggedValues are kept or merged (via mergeReturnValues /
+ * removeUngroupedInputs when rightMergePriority is set), and irreconcilable plain BasicValues
+ * become UNINITIALIZED_VALUE.
+ */
+ @Override
+ public BasicValue merge(BasicValue v, BasicValue w) {
+ // values are not equal
+ // BasicValue's equals method is too general
+ if ((!isTagged(v) && !w.equals(v)) || (isTagged(v) && !v.equals(w))) {
+ // w is a BasicValue
+ if (isTagged(v) && !isTagged(w)) {
+ return new TaggedValue(w.getType());
+ }
+ // v is a BasicValue or uninteresting and w is a interesting TaggedValue
+ else if ((!isTagged(v) || tagged(v).canNotContainInput())
+ && isTagged(w) && tagged(w).canContainInput()) {
+ final TaggedValue taggedW = tagged(w);
+ if (hasImportantDependencies(taggedW)
+ && rightMergePriority) {
+ // w has a merge priority, its grouped inputs will be returned
+ final TaggedValue returnValue = removeUngroupedInputs(taggedW.copy());
+ if (returnValue != null) {
+ return returnValue;
+ }
+ else {
+ return new TaggedValue(v.getType());
+ }
+ }
+ return new TaggedValue(v.getType());
+ }
+ // v is a BasicValue and w is a uninteresting TaggedValue
+ else if (!isTagged(v) && isTagged(w) && tagged(w).canNotContainInput()) {
+ return v;
+ }
+ // merge v and w (both TaggedValues), v is interesting
+ else if (isTagged(v) && isTagged(w) && tagged(v).canContainInput()) {
+ final List<TaggedValue> list = Arrays.asList(tagged(v), tagged(w));
+ final TaggedValue returnValue = mergeReturnValues(list);
+ if (returnValue != null) {
+ return returnValue;
+ }
+ }
+ // v is a TaggedValue and uninteresting
+ else if (isTagged(v) && tagged(v).canNotContainInput()) {
+ return v;
+ }
+ // v and w are BasicValues and not equal
+ return BasicValue.UNINITIALIZED_VALUE;
+ }
+ // v and w are equal, return one of them
+ return v;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/sca/TaggedValue.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/TaggedValue.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/TaggedValue.java
new file mode 100644
index 0000000..c54ecf3
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/TaggedValue.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.sca;
+
+import org.objectweb.asm.Type;
+import org.objectweb.asm.tree.analysis.BasicValue;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Extension of ASM's BasicValue that allows to assign "tags"
+ * to values and add additional information depending on the tag to the Value.
+ */
+public class TaggedValue extends BasicValue {
+
+ public static enum Tag {
+ REGULAR, // regular object with no special meaning
+ THIS, // a special container which is the instance of the UDF
+ INPUT, // atomic input field
+ COLLECTOR, // collector of UDF
+ CONTAINER, // container that contains fields
+ INT_CONSTANT, // int constant
+ INPUT_1_ITERABLE, INPUT_2_ITERABLE, INPUT_1_ITERATOR, INPUT_2_ITERATOR, // input iterators
+ ITERATOR_TRUE_ASSUMPTION, // boolean value that is "true" at least once
+ NULL // null
+ };
+
+ public static enum Input {
+ INPUT_1(0), INPUT_2(1);
+
+ private int id;
+
+ private Input(int id) {
+ this.id = id;
+ }
+
+ public int getId() {
+ return id;
+ }
+ };
+
+ private Tag tag;
+ // only inputs can set this to true
+ private boolean callByValue = false;
+
+ // for inputs
+ private Input input;
+ private String flatFieldExpr; // empty string for non-composite types
+ private boolean grouped;
+
+ // for input containers & this
+ // key=field / value=input or container
+ // undefined state => value=null
+ private Map<String, TaggedValue> containerMapping;
+ private Map<String, ModifiedASMFrame> containerFrameMapping;
+
+ // for int constants
+ private int intConstant;
+
+ public TaggedValue(Type type) {
+ this(type, Tag.REGULAR);
+ }
+
+ public TaggedValue(Type type, Tag tag) {
+ super(type);
+ this.tag = tag;
+ }
+
+ public TaggedValue(Type type, Input input, String flatFieldExpr, boolean grouped, boolean callByValue) {
+ super(type);
+ tag = Tag.INPUT;
+ this.input = input;
+ this.flatFieldExpr = flatFieldExpr;
+ this.grouped = grouped;
+ this.callByValue = callByValue;
+ }
+
+ public TaggedValue(Type type, Map<String, TaggedValue> containerMapping) {
+ super(type);
+ tag = Tag.CONTAINER;
+ this.containerMapping = containerMapping;
+ }
+
+ public TaggedValue(int constant) {
+ super(Type.INT_TYPE);
+ tag = Tag.INT_CONSTANT;
+ this.intConstant = constant;
+ }
+
+ public boolean isInput() {
+ return tag == Tag.INPUT;
+ }
+
+ public boolean isThis() {
+ return tag == Tag.THIS;
+ }
+
+ public boolean isContainer() {
+ return tag == Tag.CONTAINER;
+ }
+
+ public boolean isRegular() {
+ return tag == Tag.REGULAR;
+ }
+
+ public boolean isIntConstant() {
+ return tag == Tag.INT_CONSTANT;
+ }
+
+ public boolean isCollector() {
+ return tag == Tag.COLLECTOR;
+ }
+
+ public boolean isInputIterable() {
+ return tag == Tag.INPUT_1_ITERABLE || tag == Tag.INPUT_2_ITERABLE;
+ }
+
+ public boolean isInputIterator() {
+ return tag == Tag.INPUT_1_ITERATOR || tag == Tag.INPUT_2_ITERATOR;
+ }
+
+ public boolean isInput1Iterable() {
+ return tag == Tag.INPUT_1_ITERABLE;
+ }
+
+ public boolean isInput1Iterator() {
+ return tag == Tag.INPUT_1_ITERATOR;
+ }
+
+ public boolean isIteratorTrueAssumption() {
+ return tag == Tag.ITERATOR_TRUE_ASSUMPTION;
+ }
+
+ public boolean isNull() {
+ return tag == Tag.NULL;
+ }
+
+ public boolean canNotContainInput() {
+ return tag != Tag.INPUT && tag != Tag.CONTAINER && tag != Tag.THIS;
+ }
+
+ public boolean canContainInput() {
+ return tag == Tag.INPUT || tag == Tag.CONTAINER || tag == Tag.THIS;
+ }
+
+ public boolean canContainFields() {
+ return tag == Tag.CONTAINER || tag == Tag.THIS;
+ }
+
+ public boolean isCallByValue() {
+ return callByValue;
+ }
+
+ public Tag getTag() {
+ return tag;
+ }
+
+ public void setTag(Tag tag) {
+ this.tag = tag;
+ if (tag == Tag.CONTAINER || tag == Tag.THIS) {
+ input = null;
+ flatFieldExpr = null;
+ }
+ else if (tag == Tag.INPUT) {
+ containerMapping = null;
+ }
+ else {
+ input = null;
+ containerMapping = null;
+ flatFieldExpr = null;
+ }
+ }
+
+ public String toForwardedFieldsExpression(Input input) {
+ // input not relevant
+ if (isInput() && this.input != input) {
+ return null;
+ }
+ // equivalent to semantic annotation "*" for non-composite types
+ else if (isInput() && flatFieldExpr.length() == 0) {
+ return "*";
+ }
+ // equivalent to "f0.f0->*"
+ else if (isInput()) {
+ return flatFieldExpr + "->*";
+ }
+ // equivalent to "f3;f0.f0->f0.f1;f1->f2;..."
+ else if (canContainFields() && containerMapping != null) {
+ final StringBuilder sb = new StringBuilder();
+ traverseContainer(input, containerMapping, sb, "");
+ final String returnValue = sb.toString();
+ if (returnValue != null && returnValue.length() > 0) {
+ return returnValue;
+ }
+ }
+ return null;
+ }
+
+ private void traverseContainer(Input input, Map<String, TaggedValue> containerMapping, StringBuilder sb,
+ String prefix) {
+ for (Map.Entry<String,TaggedValue> entry : containerMapping.entrySet()) {
+ // skip undefined states
+ if (entry.getValue() == null) {
+ continue;
+ }
+ // input
+ else if (entry.getValue().isInput() && entry.getValue().input == input) {
+ final String flatFieldExpr = entry.getValue().getFlatFieldExpr();
+ if (flatFieldExpr.length() == 0) {
+ sb.append("*");
+ }
+ else {
+ sb.append(flatFieldExpr);
+ }
+ sb.append("->");
+ if (prefix.length() > 0) {
+ sb.append(prefix);
+ sb.append('.');
+ }
+ sb.append(entry.getKey());
+ sb.append(';');
+ }
+ // input containers
+ else if (entry.getValue().canContainFields()) {
+ traverseContainer(input, entry.getValue().containerMapping, sb,
+ ((prefix.length() > 0)? prefix + "." : "") + entry.getKey());
+ }
+ }
+ }
+
+ @Override
+ public boolean equals(Object value) {
+ if (!(value instanceof TaggedValue) || !super.equals(value)) {
+ return false;
+ }
+ final TaggedValue other = (TaggedValue) value;
+ if (other.tag != tag) {
+ return false;
+ }
+
+ if (isInput()) {
+ return input == other.input && flatFieldExpr.equals(other.flatFieldExpr)
+ && grouped == other.grouped && callByValue == other.callByValue;
+ }
+ else if (canContainFields()) {
+ if ((containerMapping == null && other.containerMapping != null)
+ || (containerMapping != null && other.containerMapping == null)) {
+ return false;
+ }
+ if (containerMapping == null) {
+ return true;
+ }
+ return containerMapping.equals(other.containerMapping);
+ }
+ return tag == other.tag;
+ }
+
+ @Override
+ public String toString() {
+ if (isInput()) {
+ return "TaggedValue(" + tag + ":" + flatFieldExpr + ")";
+ }
+ else if (canContainFields()) {
+ return "TaggedValue(" + tag + ":" + containerMapping + ")";
+ }
+ else if (isIntConstant()) {
+ return "TaggedValue(" + tag + ":" + intConstant + ")";
+ }
+ return "TaggedValue(" + tag + ")";
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Input
+ // --------------------------------------------------------------------------------------------
+
+ public Input getInput() {
+ return input;
+ }
+
+ public String getFlatFieldExpr() {
+ return flatFieldExpr;
+ }
+
+ public boolean isGrouped() {
+ return grouped;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Container & This
+ // --------------------------------------------------------------------------------------------
+
+ public Map<String, TaggedValue> getContainerMapping() {
+ return containerMapping;
+ }
+
+ public boolean containerContains(String field) {
+ if (containerMapping == null) {
+ return false;
+ }
+ return containerMapping.containsKey(field);
+ }
+
+ public boolean containerHasReferences() {
+ if (containerMapping == null) {
+ return false;
+ }
+ for (TaggedValue value : containerMapping.values()) {
+ if (value == null || !value.isCallByValue()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public void addContainerMapping(String field, TaggedValue mapping, ModifiedASMFrame frame) {
+ if (containerMapping == null) {
+ containerMapping = new HashMap<String, TaggedValue>(4);
+ }
+ if (containerFrameMapping == null) {
+ containerFrameMapping = new HashMap<String, ModifiedASMFrame>(4);
+ }
+ if (containerMapping.containsKey(field)
+ && containerMapping.get(field) != null
+ && frame == containerFrameMapping.get(field)) {
+ containerMapping.put(field, null);
+ containerFrameMapping.remove(field);
+ }
+ else {
+ containerMapping.put(field, mapping);
+ containerFrameMapping.put(field, frame);
+ }
+ }
+
+ public void clearContainerMappingMarkedFields() {
+ if (containerMapping != null) {
+ final Iterator<Entry<String, TaggedValue>> it = containerMapping.entrySet().iterator();
+ while (it.hasNext()) {
+ final Entry<String, TaggedValue> entry = it.next();
+ if (entry.getValue() == null) {
+ it.remove();
+ }
+ }
+ }
+ }
+
+ public void makeRegular() {
+ if (canContainFields() && containerMapping != null) {
+ for (TaggedValue value : containerMapping.values()) {
+ value.makeRegular();
+ }
+ }
+ setTag(Tag.REGULAR);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // IntConstant
+ // --------------------------------------------------------------------------------------------
+
+ public int getIntConstant() {
+ return intConstant;
+ }
+
+ public TaggedValue copy() {
+ return copy(getType());
+ }
+
+ public TaggedValue copy(Type type) {
+ final TaggedValue newValue = new TaggedValue(type);
+ newValue.tag = this.tag;
+ if (isInput()) {
+ newValue.input = this.input;
+ newValue.flatFieldExpr = this.flatFieldExpr;
+ newValue.grouped = this.grouped;
+ newValue.callByValue = this.callByValue;
+ }
+ else if (canContainFields()) {
+ final HashMap<String, TaggedValue> containerMapping = new HashMap<String, TaggedValue>(this.containerMapping.size());
+ final HashMap<String, ModifiedASMFrame> containerFrameMapping;
+ if (this.containerFrameMapping != null) {
+ containerFrameMapping = new HashMap<String, ModifiedASMFrame>(this.containerFrameMapping.size());
+ } else {
+ containerFrameMapping = null;
+ }
+ for (Entry<String, TaggedValue> entry : this.containerMapping.entrySet()) {
+ if (entry.getValue() != null) {
+ containerMapping.put(entry.getKey(), entry.getValue().copy());
+ if (containerFrameMapping != null) {
+ containerFrameMapping.put(entry.getKey(), this.containerFrameMapping.get(entry.getKey()));
+ }
+ } else {
+ containerMapping.put(entry.getKey(), null);
+ }
+ }
+ newValue.containerMapping = containerMapping;
+ newValue.containerFrameMapping = containerFrameMapping;
+ }
+ else if (isIntConstant()) {
+ newValue.intConstant = this.intConstant;
+ }
+ return newValue;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java
new file mode 100644
index 0000000..1f4b44a
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.sca;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
+import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.java.sca.TaggedValue.Input;
+import org.objectweb.asm.Type;
+import org.objectweb.asm.tree.MethodNode;
+import org.slf4j.Logger;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.api.java.sca.UdfAnalyzerUtils.convertTypeInfoToTaggedValue;
+import static org.apache.flink.api.java.sca.UdfAnalyzerUtils.findMethodNode;
+import static org.apache.flink.api.java.sca.UdfAnalyzerUtils.mergeReturnValues;
+import static org.apache.flink.api.java.sca.UdfAnalyzerUtils.removeUngroupedInputsFromContainer;
+
+/**
+ * Implements a Static Code Analyzer (SCA) that uses the ASM framework
+ * for interpreting Java bytecode of Flink UDFs. The analyzer is build on
+ * top of ASM's BasicInterpreter. Instead of ASM's BasicValues, it introduces
+ * TaggedValues which extend BasicValue and allows for appending interesting
+ * information to values. Interesting values such as inputs, collectors, or
+ * constants are tagged such that a tracking of atomic input fields through the
+ * entire UDF (until the function returns or calls collect()) is possible.
+ *
+ * The implementation is as conservative as possible meaning that for cases
+ * or bytecode instructions that haven't been considered the analyzer
+ * will fallback to the ASM library (which removes TaggedValues).
+ */
+public class UdfAnalyzer {
+ // necessary to prevent endless loops and stack overflows
+ private static final int MAX_NESTING = 20;
+
+ // general information about the UDF that is available before analysis takes place
+ private final Method baseClassMethod;
+ private final boolean hasCollector;
+ private final boolean isBinary;
+ private final boolean isIterableInput;
+ private final boolean isReduceFunction;
+ private final boolean isFilterFunction;
+ private final Class<?> udfClass;
+ private final String externalUdfName;
+ private final String internalUdfClassName;
+ private final TypeInformation<?> in1Type;
+ private final TypeInformation<?> in2Type;
+ private final TypeInformation<?> outType;
+ private final Keys<?> keys1;
+ private final Keys<?> keys2;
+
+ // flag if code errors should throw an CodeErrorException
+ private final boolean throwErrorExceptions;
+
+ // list of all values added with a "collect()" call
+ private final List<TaggedValue> collectorValues;
+
+ // list of all hints that can be returned after analysis
+ private final List<String> hints;
+ private boolean warning = false;
+
+ // stages for capturing and tagging the initial BasicValues
+ private int state = STATE_CAPTURE_RETURN;
+
+ static final int STATE_CAPTURE_RETURN = 0;
+ static final int STATE_CAPTURE_THIS = 1;
+ static final int STATE_CAPTURE_INPUT1 = 2;
+ static final int STATE_CAPTURE_INPUT2 = 3;
+ static final int STATE_CAPTURE_COLLECTOR = 4;
+ static final int STATE_END_OF_CAPTURING = 5;
+ static final int STATE_END_OF_ANALYZING = 6;
+
+ // flag that indicates if the "hasNext()" call of an input iterator has returned "TRUE"
+ // and can now return "FALSE" if assumption has already been used
+ private boolean iteratorTrueAssumptionApplied;
+
+ // merged return value of all return statements in the UDF
+ private TaggedValue returnValue;
+
+ // statistics for object creation hinting
+ private int newOperationCounterOverall;
+ private int newOperationCounterTopLevel;
+
+ // stored FilterFunction input for later modification checking
+ private TaggedValue filterInputCopy;
+ private TaggedValue filterInputRef;
+
+ public UdfAnalyzer(Class<?> baseClass, Class<?> udfClass, String externalUdfName,
+ TypeInformation<?> in1Type, TypeInformation<?> in2Type,
+ TypeInformation<?> outType, Keys<?> keys1, Keys<?> keys2,
+ boolean throwErrorExceptions) {
+
+ baseClassMethod = baseClass.getDeclaredMethods()[0];
+ this.udfClass = udfClass;
+ this.externalUdfName = externalUdfName;
+ this.internalUdfClassName = Type.getInternalName(udfClass);
+ this.in1Type = in1Type;
+ this.in2Type = in2Type;
+ this.outType = outType;
+ this.keys1 = keys1;
+ this.keys2 = keys2;
+ this.throwErrorExceptions = throwErrorExceptions;
+
+ if (baseClass == CoGroupFunction.class) {
+ hasCollector = true;
+ isBinary = true;
+ isIterableInput = true;
+ isReduceFunction = false;
+ isFilterFunction = false;
+ iteratorTrueAssumptionApplied = true;
+ }
+ else if (baseClass == CrossFunction.class) {
+ hasCollector = false;
+ isBinary = true;
+ isIterableInput = false;
+ isReduceFunction = false;
+ isFilterFunction = false;
+ iteratorTrueAssumptionApplied = true;
+ }
+ else if (baseClass == FlatJoinFunction.class) {
+ hasCollector = true;
+ isBinary = true;
+ isIterableInput = false;
+ isReduceFunction = false;
+ isFilterFunction = false;
+ iteratorTrueAssumptionApplied = true;
+ }
+ else if (baseClass == FlatMapFunction.class) {
+ hasCollector = true;
+ isBinary = false;
+ isIterableInput = false;
+ isReduceFunction = false;
+ isFilterFunction = false;
+ iteratorTrueAssumptionApplied = true;
+ }
+ else if (baseClass == GroupReduceFunction.class) {
+ hasCollector = true;
+ isBinary = false;
+ isIterableInput = true;
+ isReduceFunction = false;
+ isFilterFunction = false;
+ iteratorTrueAssumptionApplied = false;
+ }
+ else if (baseClass == JoinFunction.class) {
+ hasCollector = false;
+ isBinary = true;
+ isIterableInput = false;
+ isReduceFunction = false;
+ isFilterFunction = false;
+ iteratorTrueAssumptionApplied = true;
+ }
+ else if (baseClass == MapFunction.class) {
+ hasCollector = false;
+ isBinary = false;
+ isIterableInput = false;
+ isReduceFunction = false;
+ isFilterFunction = false;
+ iteratorTrueAssumptionApplied = true;
+ }
+ else if (baseClass == ReduceFunction.class) {
+ hasCollector = false;
+ isBinary = false;
+ isIterableInput = false;
+ isReduceFunction = true;
+ isFilterFunction = false;
+ iteratorTrueAssumptionApplied = true;
+ }
+ else if (baseClass == FilterFunction.class) {
+ hasCollector = false;
+ isBinary = false;
+ isIterableInput = false;
+ isReduceFunction = false;
+ isFilterFunction = true;
+ iteratorTrueAssumptionApplied = true;
+ }
+ // TODO MapPartitionFunction, GroupCombineFunction and CombineFunction not implemented yet
+ else {
+ throw new UnsupportedOperationException("Unsupported operator.");
+ }
+ if (hasCollector) {
+ collectorValues = new ArrayList<TaggedValue>();
+ }
+ else {
+ collectorValues = null;
+ }
+ hints = new ArrayList<String>();
+ }
+
+ public int getState() {
+ return state;
+ }
+
+ public void setState(int state) {
+ this.state = state;
+ }
+
+ public boolean isUdfBinary() {
+ return isBinary;
+ }
+
+ public boolean isIteratorTrueAssumptionApplied() {
+ return iteratorTrueAssumptionApplied;
+ }
+
+ public void applyIteratorTrueAssumption() {
+ iteratorTrueAssumptionApplied = true;
+ }
+
+ public void incrNewOperationCounters(boolean topLevel) {
+ newOperationCounterOverall++;
+ if (topLevel) {
+ newOperationCounterTopLevel++;
+ }
+ }
+
+ public boolean hasUdfCollector() {
+ return hasCollector;
+ }
+
+ public boolean hasUdfIterableInput() {
+ return isIterableInput;
+ }
+
+ public boolean isUdfReduceFunction() {
+ return isReduceFunction;
+ }
+
+ public String getInternalUdfClassName() {
+ return internalUdfClassName;
+ }
+
+ public List<TaggedValue> getCollectorValues() {
+ return collectorValues;
+ }
+
+ public boolean analyze() throws CodeAnalyzerException {
+ if (state == STATE_END_OF_ANALYZING) {
+ throw new IllegalStateException("Analyzing is already done.");
+ }
+
+ boolean discardReturnValues = false;
+
+ if (isIterableInput) {
+ if (keys1 == null || (keys2 == null && isBinary)) {
+ throw new IllegalArgumentException("This type of function requires key information for analysis.");
+ }
+ else if (!(keys1 instanceof ExpressionKeys) || (!(keys2 instanceof ExpressionKeys) && isBinary)) {
+ // TODO currently only ExpressionKeys are supported as keys
+ discardReturnValues = true;
+ }
+ }
+
+ try {
+ final Object[] mn = findMethodNode(internalUdfClassName, baseClassMethod);
+ final NestedMethodAnalyzer nma = new NestedMethodAnalyzer(this, (String) mn[1],
+ (MethodNode) mn[0], null, MAX_NESTING, true);
+ final TaggedValue result = nma.analyze();
+ setState(STATE_END_OF_ANALYZING);
+
+ // special case: FilterFunction
+ if (isFilterFunction) {
+ discardReturnValues = true;
+ // check for input modification
+ if (!filterInputCopy.equals(filterInputRef)) {
+ addHintOrThrowException("Function modifies the input. This can lead to unexpected behaviour during runtime.");
+ }
+ }
+
+ if (!discardReturnValues) {
+ // merge return values of a collector
+ if (hasCollector) {
+ returnValue = mergeReturnValues(collectorValues);
+ }
+ else {
+ returnValue = result;
+ }
+ // remove ungrouped inputs from result if UDF has iterators
+ // or is a reduce function
+ if ((isIterableInput || isReduceFunction) && returnValue != null) {
+ if (returnValue.canContainFields()) {
+ removeUngroupedInputsFromContainer(returnValue);
+ }
+ else if (returnValue.isInput() && !returnValue.isGrouped()) {
+ returnValue = null;
+ }
+ }
+ }
+ // any return value is invalid
+ else {
+ returnValue = null;
+ }
+ }
+ catch (Exception e) {
+ Throwable cause = e.getCause();
+ while (cause != null && !(cause instanceof CodeErrorException)) {
+ cause = cause.getCause();
+ }
+ if ((cause != null && cause instanceof CodeErrorException) || e instanceof CodeErrorException) {
+ throw new CodeErrorException("Function code contains obvious errors. " +
+ "If you think the code analysis is wrong at this point you can " +
+ "disable the entire code analyzer in ExecutionConfig or add" +
+ " @SkipCodeAnalysis to your function to disable the analysis.",
+ (cause != null)? cause : e);
+ }
+ throw new CodeAnalyzerException("Exception occurred during code analysis.", e);
+ }
+ return true;
+ }
+
+ public SemanticProperties getSemanticProperties() {
+ final SemanticProperties sp;
+ if (isBinary) {
+ sp = new DualInputSemanticProperties();
+ if (returnValue != null) {
+ String[] ff1Array = null;
+ final String ff1 = returnValue.toForwardedFieldsExpression(Input.INPUT_1);
+ if (ff1 !=null && ff1.length() > 0) {
+ ff1Array = new String[] { ff1 };
+ }
+ String[] ff2Array = null;
+ final String ff2 = returnValue.toForwardedFieldsExpression(Input.INPUT_2);
+ if (ff2 !=null && ff2.length() > 0) {
+ ff2Array = new String[] { ff2 };
+ }
+ SemanticPropUtil.getSemanticPropsDualFromString((DualInputSemanticProperties) sp,
+ ff1Array, ff2Array, null, null, null, null, in1Type, in2Type, outType, true);
+ }
+ }
+ else {
+ sp = new SingleInputSemanticProperties();
+ if (returnValue != null) {
+ String[] ffArray = null;
+ final String ff = returnValue.toForwardedFieldsExpression(Input.INPUT_1);
+ if (ff !=null && ff.length() > 0) {
+ ffArray = new String[] { ff };
+ }
+ SemanticPropUtil.getSemanticPropsSingleFromString((SingleInputSemanticProperties) sp,
+ ffArray, null, null, in1Type, outType, true);
+ }
+ }
+ return sp;
+ }
+
+ public void addSemanticPropertiesHints() {
+ boolean added = false;
+ if (returnValue != null) {
+ if (isBinary) {
+ final String ff1 = returnValue.toForwardedFieldsExpression(Input.INPUT_1);
+ if (ff1 != null && ff1.length() > 0) {
+ added = true;
+ hints.add("Possible annotation: "
+ + "@ForwardedFieldsFirst(\"" + ff1 + "\")");
+ }
+ final String ff2 = returnValue.toForwardedFieldsExpression(Input.INPUT_2);
+ if (ff2 != null && ff2.length() > 0) {
+ added = true;
+ hints.add("Possible annotation: "
+ + "@ForwardedFieldsSecond(\"" + ff2 + "\")");
+ }
+ } else {
+ final String ff = returnValue.toForwardedFieldsExpression(Input.INPUT_1);
+ if (ff != null && ff.length() > 0) {
+ added = true;
+ hints.add("Possible annotation: "
+ + "@ForwardedFields(\"" + ff + "\")");
+ }
+ }
+ }
+ if (!added) {
+ hints.add("Possible annotations: none.");
+ }
+ }
+
+ public void printToLogger(Logger log) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Code analysis result for '" + externalUdfName + " (" + udfClass.getName() + ")':");
+ sb.append("\nNumber of object creations: "
+ + newOperationCounterTopLevel + " in method / " + newOperationCounterOverall + " transitively");
+
+ for (String hint : hints) {
+ sb.append('\n');
+ sb.append(hint);
+ }
+
+ if (warning) {
+ log.warn(sb.toString());
+ }
+ else {
+ log.info(sb.toString());
+ }
+ }
+
+ public TaggedValue getInput1AsTaggedValue() {
+ final int[] groupedKeys;
+ if (keys1 != null) {
+ groupedKeys = keys1.computeLogicalKeyPositions();
+ }
+ else {
+ groupedKeys = null;
+ }
+ final TaggedValue input1 = convertTypeInfoToTaggedValue(Input.INPUT_1, in1Type, "", null, groupedKeys);
+ // store the input and a copy of it to check for modification afterwards
+ if (isFilterFunction) {
+ filterInputRef = input1;
+ filterInputCopy = input1.copy();
+ }
+ return input1;
+ }
+
+ public TaggedValue getInput2AsTaggedValue() {
+ final int[] groupedKeys;
+ if (keys2 != null) {
+ groupedKeys = keys2.computeLogicalKeyPositions();
+ }
+ else {
+ groupedKeys = null;
+ }
+ return convertTypeInfoToTaggedValue(Input.INPUT_2, in2Type, "", null, groupedKeys);
+ }
+
+ private void addHintOrThrowException(String msg) {
+ if (throwErrorExceptions) {
+ throw new CodeErrorException(externalUdfName + ": " + msg);
+ }
+ else {
+ warning = true;
+ hints.add(msg);
+ }
+ }
+
+ public void handleNullReturn() {
+ addHintOrThrowException("Function returns 'null' values. This can lead to errors during runtime.");
+ }
+
+ public void handlePutStatic() {
+ addHintOrThrowException("Function modifies static fields. This can lead to unexpected behaviour during runtime.");
+ }
+
+ public void handleInvalidTupleAccess() {
+ addHintOrThrowException("Function contains tuple accesses with invalid indexes. This can lead to errors during runtime.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
new file mode 100644
index 0000000..df1e421
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.sca;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.objectweb.asm.ClassReader;
+import org.objectweb.asm.Type;
+import org.objectweb.asm.tree.ClassNode;
+import org.objectweb.asm.tree.MethodNode;
+import org.objectweb.asm.tree.analysis.BasicValue;
+import org.objectweb.asm.tree.analysis.Value;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public final class UdfAnalyzerUtils {
+
+	/**
+	 * Converts type information of a function input into a {@link TaggedValue} tree.
+	 *
+	 * <p>Java tuples, Scala tuples and POJOs become container values (typed as
+	 * {@code java.lang.Object}) whose mapping holds one recursively converted entry per field.
+	 * All other types become atomic input values that remember their field expression and
+	 * whether they belong to the grouping keys.
+	 *
+	 * @param input the function input the value belongs to
+	 * @param typeInfo type information of the (sub-)value to convert
+	 * @param flatFieldExpr field expression of the value relative to the input ({@code ""} for the input itself)
+	 * @param flatFieldDesc flat field descriptors of the value, or {@code null} if not available
+	 * @param groupedKeys logical key positions the input is grouped on, or {@code null} if not grouped
+	 * @return the tagged value describing {@code typeInfo}
+	 */
+	public static TaggedValue convertTypeInfoToTaggedValue(TaggedValue.Input input, TypeInformation<?> typeInfo,
+			String flatFieldExpr, List<CompositeType.FlatFieldDescriptor> flatFieldDesc, int[] groupedKeys) {
+		// java tuples & scala tuples
+		if (typeInfo instanceof TupleTypeInfoBase) {
+			final TupleTypeInfoBase<?> tupleTypeInfo = (TupleTypeInfoBase<?>) typeInfo;
+			// java tuples name their fields f0, f1, ...; scala tuples _1, _2, ...
+			final boolean isJavaTuple = typeInfo instanceof TupleTypeInfo;
+			final HashMap<String, TaggedValue> containerMapping = new HashMap<String, TaggedValue>();
+			for (int i = 0; i < tupleTypeInfo.getArity(); i++) {
+				final String fieldName = isJavaTuple ? "f" + i : "_" + (i + 1);
+				containerMapping.put(fieldName,
+						convertTypeInfoToTaggedValue(input,
+								tupleTypeInfo.getTypeAt(i),
+								nestedFieldExpression(flatFieldExpr, fieldName),
+								tupleTypeInfo.getFlatFields(fieldName),
+								groupedKeys));
+			}
+			return new TaggedValue(Type.getObjectType("java/lang/Object"), containerMapping);
+		}
+		// pojos
+		else if (typeInfo instanceof PojoTypeInfo) {
+			final PojoTypeInfo<?> pojoTypeInfo = (PojoTypeInfo<?>) typeInfo;
+			final HashMap<String, TaggedValue> containerMapping = new HashMap<String, TaggedValue>();
+			for (int i = 0; i < pojoTypeInfo.getArity(); i++) {
+				final String fieldName = pojoTypeInfo.getPojoFieldAt(i).field.getName();
+				containerMapping.put(fieldName,
+						convertTypeInfoToTaggedValue(input,
+								pojoTypeInfo.getTypeAt(i),
+								nestedFieldExpression(flatFieldExpr, fieldName),
+								pojoTypeInfo.getFlatFields(fieldName),
+								groupedKeys));
+			}
+			return new TaggedValue(Type.getObjectType("java/lang/Object"), containerMapping);
+		}
+
+		// atomic: the value is grouped if its flat position is one of the grouped key positions
+		boolean groupedField = false;
+		if (groupedKeys != null && flatFieldDesc != null && !flatFieldDesc.isEmpty()) {
+			final int flatFieldPos = flatFieldDesc.get(0).getPosition();
+			for (int groupedKey : groupedKeys) {
+				if (groupedKey == flatFieldPos) {
+					groupedField = true;
+					break;
+				}
+			}
+		}
+
+		// the last argument is true for basic types except Date
+		// NOTE(review): presumably marks values that are safe to treat as immutable -- confirm against TaggedValue
+		return new TaggedValue(Type.getType(typeInfo.getTypeClass()), input, flatFieldExpr, groupedField,
+				typeInfo.isBasicType() && typeInfo != BasicTypeInfo.DATE_TYPE_INFO);
+	}
+
+	/**
+	 * Appends {@code fieldName} to {@code parentExpr}, separated by a dot unless the parent
+	 * expression is empty.
+	 */
+	private static String nestedFieldExpression(String parentExpr, String fieldName) {
+		return parentExpr.length() > 0 ? parentExpr + "." + fieldName : fieldName;
+	}
+
+	/**
+	 * Finds the bytecode method node implementing {@code method}, searching the class hierarchy
+	 * upwards starting at {@code internalClassName}.
+	 *
+	 * @return array that contains the method node and the name of the class where
+	 *         the method node has been found
+	 * @throws IllegalStateException if the method cannot be found or a class file cannot be read
+	 */
+	public static Object[] findMethodNode(String internalClassName, Method method) {
+		return findMethodNode(internalClassName, method.getName(), Type.getMethodDescriptor(method));
+	}
+
+	/**
+	 * Finds the bytecode method node with the given name and descriptor, searching the class
+	 * hierarchy upwards starting at {@code internalClassName}.
+	 *
+	 * @return array that contains the method node and the name of the class where
+	 *         the method node has been found
+	 * @throws IllegalStateException if the method cannot be found or a class file cannot be read
+	 */
+	@SuppressWarnings("unchecked")
+	public static Object[] findMethodNode(String internalClassName, String name, String desc) {
+		try {
+			// iterate through hierarchy and search for method node /
+			// class that really implements the method
+			while (internalClassName != null) {
+				final ClassReader cr = readClass(internalClassName);
+				final ClassNode cn = new ClassNode();
+				cr.accept(cn, 0);
+				for (MethodNode mn : (List<MethodNode>) cn.methods) {
+					if (mn.name.equals(name) && mn.desc.equals(desc)) {
+						return new Object[]{ mn, cr.getClassName() };
+					}
+				}
+				internalClassName = cr.getSuperName();
+			}
+		}
+		catch (IOException e) {
+			throw new IllegalStateException("Method '" + name + "' could not be found", e);
+		}
+		throw new IllegalStateException("Method '" + name + "' could not be found");
+	}
+
+	/**
+	 * Reads the class file of {@code internalClassName} through the context class loader and
+	 * always closes the resource stream, which {@code ClassReader(InputStream)} leaves open.
+	 */
+	private static ClassReader readClass(String internalClassName) throws IOException {
+		final InputStream classStream = Thread.currentThread().getContextClassLoader()
+				.getResourceAsStream(internalClassName.replace('.', '/') + ".class");
+		try {
+			// a missing class file yields a null stream, which ClassReader rejects with an IOException
+			return new ClassReader(classStream);
+		}
+		finally {
+			if (classStream != null) {
+				classStream.close();
+			}
+		}
+	}
+
+	/**
+	 * @return whether {@code value} is a {@link TaggedValue} ({@code false} for {@code null})
+	 */
+	public static boolean isTagged(Value value) {
+		return value instanceof TaggedValue;
+	}
+
+	/**
+	 * Casts {@code value} to {@link TaggedValue}.
+	 */
+	public static TaggedValue tagged(Value value) {
+		return (TaggedValue) value;
+	}
+
+	/**
+	 *
+	 * @return returns whether a value of the list of values is or contains
+	 * important dependencies (inputs or collectors) that require special analysis
+	 * (e.g. to dig into a nested method). The first argument can be skipped e.g.
+	 * in order to skip the "this" of non-static method arguments.
+	 */
+	public static boolean hasImportantDependencies(List<? extends BasicValue> values, boolean skipFirst) {
+		boolean skip = skipFirst;
+		for (BasicValue value : values) {
+			if (skip) {
+				skip = false;
+				continue;
+			}
+			if (hasImportantDependencies(value)) {
+				return true;
+			}
+		}
+		return false;
+	}
+
+	/**
+	 *
+	 * @return returns whether a value is or contains important dependencies (inputs or collectors)
+	 * that require special analysis (e.g. to dig into a nested method)
+	 */
+	public static boolean hasImportantDependencies(BasicValue bv) {
+		if (!isTagged(bv)) {
+			return false;
+		}
+		final TaggedValue value = tagged(bv);
+
+		if (value.isInput() || value.isCollector()) {
+			return true;
+		}
+		else if (value.canContainFields() && value.getContainerMapping() != null) {
+			for (TaggedValue tv : value.getContainerMapping().values()) {
+				if (hasImportantDependencies(tv)) {
+					return true;
+				}
+			}
+		}
+		return false;
+	}
+
+	/**
+	 * Merges input return values.
+	 *
+	 * @return the common value if all values are equal, {@code null} if they differ or the list is empty
+	 */
+	public static TaggedValue mergeInputs(List<TaggedValue> returnValues) {
+		TaggedValue first = null;
+		for (TaggedValue tv : returnValues) {
+			if (first == null) {
+				first = tv;
+			}
+			else if (!first.equals(tv)) {
+				return null;
+			}
+		}
+		return first;
+	}
+
+	/**
+	 * Merges container return values. The result keeps only fields that are present in every
+	 * container and have the same, defined (non-{@code null}) value in all of them; nested
+	 * containers are merged recursively.
+	 *
+	 * @return the merged container typed like the first value, or {@code null} if nothing remains
+	 */
+	public static TaggedValue mergeContainers(List<TaggedValue> returnValues) {
+		if (returnValues.isEmpty()) {
+			return null;
+		}
+
+		final Type returnType = returnValues.get(0).getType();
+
+		// only fields present in all containers can be part of the merged container
+		final Set<String> keys = new HashSet<String>(returnValues.get(0).getContainerMapping().keySet());
+		for (TaggedValue tv : returnValues) {
+			keys.retainAll(tv.getContainerMapping().keySet());
+		}
+
+		// filter mappings with undefined (null) or conflicting state
+		final HashMap<String, TaggedValue> resultMapping = new HashMap<String, TaggedValue>(keys.size());
+		final List<String> filteredMappings = new ArrayList<String>(keys.size());
+		for (TaggedValue tv : returnValues) {
+			final Map<String, TaggedValue> cm = tv.getContainerMapping();
+			for (String key : keys) {
+				if (filteredMappings.contains(key)) {
+					continue;
+				}
+				final TaggedValue fieldValue = cm.get(key);
+				if (fieldValue == null) {
+					// undefined in one return path -> undefined in the merge, even if
+					// an earlier return path already contributed a value
+					filteredMappings.add(key);
+					resultMapping.remove(key);
+				}
+				else if (!resultMapping.containsKey(key)) {
+					resultMapping.put(key, fieldValue);
+				}
+				else if (!fieldValue.equals(resultMapping.get(key))) {
+					// return paths disagree on the field
+					filteredMappings.add(key);
+					resultMapping.remove(key);
+				}
+			}
+		}
+
+		// recursively merge contained mappings
+		final Iterator<String> it = resultMapping.keySet().iterator();
+		while (it.hasNext()) {
+			final String key = it.next();
+			final TaggedValue value = mergeReturnValues(Collections.singletonList(resultMapping.get(key)));
+			if (value == null) {
+				it.remove();
+			}
+			else {
+				resultMapping.put(key, value);
+			}
+		}
+
+		if (!resultMapping.isEmpty()) {
+			return new TaggedValue(returnType, resultMapping);
+		}
+		return null;
+	}
+
+	/**
+	 * Merges the values returned on all return paths of a function.
+	 *
+	 * @return the merged value, or {@code null} if the list is empty, contains {@code null},
+	 *         mixes inputs and containers, or contains a value that cannot contain input
+	 */
+	public static TaggedValue mergeReturnValues(List<TaggedValue> returnValues) {
+		if (returnValues.isEmpty() || returnValues.get(0) == null) {
+			return null;
+		}
+
+		// check if either all inputs or all containers
+		final boolean allInputs = returnValues.get(0).isInput();
+		for (TaggedValue tv : returnValues) {
+			if (tv == null || tv.isInput() != allInputs) {
+				return null;
+			}
+			// check if there are uninteresting values
+			if (tv.canNotContainInput()) {
+				return null;
+			}
+		}
+
+		if (allInputs) {
+			return mergeInputs(returnValues);
+		}
+		return mergeContainers(returnValues);
+	}
+
+	/**
+	 * Removes, in place and recursively, all input fields of {@code value}'s container mapping
+	 * that are not grouped. Fields with a {@code null} (undefined) mapping are kept.
+	 */
+	public static void removeUngroupedInputsFromContainer(TaggedValue value) {
+		if (value.getContainerMapping() != null) {
+			final Iterator<Map.Entry<String, TaggedValue>> it = value.getContainerMapping().entrySet().iterator();
+			while (it.hasNext()) {
+				final Map.Entry<String, TaggedValue> entry = it.next();
+				if (entry.getValue() == null) {
+					continue;
+				}
+				else if (entry.getValue().isInput() && !entry.getValue().isGrouped()) {
+					it.remove();
+				}
+				else if (entry.getValue().canContainFields()) {
+					removeUngroupedInputsFromContainer(entry.getValue());
+				}
+			}
+		}
+	}
+
+	/**
+	 * @return {@code value} if it is a grouped input or a container that still has fields after
+	 *         removing its ungrouped inputs (modified in place); {@code null} otherwise
+	 */
+	public static TaggedValue removeUngroupedInputs(TaggedValue value) {
+		if (value.isInput()) {
+			if (value.isGrouped()) {
+				return value;
+			}
+		}
+		else if (value.canContainFields()) {
+			removeUngroupedInputsFromContainer(value);
+			if (value.getContainerMapping() != null && value.getContainerMapping().size() > 0) {
+				return value;
+			}
+		}
+		return null;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesPrecedenceTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesPrecedenceTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesPrecedenceTest.java
new file mode 100644
index 0000000..014a3ca
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesPrecedenceTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.functions;
+
+import org.apache.flink.api.common.CodeAnalysisMode;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.GenericDataSinkBase;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the precedence of semantic properties: annotation > API > static code analyzer.
+ *
+ * <p>All mappers return their input unchanged, so the analyzer alone reports every field as
+ * forwarded (see {@link #testFunctionAnalyzerPrecedence()}); annotations and API calls must
+ * override that result.
+ */
+public class SemanticPropertiesPrecedenceTest {
+
+	@Test
+	public void testFunctionForwardedAnnotationPrecedence() {
+		ExecutionEnvironment env = createOptimizingEnvironment();
+		createInput(env)
+				.map(new WildcardForwardedMapperWithForwardAnnotation<Tuple3<Long, String, Integer>>())
+				.output(new DiscardingOutputFormat<Tuple3<Long, String, Integer>>());
+
+		// @ForwardedFields("f0") wins over the analyzer result
+		assertForwardedFields(getMapperSemantics(env), true, false, false);
+	}
+
+	@Test
+	public void testFunctionSkipCodeAnalysisAnnotationPrecedence() {
+		ExecutionEnvironment env = createOptimizingEnvironment();
+		createInput(env)
+				.map(new WildcardForwardedMapperWithSkipAnnotation<Tuple3<Long, String, Integer>>())
+				.output(new DiscardingOutputFormat<Tuple3<Long, String, Integer>>());
+
+		// @SkipCodeAnalysis suppresses the analyzer, so no field is known to be forwarded
+		assertForwardedFields(getMapperSemantics(env), false, false, false);
+	}
+
+	@Test
+	public void testFunctionApiPrecedence() {
+		ExecutionEnvironment env = createOptimizingEnvironment();
+		createInput(env)
+				.map(new WildcardForwardedMapper<Tuple3<Long, String, Integer>>())
+				.withForwardedFields("f0")
+				.output(new DiscardingOutputFormat<Tuple3<Long, String, Integer>>());
+
+		// withForwardedFields("f0") wins over the analyzer result
+		assertForwardedFields(getMapperSemantics(env), true, false, false);
+	}
+
+	@Test
+	public void testFunctionAnalyzerPrecedence() {
+		ExecutionEnvironment env = createOptimizingEnvironment();
+		createInput(env)
+				.map(new WildcardForwardedMapper<Tuple3<Long, String, Integer>>())
+				.output(new DiscardingOutputFormat<Tuple3<Long, String, Integer>>());
+
+		// without annotations or API hints the analyzer result is used
+		assertForwardedFields(getMapperSemantics(env), true, true, true);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/** Creates an environment whose static code analyzer runs in {@link CodeAnalysisMode#OPTIMIZE} mode. */
+	private static ExecutionEnvironment createOptimizingEnvironment() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().setCodeAnalysisMode(CodeAnalysisMode.OPTIMIZE);
+		return env;
+	}
+
+	/** Creates a single-element Tuple3 input. */
+	@SuppressWarnings("unchecked")
+	private static DataSet<Tuple3<Long, String, Integer>> createInput(ExecutionEnvironment env) {
+		return env.fromElements(new Tuple3<Long, String, Integer>(3L, "test", 42));
+	}
+
+	/** Builds the plan and returns the semantic properties of the map operator feeding the (only) sink. */
+	private static SingleInputSemanticProperties getMapperSemantics(ExecutionEnvironment env) {
+		Plan plan = env.createProgramPlan();
+		GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
+		MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput();
+		return mapper.getSemanticProperties();
+	}
+
+	/**
+	 * Asserts for each field {@code i} whether it is forwarded to the same position {@code i}.
+	 *
+	 * @param expected {@code expected[i]} is true iff input field {@code i} must be forwarded to output field {@code i}
+	 */
+	private static void assertForwardedFields(SingleInputSemanticProperties semantics, boolean... expected) {
+		for (int field = 0; field < expected.length; field++) {
+			FieldSet targets = semantics.getForwardingTargetFields(0, field);
+			assertNotNull(targets);
+			if (expected[field]) {
+				assertTrue(targets.contains(field));
+			}
+			else {
+				assertFalse(targets.contains(field));
+			}
+		}
+	}
+
+	@FunctionAnnotation.ForwardedFields("f0")
+	public static class WildcardForwardedMapperWithForwardAnnotation<T> implements MapFunction<T, T> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public T map(T value) {
+			return value;
+		}
+	}
+
+	@FunctionAnnotation.SkipCodeAnalysis
+	public static class WildcardForwardedMapperWithSkipAnnotation<T> implements MapFunction<T, T> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public T map(T value) {
+			return value;
+		}
+	}
+
+	public static class WildcardForwardedMapper<T> implements MapFunction<T, T> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public T map(T value) {
+			return value;
+		}
+	}
+}
[3/3] flink git commit: [FLINK-1319] [core] Add static code analysis
for user code
Posted by uc...@apache.org.
[FLINK-1319] [core] Add static code analysis for user code
This closes #729.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c854d526
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c854d526
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c854d526
Branch: refs/heads/master
Commit: c854d5260c20b0926c4347c7c9dd7d0f4f11d620
Parents: d433ba9
Author: twalthr <tw...@apache.org>
Authored: Tue May 26 20:22:03 2015 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Jun 8 07:29:49 2015 +0200
----------------------------------------------------------------------
.../flink/api/common/CodeAnalysisMode.java | 52 +
.../flink/api/common/ExecutionConfig.java | 26 +
flink-java/pom.xml | 4 +-
.../flink/api/java/ExecutionEnvironment.java | 3 +
.../java/org/apache/flink/api/java/Utils.java | 4 +
.../api/java/functions/FunctionAnnotation.java | 17 +-
.../api/java/functions/SemanticPropUtil.java | 133 +-
.../api/java/operators/CoGroupOperator.java | 6 +-
.../flink/api/java/operators/CrossOperator.java | 2 +
.../api/java/operators/FilterOperator.java | 2 +
.../api/java/operators/FlatMapOperator.java | 4 +-
.../java/operators/GroupCombineOperator.java | 4 +-
.../api/java/operators/GroupReduceOperator.java | 6 +-
.../flink/api/java/operators/JoinOperator.java | 8 +-
.../flink/api/java/operators/MapOperator.java | 2 +
.../api/java/operators/ReduceOperator.java | 6 +-
.../java/operators/SingleInputUdfOperator.java | 31 +-
.../api/java/operators/TwoInputUdfOperator.java | 43 +-
.../flink/api/java/operators/UdfOperator.java | 1 -
.../api/java/operators/UdfOperatorUtils.java | 103 ++
.../api/java/sca/CodeAnalyzerException.java | 42 +
.../flink/api/java/sca/CodeErrorException.java | 42 +
.../flink/api/java/sca/ModifiedASMAnalyzer.java | 169 +++
.../flink/api/java/sca/ModifiedASMFrame.java | 84 ++
.../api/java/sca/NestedMethodAnalyzer.java | 730 ++++++++++
.../apache/flink/api/java/sca/TaggedValue.java | 421 ++++++
.../apache/flink/api/java/sca/UdfAnalyzer.java | 474 ++++++
.../flink/api/java/sca/UdfAnalyzerUtils.java | 329 +++++
.../SemanticPropertiesPrecedenceTest.java | 183 +++
.../api/java/sca/UdfAnalyzerExamplesTest.java | 707 +++++++++
.../flink/api/java/sca/UdfAnalyzerTest.java | 1353 ++++++++++++++++++
.../apache/flink/test/util/TestEnvironment.java | 4 +-
pom.xml | 4 +-
33 files changed, 4916 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-core/src/main/java/org/apache/flink/api/common/CodeAnalysisMode.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/CodeAnalysisMode.java b/flink-core/src/main/java/org/apache/flink/api/common/CodeAnalysisMode.java
new file mode 100644
index 0000000..e9d8541
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/CodeAnalysisMode.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common;
+
+/**
+ * Specifies to which extent user-defined functions are analyzed in order
+ * to give the Flink optimizer an insight of UDF internals and inform
+ * the user about common implementation mistakes.
+ *
+ * The analyzer gives hints about:
+ * - ForwardedFields semantic properties
+ * - Warnings if static fields are modified by a Function
+ * - Warnings if a FilterFunction modifies its input objects
+ * - Warnings if a Function returns null
+ * - Warnings if a tuple access uses a wrong index
+ * - Information about the number of object creations (for manual optimization)
+ */
+public enum CodeAnalysisMode {
+
+ /**
+ * Code analysis does not take place.
+ */
+ DISABLE,
+
+ /**
+ * Hints for improvement of the program are printed to the log.
+ */
+ HINT,
+
+ /**
+ * The program will be automatically optimized with knowledge from code
+ * analysis.
+ */
+ OPTIMIZE;
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 04a518e..4974295 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -44,6 +44,10 @@ import java.util.Map;
* handling <i>generic types</i> and <i>POJOs</i>. This is usually only needed
* when the functions return not only the types declared in their signature, but
* also subclasses of those types.</li>
+ * <li>The {@link CodeAnalysisMode} of the program: Enable hinting/optimizing or disable
+ * the "static code analyzer". The static code analyzer pre-interprets user-defined functions in order to
+ * get implementation insights for program improvements that can be printed to the log or
+ * automatically applied.</li>
* </ul>
*/
public class ExecutionConfig implements Serializable {
@@ -78,6 +82,8 @@ public class ExecutionConfig implements Serializable {
private boolean forceAvro = false;
+ private CodeAnalysisMode codeAnalysisMode = CodeAnalysisMode.DISABLE;
+
/** If set to true, progress updates are printed to System.out during execution */
private boolean printProgressDuringExecution = true;
@@ -316,6 +322,26 @@ public class ExecutionConfig implements Serializable {
public boolean isObjectReuseEnabled() {
return objectReuse;
}
+
+	/**
+	 * Sets the {@link CodeAnalysisMode} of the program. Specifies to which extent user-defined
+	 * functions are analyzed in order to give the Flink optimizer an insight into UDF internals
+	 * and inform the user about common implementation mistakes. The static code analyzer pre-interprets
+	 * user-defined functions in order to get implementation insights for program improvements
+	 * that can be printed to the log, automatically applied, or disabled.
+	 *
+	 * @param codeAnalysisMode see {@link CodeAnalysisMode}; must not be {@code null}
+	 * @throws NullPointerException if {@code codeAnalysisMode} is {@code null}
+	 */
+	public void setCodeAnalysisMode(CodeAnalysisMode codeAnalysisMode) {
+		// reject null eagerly so the configured mode is always one of the defined constants
+		if (codeAnalysisMode == null) {
+			throw new NullPointerException("codeAnalysisMode must not be null");
+		}
+		this.codeAnalysisMode = codeAnalysisMode;
+	}
+
+	/**
+	 * Returns the {@link CodeAnalysisMode} of the program.
+	 *
+	 * @return the configured mode; {@link CodeAnalysisMode#DISABLE} unless changed via
+	 *         {@link #setCodeAnalysisMode(CodeAnalysisMode)}
+	 */
+	public CodeAnalysisMode getCodeAnalysisMode() {
+		return codeAnalysisMode;
+	}
+
/**
* Enables the printing of progress update messages to {@code System.out}
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index 6196e82..8879803 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -60,10 +60,10 @@ under the License.
<dependency>
<groupId>org.ow2.asm</groupId>
- <artifactId>asm</artifactId>
+ <artifactId>asm-all</artifactId>
<version>${asm.version}</version>
</dependency>
-
+
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill_${scala.binary.version}</artifactId>
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 3a5b04f..d50ddb4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -991,6 +991,9 @@ public abstract class ExecutionEnvironment {
LOG.debug("Registered Kryo default Serializers: {}", Joiner.on(',').join(config.getDefaultKryoSerializers()));
LOG.debug("Registered Kryo default Serializers Classes {}", Joiner.on(',').join(config.getDefaultKryoSerializerClasses()));
LOG.debug("Registered POJO types: {}", Joiner.on(',').join(config.getRegisteredPojoTypes()));
+
+ // print information about static code analysis
+ LOG.debug("Static code analysis mode: {}", config.getCodeAnalysisMode());
}
return plan;
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
index 38b24a2..dd1d6d2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
@@ -29,8 +29,10 @@ import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.List;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
+
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
+import static org.apache.flink.api.java.functions.FunctionAnnotation.SkipCodeAnalysis;
public class Utils {
@@ -70,6 +72,7 @@ public class Utils {
}
}
+ @SkipCodeAnalysis
public static class CountHelper<T> extends RichFlatMapFunction<T, Long> {
private static final long serialVersionUID = 1L;
@@ -93,6 +96,7 @@ public class Utils {
}
}
+ @SkipCodeAnalysis
public static class CollectHelper<T> extends RichFlatMapFunction<T, T> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
index 09678fd..bfc1bf0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
@@ -369,7 +369,22 @@ public class FunctionAnnotation {
public @interface ReadFieldsSecond {
String[] value();
}
-
+
+ /**
+ * The SkipCodeAnalysis annotation declares that a function will not be analyzed by Flink's
+ * code analysis capabilities, independent of the configured {@link org.apache.flink.api.common.CodeAnalysisMode}.
+ *
+ * <p>If this annotation is not present, the static code analyzer pre-interprets user-defined
+ * functions in order to get implementation insights for program improvements that can be
+ * printed to the log as hints, automatically applied, or disabled (see
+ * {@link org.apache.flink.api.common.ExecutionConfig#setCodeAnalysisMode}).
+ *
+ * <p>The annotation is applicable to types (classes) only and is retained at runtime.
+ */
+ @Target(ElementType.TYPE)
+ @Retention(RetentionPolicy.RUNTIME)
+ public @interface SkipCodeAnalysis {
+ }
+
/**
* Private constructor to prevent instantiation. This class is intended only as a container.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
index 4569be3..7640e2c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
@@ -19,14 +19,6 @@
package org.apache.flink.api.java.functions;
-import java.lang.annotation.Annotation;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
import org.apache.flink.api.common.operators.DualInputSemanticProperties;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.SemanticProperties.InvalidSemanticAnnotationException;
@@ -34,13 +26,13 @@ import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.common.typeutils.CompositeType.InvalidFieldReferenceException;
import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeType.InvalidFieldReferenceException;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFields;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
-import org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFieldsFirst;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFieldsFirst;
import org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFieldsSecond;
import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFields;
import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFieldsFirst;
@@ -48,6 +40,14 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFieldsSecond;
import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import java.lang.annotation.Annotation;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
public class SemanticPropUtil {
private final static String REGEX_WILDCARD = "[\\"+ Keys.ExpressionKeys.SELECT_ALL_CHAR+"\\"+ Keys.ExpressionKeys.SELECT_ALL_CHAR_SCALA+"]";
@@ -235,7 +235,7 @@ public class SemanticPropUtil {
public static SingleInputSemanticProperties getSemanticPropsSingle(
Set<Annotation> set, TypeInformation<?> inType, TypeInformation<?> outType) {
if (set == null) {
- return new SingleInputSemanticProperties();
+ return null;
}
Iterator<Annotation> it = set.iterator();
@@ -264,15 +264,14 @@ public class SemanticPropUtil {
SingleInputSemanticProperties result = new SingleInputSemanticProperties();
getSemanticPropsSingleFromString(result, forwarded, nonForwarded, read, inType, outType);
return result;
- } else {
- return new SingleInputSemanticProperties();
}
+ return null;
}
public static DualInputSemanticProperties getSemanticPropsDual(
Set<Annotation> set, TypeInformation<?> inType1, TypeInformation<?> inType2, TypeInformation<?> outType) {
if (set == null) {
- return new DualInputSemanticProperties();
+ return null;
}
Iterator<Annotation> it = set.iterator();
@@ -309,15 +308,20 @@ public class SemanticPropUtil {
getSemanticPropsDualFromString(result, forwardedFirst, forwardedSecond,
nonForwardedFirst, nonForwardedSecond, readFirst, readSecond, inType1, inType2, outType);
return result;
- } else {
- return new DualInputSemanticProperties();
}
+ return null;
+ }
+
+ public static void getSemanticPropsSingleFromString(SingleInputSemanticProperties result,
+ String[] forwarded, String[] nonForwarded, String[] readSet,
+ TypeInformation<?> inType, TypeInformation<?> outType) {
+ getSemanticPropsSingleFromString(result, forwarded, nonForwarded, readSet, inType, outType, false);
}
public static void getSemanticPropsSingleFromString(SingleInputSemanticProperties result,
String[] forwarded, String[] nonForwarded, String[] readSet,
- TypeInformation<?> inType, TypeInformation<?> outType)
- {
+ TypeInformation<?> inType, TypeInformation<?> outType,
+ boolean skipIncompatibleTypes) {
boolean hasForwardedAnnotation = false;
boolean hasNonForwardedAnnotation = false;
@@ -334,9 +338,9 @@ public class SemanticPropUtil {
throw new InvalidSemanticAnnotationException("Either ForwardedFields OR " +
"NonForwardedFields annotation permitted, NOT both.");
} else if(hasForwardedAnnotation) {
- parseForwardedFields(result, forwarded, inType, outType, 0);
+ parseForwardedFields(result, forwarded, inType, outType, 0, skipIncompatibleTypes);
} else if(hasNonForwardedAnnotation) {
- parseNonForwardedFields(result, nonForwarded, inType, outType, 0);
+ parseNonForwardedFields(result, nonForwarded, inType, outType, 0, skipIncompatibleTypes);
}
parseReadFields(result, readSet, inType, 0);
}
@@ -345,8 +349,18 @@ public class SemanticPropUtil {
String[] forwardedFirst, String[] forwardedSecond,
String[] nonForwardedFirst, String[] nonForwardedSecond, String[]
readFieldsFirst, String[] readFieldsSecond,
- TypeInformation<?> inType1, TypeInformation<?> inType2, TypeInformation<?> outType)
- {
+ TypeInformation<?> inType1, TypeInformation<?> inType2, TypeInformation<?> outType) {
+ getSemanticPropsDualFromString(result, forwardedFirst, forwardedSecond, nonForwardedFirst,
+ nonForwardedSecond, readFieldsFirst, readFieldsSecond, inType1, inType2, outType,
+ false);
+ }
+
+ public static void getSemanticPropsDualFromString(DualInputSemanticProperties result,
+ String[] forwardedFirst, String[] forwardedSecond,
+ String[] nonForwardedFirst, String[] nonForwardedSecond, String[]
+ readFieldsFirst, String[] readFieldsSecond,
+ TypeInformation<?> inType1, TypeInformation<?> inType2, TypeInformation<?> outType,
+ boolean skipIncompatibleTypes) {
boolean hasForwardedFirstAnnotation = false;
boolean hasForwardedSecondAnnotation = false;
@@ -377,15 +391,15 @@ public class SemanticPropUtil {
}
if(hasForwardedFirstAnnotation) {
- parseForwardedFields(result, forwardedFirst, inType1, outType, 0);
+ parseForwardedFields(result, forwardedFirst, inType1, outType, 0, skipIncompatibleTypes);
} else if(hasNonForwardedFirstAnnotation) {
- parseNonForwardedFields(result, nonForwardedFirst, inType1, outType, 0);
+ parseNonForwardedFields(result, nonForwardedFirst, inType1, outType, 0, skipIncompatibleTypes);
}
if(hasForwardedSecondAnnotation) {
- parseForwardedFields(result, forwardedSecond, inType2, outType, 1);
+ parseForwardedFields(result, forwardedSecond, inType2, outType, 1, skipIncompatibleTypes);
} else if(hasNonForwardedSecondAnnotation) {
- parseNonForwardedFields(result, nonForwardedSecond, inType2, outType, 1);
+ parseNonForwardedFields(result, nonForwardedSecond, inType2, outType, 1, skipIncompatibleTypes);
}
parseReadFields(result, readFieldsFirst, inType1, 0);
@@ -393,7 +407,8 @@ public class SemanticPropUtil {
}
- private static void parseForwardedFields(SemanticProperties sp, String[] forwardedStr, TypeInformation<?> inType, TypeInformation<?> outType, int input) {
+ private static void parseForwardedFields(SemanticProperties sp, String[] forwardedStr,
+ TypeInformation<?> inType, TypeInformation<?> outType, int input, boolean skipIncompatibleTypes) {
if (forwardedStr == null) {
return;
@@ -412,8 +427,13 @@ public class SemanticPropUtil {
if (wcMatcher.matches()) {
if (!inType.equals(outType)) {
- throw new InvalidSemanticAnnotationException("Forwarded field annotation \"" + s +
- "\" with wildcard only allowed for identical input and output types.");
+ if (skipIncompatibleTypes) {
+ continue;
+ }
+ else {
+ throw new InvalidSemanticAnnotationException("Forwarded field annotation \"" + s +
+ "\" with wildcard only allowed for identical input and output types.");
+ }
}
for (int i = 0; i < inType.getTotalFields(); i++) {
@@ -440,8 +460,13 @@ public class SemanticPropUtil {
try {
// check type compatibility
- if (!areFieldsCompatible(sourceStr, inType, targetStr, outType)) {
- throw new InvalidSemanticAnnotationException("Referenced fields of forwarded field annotation \"" + s + "\" do not match.");
+ if (!areFieldsCompatible(sourceStr, inType, targetStr, outType, !skipIncompatibleTypes)) {
+ if (skipIncompatibleTypes) {
+ continue;
+ }
+ else {
+ throw new InvalidSemanticAnnotationException("Referenced fields of forwarded field annotation \"" + s + "\" do not match.");
+ }
}
List<FlatFieldDescriptor> inFFDs = getFlatFields(sourceStr, inType);
List<FlatFieldDescriptor> outFFDs = getFlatFields(targetStr, outType);
@@ -478,8 +503,13 @@ public class SemanticPropUtil {
String fieldStr = fieldMatcher.group();
try {
// check if field is compatible in input and output type
- if (!areFieldsCompatible(fieldStr, inType, fieldStr, outType)) {
- throw new InvalidSemanticAnnotationException("Referenced fields of forwarded field annotation \"" + s + "\" do not match.");
+ if (!areFieldsCompatible(fieldStr, inType, fieldStr, outType, !skipIncompatibleTypes)) {
+ if (skipIncompatibleTypes) {
+ continue;
+ }
+ else {
+ throw new InvalidSemanticAnnotationException("Referenced fields of forwarded field annotation \"" + s + "\" do not match.");
+ }
}
// add flat field positions
List<FlatFieldDescriptor> inFFDs = getFlatFields(fieldStr, inType);
@@ -503,8 +533,8 @@ public class SemanticPropUtil {
}
}
- private static void parseNonForwardedFields(
- SemanticProperties sp, String[] nonForwardedStr, TypeInformation<?> inType, TypeInformation<?> outType, int input) {
+ private static void parseNonForwardedFields(SemanticProperties sp, String[] nonForwardedStr,
+ TypeInformation<?> inType, TypeInformation<?> outType, int input, boolean skipIncompatibleTypes) {
if(nonForwardedStr == null) {
return;
@@ -521,7 +551,12 @@ public class SemanticPropUtil {
}
if(!inType.equals(outType)) {
- throw new InvalidSemanticAnnotationException("Non-forwarded fields annotation only allowed for identical input and output types.");
+ if (skipIncompatibleTypes) {
+ continue;
+ }
+ else {
+ throw new InvalidSemanticAnnotationException("Non-forwarded fields annotation only allowed for identical input and output types.");
+ }
}
Matcher matcher = PATTERN_LIST.matcher(s);
@@ -613,14 +648,24 @@ public class SemanticPropUtil {
////////////////////// UTIL METHODS ///////////////////////////////
- private static boolean areFieldsCompatible(String sourceField, TypeInformation<?> inType, String targetField, TypeInformation<?> outType) {
-
- // get source type information
- TypeInformation<?> sourceType = getExpressionTypeInformation(sourceField, inType);
- // get target type information
- TypeInformation<?> targetType = getExpressionTypeInformation(targetField, outType);
+ private static boolean areFieldsCompatible(String sourceField, TypeInformation<?> inType, String targetField,
+ TypeInformation<?> outType, boolean throwException) {
- return (sourceType.equals(targetType));
+ try {
+ // get source type information
+ TypeInformation<?> sourceType = getExpressionTypeInformation(sourceField, inType);
+ // get target type information
+ TypeInformation<?> targetType = getExpressionTypeInformation(targetField, outType);
+ return sourceType.equals(targetType);
+ }
+ catch (InvalidFieldReferenceException e) {
+ if (throwException) {
+ throw e;
+ }
+ else {
+ return false;
+ }
+ }
}
private static TypeInformation<?> getExpressionTypeInformation(String fieldStr, TypeInformation<?> typeInfo) {
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index 115a238..36378b9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -124,6 +124,8 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
this.keys1 = keys1;
this.keys2 = keys2;
+
+ UdfOperatorUtils.analyzeDualInputUdf(this, CoGroupFunction.class, defaultName, function, keys1, keys2);
}
@Override
@@ -144,9 +146,9 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
int numFields1 = this.getInput1Type().getTotalFields();
int numFields2 = this.getInput2Type().getTotalFields();
int offset1 = (this.keys1 instanceof Keys.SelectorFunctionKeys) ?
- ((Keys.SelectorFunctionKeys) this.keys1).getKeyType().getTotalFields() : 0;
+ ((Keys.SelectorFunctionKeys<?,?>) this.keys1).getKeyType().getTotalFields() : 0;
int offset2 = (this.keys2 instanceof Keys.SelectorFunctionKeys) ?
- ((Keys.SelectorFunctionKeys) this.keys2).getKeyType().getTotalFields() : 0;
+ ((Keys.SelectorFunctionKeys<?,?>) this.keys2).getKeyType().getTotalFields() : 0;
props = SemanticPropUtil.addSourceFieldOffsets(props, numFields1, numFields2, offset1, offset2);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
index 5ed3e40..ae990ce 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
@@ -67,6 +67,8 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
this.function = function;
this.defaultName = defaultName;
this.hint = hint;
+
+ UdfOperatorUtils.analyzeDualInputUdf(this, CrossFunction.class, defaultName, function, null, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
index f55de1c..70bfa93 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
@@ -41,6 +41,8 @@ public class FilterOperator<T> extends SingleInputUdfOperator<T, T, FilterOperat
this.function = function;
this.defaultName = defaultName;
+
+ UdfOperatorUtils.analyzeSingleInputUdf(this, FilterFunction.class, defaultName, function, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
index 8caacae..10bb286 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
@@ -43,13 +43,15 @@ public class FlatMapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, Fl
this.function = function;
this.defaultName = defaultName;
+
+ UdfOperatorUtils.analyzeSingleInputUdf(this, FlatMapFunction.class, defaultName, function, null);
}
@Override
protected FlatMapFunction<IN, OUT> getFunction() {
return function;
}
-
+
@Override
protected FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN,OUT>> translateToDataFlow(Operator<IN> input) {
String name = getName() != null ? getName() : "FlatMap at "+defaultName;
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
index dc26fec..30cb0be 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
@@ -98,9 +98,9 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
this.grouper != null &&
this.grouper.keys instanceof Keys.SelectorFunctionKeys) {
- int offset = ((Keys.SelectorFunctionKeys) this.grouper.keys).getKeyType().getTotalFields();
+ int offset = ((Keys.SelectorFunctionKeys<?,?>) this.grouper.keys).getKeyType().getTotalFields();
if(this.grouper instanceof SortedGrouping) {
- offset += ((SortedGrouping) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
+ offset += ((SortedGrouping<?>) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
}
props = SemanticPropUtil.addSourceFieldOffset(props, this.getInputType().getTotalFields(), offset);
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index bc4413f..fcbb888 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -87,6 +87,8 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
this.defaultName = defaultName;
checkCombinability();
+
+ UdfOperatorUtils.analyzeSingleInputUdf(this, GroupReduceFunction.class, defaultName, function, grouper.keys);
}
private void checkCombinability() {
@@ -132,9 +134,9 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
this.grouper != null &&
this.grouper.keys instanceof Keys.SelectorFunctionKeys) {
- int offset = ((Keys.SelectorFunctionKeys) this.grouper.keys).getKeyType().getTotalFields();
+ int offset = ((Keys.SelectorFunctionKeys<?,?>) this.grouper.keys).getKeyType().getTotalFields();
if(this.grouper instanceof SortedGrouping) {
- offset += ((SortedGrouping) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
+ offset += ((SortedGrouping<?>) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
}
props = SemanticPropUtil.addSourceFieldOffset(props, this.getInputType().getTotalFields(), offset);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index 4adf6b3..1e5baab 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -202,6 +202,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
this.function = function;
this.joinLocationName = joinLocationName;
+
+ UdfOperatorUtils.analyzeDualInputUdf(this, FlatJoinFunction.class, joinLocationName, function, keys1, keys2);
}
public EquiJoin(DataSet<I1> input1, DataSet<I2> input2,
@@ -217,6 +219,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
}
this.function = generatedFunction;
+
+ UdfOperatorUtils.analyzeDualInputUdf(this, JoinFunction.class, joinLocationName, function, keys1, keys2);
}
@Override
@@ -237,9 +241,9 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
int numFields1 = this.getInput1Type().getTotalFields();
int numFields2 = this.getInput2Type().getTotalFields();
int offset1 = (this.keys1 instanceof Keys.SelectorFunctionKeys) ?
- ((Keys.SelectorFunctionKeys) this.keys1).getKeyType().getTotalFields() : 0;
+ ((Keys.SelectorFunctionKeys<?,?>) this.keys1).getKeyType().getTotalFields() : 0;
int offset2 = (this.keys2 instanceof Keys.SelectorFunctionKeys) ?
- ((Keys.SelectorFunctionKeys) this.keys2).getKeyType().getTotalFields() : 0;
+ ((Keys.SelectorFunctionKeys<?,?>) this.keys2).getKeyType().getTotalFields() : 0;
props = SemanticPropUtil.addSourceFieldOffsets(props, numFields1, numFields2, offset1, offset2);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
index 2663a2a..eaaeb38 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
@@ -45,6 +45,8 @@ public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOpe
this.defaultName = defaultName;
this.function = function;
+
+ UdfOperatorUtils.analyzeSingleInputUdf(this, MapFunction.class, defaultName, function, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
index e770278..1193da5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
@@ -72,6 +72,8 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
this.function = function;
this.grouper = input;
this.defaultName = defaultName;
+
+ UdfOperatorUtils.analyzeSingleInputUdf(this, ReduceFunction.class, defaultName, function, grouper.keys);
}
@Override
@@ -89,9 +91,9 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
this.grouper != null &&
this.grouper.keys instanceof Keys.SelectorFunctionKeys) {
- int offset = ((Keys.SelectorFunctionKeys) this.grouper.keys).getKeyType().getTotalFields();
+ int offset = ((Keys.SelectorFunctionKeys<?,?>) this.grouper.keys).getKeyType().getTotalFields();
if(this.grouper instanceof SortedGrouping) {
- offset += ((SortedGrouping) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
+ offset += ((SortedGrouping<?>) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
}
props = SemanticPropUtil.addSourceFieldOffset(props, this.getInputType().getTotalFields(), offset);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
index f55489f..9301e1a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
@@ -54,8 +54,11 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
private Map<String, DataSet<?>> broadcastVariables;
+ // NOTE: only set this variable via setSemanticProperties()
private SingleInputSemanticProperties udfSemantics;
+ private boolean analyzedUdfSemantics;
+
// --------------------------------------------------------------------------------------------
/**
@@ -157,11 +160,12 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
if(this.udfSemantics == null) {
// extract semantic properties from function annotations
- this.udfSemantics = extractSemanticAnnotations(getFunction().getClass());
+ setSemanticProperties(extractSemanticAnnotations(getFunction().getClass()));
}
- if(this.udfSemantics == null) {
- this.udfSemantics = new SingleInputSemanticProperties();
+ if(this.udfSemantics == null
+ || this.analyzedUdfSemantics) { // discard analyzed semantic properties
+ setSemanticProperties(new SingleInputSemanticProperties());
SemanticPropUtil.getSemanticPropsSingleFromString(this.udfSemantics, forwardedFields, null, null, this.getInputType(), this.getResultType());
} else {
if(udfWithForwardedFieldsAnnotation(getFunction().getClass())) {
@@ -311,11 +315,15 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
@Override
public SingleInputSemanticProperties getSemanticProperties() {
- if (this.udfSemantics == null) {
+ if (this.udfSemantics == null || analyzedUdfSemantics) {
SingleInputSemanticProperties props = extractSemanticAnnotations(getFunction().getClass());
- this.udfSemantics = props != null ? props : new SingleInputSemanticProperties();
+ if (props != null) {
+ setSemanticProperties(props);
+ }
+ }
+ if (this.udfSemantics == null) {
+ setSemanticProperties(new SingleInputSemanticProperties());
}
-
return this.udfSemantics;
}
@@ -329,8 +337,17 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
*/
public void setSemanticProperties(SingleInputSemanticProperties properties) {
this.udfSemantics = properties;
+ this.analyzedUdfSemantics = false;
}
-
+
+ protected boolean getAnalyzedUdfSemanticsFlag() {
+ return this.analyzedUdfSemantics;
+ }
+
+ protected void setAnalyzedUdfSemanticsFlag() {
+ this.analyzedUdfSemantics = true;
+ }
+
protected SingleInputSemanticProperties extractSemanticAnnotations(Class<?> udfClass) {
Set<Annotation> annotations = FunctionAnnotation.readSingleForwardAnnotations(udfClass);
return SemanticPropUtil.getSemanticPropsSingle(annotations, getInputType(), getResultType());
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
index 91f9f7e..d23dd56 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
@@ -29,12 +29,12 @@ import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.operators.DualInputSemanticProperties;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.functions.SemanticPropUtil;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.java.DataSet;
/**
* The <tt>TwoInputUdfOperator</tt> is the base class of all binary operators that execute
@@ -56,8 +56,11 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
private Map<String, DataSet<?>> broadcastVariables;
+ // NOTE: only set this variable via setSemanticProperties()
private DualInputSemanticProperties udfSemantics;
+ private boolean analyzedUdfSemantics;
+
// --------------------------------------------------------------------------------------------
/**
@@ -157,13 +160,13 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
*/
@SuppressWarnings("unchecked")
public O withForwardedFieldsFirst(String... forwardedFieldsFirst) {
- if (this.udfSemantics == null) {
+ if (this.udfSemantics == null || this.analyzedUdfSemantics) {
// extract semantic properties from function annotations
- this.udfSemantics = extractSemanticAnnotationsFromUdf(getFunction().getClass());
+ setSemanticProperties(extractSemanticAnnotationsFromUdf(getFunction().getClass()));
}
- if(this.udfSemantics == null) {
- this.udfSemantics = new DualInputSemanticProperties();
+ if(this.udfSemantics == null || this.analyzedUdfSemantics) {
+ setSemanticProperties(new DualInputSemanticProperties());
SemanticPropUtil.getSemanticPropsDualFromString(this.udfSemantics, forwardedFieldsFirst, null,
null, null, null, null, getInput1Type(), getInput2Type(), getResultType());
} else {
@@ -232,13 +235,13 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
*/
@SuppressWarnings("unchecked")
public O withForwardedFieldsSecond(String... forwardedFieldsSecond) {
- if (this.udfSemantics == null) {
+ if (this.udfSemantics == null || this.analyzedUdfSemantics) {
// extract semantic properties from function annotations
- this.udfSemantics = extractSemanticAnnotationsFromUdf(getFunction().getClass());
+ setSemanticProperties(extractSemanticAnnotationsFromUdf(getFunction().getClass()));
}
- if(this.udfSemantics == null) {
- this.udfSemantics = new DualInputSemanticProperties();
+ if(this.udfSemantics == null || this.analyzedUdfSemantics) {
+ setSemanticProperties(new DualInputSemanticProperties());
SemanticPropUtil.getSemanticPropsDualFromString(this.udfSemantics, null, forwardedFieldsSecond,
null, null, null, null, getInput1Type(), getInput2Type(), getResultType());
} else {
@@ -390,11 +393,15 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
@Override
public DualInputSemanticProperties getSemanticProperties() {
- if (this.udfSemantics == null) {
+ if (this.udfSemantics == null || analyzedUdfSemantics) {
DualInputSemanticProperties props = extractSemanticAnnotationsFromUdf(getFunction().getClass());
- this.udfSemantics = props != null ? props : new DualInputSemanticProperties();
+ if (props != null) {
+ setSemanticProperties(props);
+ }
+ }
+ if (this.udfSemantics == null) {
+ setSemanticProperties(new DualInputSemanticProperties());
}
-
return this.udfSemantics;
}
@@ -408,9 +415,17 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
*/
public void setSemanticProperties(DualInputSemanticProperties properties) {
this.udfSemantics = properties;
+ this.analyzedUdfSemantics = false;
}
-
-
+
+ protected boolean getAnalyzedUdfSemanticsFlag() {
+ return this.analyzedUdfSemantics;
+ }
+
+ protected void setAnalyzedUdfSemanticsFlag() {
+ this.analyzedUdfSemantics = true;
+ }
+
protected DualInputSemanticProperties extractSemanticAnnotationsFromUdf(Class<?> udfClass) {
Set<Annotation> annotations = FunctionAnnotation.readDualForwardAnnotations(udfClass);
return SemanticPropUtil.getSemanticPropsDual(annotations, getInput1Type(), getInput2Type(), getResultType());
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
index 026cc61..924c84f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
@@ -57,7 +57,6 @@ public interface UdfOperator<O extends UdfOperator<O>> {
/**
* Gets the semantic properties that have been set for the user-defined functions (UDF).
- * This method may return null, if no semantic properties have been set so far.
*
* @return The semantic properties of the UDF.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java
new file mode 100644
index 0000000..52a0d08
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators;
+
+import org.apache.flink.api.common.CodeAnalysisMode;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.sca.CodeAnalyzerException;
+import org.apache.flink.api.java.sca.UdfAnalyzer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility methods that run the static code analysis ({@link UdfAnalyzer}) on the user-defined
+ * function of an operator. Depending on the configured {@link CodeAnalysisMode}, the analysis
+ * result is either only reported as hints or also applied as semantic properties of the operator.
+ */
+public abstract class UdfOperatorUtils {
+
+	private static final Logger LOG = LoggerFactory.getLogger(UdfOperatorUtils.class);
+
+	/**
+	 * Analyzes the UDF of a single-input operator.
+	 *
+	 * <p>In {@link CodeAnalysisMode#OPTIMIZE} mode the derived semantic properties are set on the
+	 * operator, unless the UDF class carries a forwarded-fields annotation. In
+	 * {@link CodeAnalysisMode#HINT} mode only semantic property hints are added to the analyzer's
+	 * output. Missing type information and analyzer failures are logged at debug level and
+	 * otherwise ignored.
+	 *
+	 * @param operator The operator whose UDF is analyzed.
+	 * @param udfBaseClass The function interface implemented by the UDF.
+	 * @param defaultName Location description used in the operator name if the operator has no name.
+	 * @param udf The user-defined function to analyze.
+	 * @param key The keys of the input, passed on to the analyzer.
+	 */
+	public static void analyzeSingleInputUdf(SingleInputUdfOperator<?, ?, ?> operator, Class<?> udfBaseClass,
+			String defaultName, Function udf, Keys<?> key) {
+		final CodeAnalysisMode mode = operator.getExecutionEnvironment().getConfig().getCodeAnalysisMode();
+		if (!isAnalysisEnabled(mode, udf)) {
+			return;
+		}
+		final String operatorName = getOperatorName(operator.getName(), udfBaseClass, defaultName);
+		try {
+			final UdfAnalyzer analyzer = new UdfAnalyzer(udfBaseClass, udf.getClass(), operatorName,
+					operator.getInputType(), null, operator.getResultType(), key, null,
+					mode == CodeAnalysisMode.OPTIMIZE);
+			if (!analyzer.analyze()) {
+				return;
+			}
+			// the analysis result is not applied if the UDF has a forwarded-fields annotation
+			if (mode == CodeAnalysisMode.OPTIMIZE && !operator.udfWithForwardedFieldsAnnotation(udf.getClass())) {
+				analyzer.addSemanticPropertiesHints();
+				operator.setSemanticProperties((SingleInputSemanticProperties) analyzer.getSemanticProperties());
+				operator.setAnalyzedUdfSemanticsFlag();
+			}
+			else if (mode == CodeAnalysisMode.HINT) {
+				analyzer.addSemanticPropertiesHints();
+			}
+			analyzer.printToLogger(LOG);
+		}
+		catch (InvalidTypesException e) {
+			LOG.debug("Unable to do code analysis due to missing type information.", e);
+		}
+		catch (CodeAnalyzerException e) {
+			LOG.debug("Code analysis failed.", e);
+		}
+	}
+
+	/**
+	 * Analyzes the UDF of a two-input operator.
+	 *
+	 * <p>In {@link CodeAnalysisMode#OPTIMIZE} mode the derived semantic properties are set on the
+	 * operator, unless the UDF class carries a forwarded-fields annotation for either input. In
+	 * {@link CodeAnalysisMode#HINT} mode only semantic property hints are added to the analyzer's
+	 * output. Missing type information and analyzer failures are logged at debug level and
+	 * otherwise ignored.
+	 *
+	 * @param operator The operator whose UDF is analyzed.
+	 * @param udfBaseClass The function interface implemented by the UDF.
+	 * @param defaultName Location description used in the operator name if the operator has no name.
+	 * @param udf The user-defined function to analyze.
+	 * @param key1 The keys of the first input, passed on to the analyzer.
+	 * @param key2 The keys of the second input, passed on to the analyzer.
+	 */
+	public static void analyzeDualInputUdf(TwoInputUdfOperator<?, ?, ?, ?> operator, Class<?> udfBaseClass,
+			String defaultName, Function udf, Keys<?> key1, Keys<?> key2) {
+		final CodeAnalysisMode mode = operator.getExecutionEnvironment().getConfig().getCodeAnalysisMode();
+		if (!isAnalysisEnabled(mode, udf)) {
+			return;
+		}
+		final String operatorName = getOperatorName(operator.getName(), udfBaseClass, defaultName);
+		try {
+			final UdfAnalyzer analyzer = new UdfAnalyzer(udfBaseClass, udf.getClass(), operatorName,
+					operator.getInput1Type(), operator.getInput2Type(), operator.getResultType(), key1, key2,
+					mode == CodeAnalysisMode.OPTIMIZE);
+			if (!analyzer.analyze()) {
+				return;
+			}
+			// the analysis result is not applied if the UDF has a forwarded-fields annotation for any input
+			if (mode == CodeAnalysisMode.OPTIMIZE
+					&& !operator.udfWithForwardedFieldsFirstAnnotation(udf.getClass())
+					&& !operator.udfWithForwardedFieldsSecondAnnotation(udf.getClass())) {
+				analyzer.addSemanticPropertiesHints();
+				operator.setSemanticProperties((DualInputSemanticProperties) analyzer.getSemanticProperties());
+				operator.setAnalyzedUdfSemanticsFlag();
+			}
+			else if (mode == CodeAnalysisMode.HINT) {
+				analyzer.addSemanticPropertiesHints();
+			}
+			analyzer.printToLogger(LOG);
+		}
+		catch (InvalidTypesException e) {
+			LOG.debug("Unable to do code analysis due to missing type information.", e);
+		}
+		catch (CodeAnalyzerException e) {
+			LOG.debug("Code analysis failed.", e);
+		}
+	}
+
+	/** Returns true unless analysis is disabled by {@code mode} or skipped for the UDF's class by annotation. */
+	private static boolean isAnalysisEnabled(CodeAnalysisMode mode, Function udf) {
+		return mode != CodeAnalysisMode.DISABLE
+				&& !udf.getClass().isAnnotationPresent(FunctionAnnotation.SkipCodeAnalysis.class);
+	}
+
+	/** Returns the explicit operator name or, if none is set, "&lt;UDF interface&gt; at &lt;defaultName&gt;". */
+	private static String getOperatorName(String name, Class<?> udfBaseClass, String defaultName) {
+		return name != null ? name : udfBaseClass.getSimpleName() + " at " + defaultName;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeAnalyzerException.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeAnalyzerException.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeAnalyzerException.java
new file mode 100644
index 0000000..e42ae67
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeAnalyzerException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.sca;
+
+/**
+ * Unchecked exception signalling that the static code analysis of a user-defined
+ * function could not be carried out properly.
+ */
+public class CodeAnalyzerException extends RuntimeException {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Creates an exception without detail message and cause. */
+	public CodeAnalyzerException() {
+	}
+
+	/**
+	 * Creates an exception with the given detail message.
+	 *
+	 * @param message the detail message
+	 */
+	public CodeAnalyzerException(String message) {
+		super(message);
+	}
+
+	/**
+	 * Creates an exception that wraps the given cause.
+	 *
+	 * @param cause the underlying failure
+	 */
+	public CodeAnalyzerException(Throwable cause) {
+		super(cause);
+	}
+
+	/**
+	 * Creates an exception with the given detail message and cause.
+	 *
+	 * @param message the detail message
+	 * @param cause the underlying failure
+	 */
+	public CodeAnalyzerException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeErrorException.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeErrorException.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeErrorException.java
new file mode 100644
index 0000000..9afe5d8
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeErrorException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.sca;
+
+/**
+ * Unchecked exception signalling that the static code analysis found errors
+ * in the analyzed user code.
+ */
+public class CodeErrorException extends RuntimeException {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Creates an exception without detail message and cause. */
+	public CodeErrorException() {
+	}
+
+	/**
+	 * Creates an exception with the given detail message.
+	 *
+	 * @param message the detail message
+	 */
+	public CodeErrorException(String message) {
+		super(message);
+	}
+
+	/**
+	 * Creates an exception that wraps the given cause.
+	 *
+	 * @param cause the underlying failure
+	 */
+	public CodeErrorException(Throwable cause) {
+		super(cause);
+	}
+
+	/**
+	 * Creates an exception with the given detail message and cause.
+	 *
+	 * @param message the detail message
+	 * @param cause the underlying failure
+	 */
+	public CodeErrorException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMAnalyzer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMAnalyzer.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMAnalyzer.java
new file mode 100644
index 0000000..4c0d020
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMAnalyzer.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.sca;
+
+import org.objectweb.asm.tree.AbstractInsnNode;
+import org.objectweb.asm.tree.InsnList;
+import org.objectweb.asm.tree.JumpInsnNode;
+import org.objectweb.asm.tree.analysis.Analyzer;
+import org.objectweb.asm.tree.analysis.Frame;
+import org.objectweb.asm.tree.analysis.Interpreter;
+
+import java.lang.reflect.Field;
+
+/**
+ * Modified version of ASM's {@link Analyzer}. It creates {@link ModifiedASMFrame}s instead of
+ * plain frames and allows jump modifications, which are necessary for UDFs with one iterable
+ * input, e.g. GroupReduce (see also UdfAnalyzer's field "iteratorTrueAssumptionApplied").
+ *
+ * <p>The jump modifications read and write non-public state of ASM's {@link Analyzer},
+ * {@link Frame}, {@link AbstractInsnNode} and {@link JumpInsnNode} via reflection and are
+ * therefore tied to the internals of the ASM version in use.
+ */
+public class ModifiedASMAnalyzer extends Analyzer {
+
+	// types of jump modification
+	private static final int NO_MOD = -1;
+	private static final int IFEQ_MOD = 0;
+	private static final int IFNE_MOD = 1;
+
+	// states of a jump modification
+	private static final int DO_NOTHING = -1;
+	private static final int PRE_STATE = 0;
+	private static final int MOD_STATE = 1;
+	private static final int WAIT_FOR_INSN_STATE = 2;
+
+	private final NestedMethodAnalyzer interpreter;
+
+	// requested type of jump modification
+	private int jumpModification = NO_MOD;
+
+	// current state of the jump modification
+	private int jumpModificationState = DO_NOTHING;
+
+	// instruction index at which a modification in WAIT_FOR_INSN_STATE is completed
+	private int eventInsn;
+
+	public ModifiedASMAnalyzer(Interpreter interpreter) {
+		super(interpreter);
+		// the interpreter must be a NestedMethodAnalyzer (ClassCastException otherwise)
+		this.interpreter = (NestedMethodAnalyzer) interpreter;
+	}
+
+	@Override
+	protected Frame newFrame(int nLocals, int nStack) {
+		return new ModifiedASMFrame(nLocals, nStack);
+	}
+
+	@Override
+	protected Frame newFrame(Frame src) {
+		return new ModifiedASMFrame(src);
+	}
+
+	/**
+	 * Requests the IFEQ variant of the jump modification. It takes effect on the control-flow
+	 * edges reported after this call.
+	 *
+	 * @throws CodeAnalyzerException if another jump modification is still in progress.
+	 */
+	public void requestIFEQLoopModification() {
+		requestLoopModification(IFEQ_MOD);
+	}
+
+	/**
+	 * Requests the IFNE variant of the jump modification. It takes effect on the control-flow
+	 * edges reported after this call.
+	 *
+	 * @throws CodeAnalyzerException if another jump modification is still in progress.
+	 */
+	public void requestIFNELoopModification() {
+		requestLoopModification(IFNE_MOD);
+	}
+
+	/** Starts a jump modification of the given type; nested modifications are not supported. */
+	private void requestLoopModification(int modification) {
+		if (jumpModificationState != DO_NOTHING) {
+			throw new CodeAnalyzerException("Unable to do jump modifications (unsupported nested jumping).");
+		}
+		jumpModification = modification;
+		jumpModificationState = PRE_STATE;
+	}
+
+	@Override
+	protected void newControlFlowEdge(int insn, int successor) {
+		try {
+			if (jumpModificationState == PRE_STATE) {
+				// the first edge after the request is left untouched
+				jumpModificationState = MOD_STATE;
+			}
+			else if (jumpModificationState == MOD_STATE) {
+				// this modification swaps the top 2 values on the queue stack
+				// it ensures that the "TRUE" path will be executed first, which is important
+				// for a later merge
+				if (jumpModification == IFEQ_MOD) {
+					final int top = accessField(Analyzer.class, "top").getInt(this);
+					final int[] queue = (int[]) accessField(Analyzer.class, "queue").get(this);
+
+					final int tmp = queue[top - 2];
+					queue[top - 2] = queue[top - 1];
+					queue[top - 1] = tmp;
+
+					eventInsn = queue[top - 2] - 1;
+					final InsnList insns = (InsnList) accessField(Analyzer.class, "insns").get(this);
+					// a jump instruction right before the successor indicates a loop structure
+					if (insns.get(eventInsn) instanceof JumpInsnNode) {
+						jumpModificationState = WAIT_FOR_INSN_STATE;
+					}
+					// no loop -> end of modification
+					else {
+						jumpModificationState = DO_NOTHING;
+					}
+				}
+				// this modification changes the merge priority of certain frames (the expression part of the IF)
+				else if (jumpModification == IFNE_MOD) {
+					final Frame[] frames = (Frame[]) accessField(Analyzer.class, "frames").get(this);
+					final Field indexField = accessField(AbstractInsnNode.class, "index");
+
+					final InsnList insns = (InsnList) accessField(Analyzer.class, "insns").get(this);
+					final AbstractInsnNode gotoInsn = insns.get(successor - 1);
+					// check for a loop
+					if (gotoInsn instanceof JumpInsnNode) {
+						jumpModificationState = WAIT_FOR_INSN_STATE;
+						// sets a merge priority for all instructions (the expression of the IF)
+						// from the label the goto instruction points to until the evaluation with IFEQ
+						final int idx = indexField.getInt(accessField(JumpInsnNode.class, "label").get(gotoInsn));
+
+						for (int i = idx; i <= insn; i++) {
+							((ModifiedASMFrame) frames[i]).mergePriority = true;
+						}
+						eventInsn = idx - 2;
+					}
+					else {
+						jumpModificationState = DO_NOTHING;
+					}
+				}
+			}
+			// wait for the goto instruction
+			else if (jumpModificationState == WAIT_FOR_INSN_STATE && insn == eventInsn) {
+				jumpModificationState = DO_NOTHING;
+				final Frame[] frames = (Frame[]) accessField(Analyzer.class, "frames").get(this);
+				// merge the goto instruction frame with the frame that follows
+				// this ensures that local variables are kept
+				if (jumpModification == IFEQ_MOD) {
+					interpreter.rightMergePriority = true;
+					final Field top = accessField(Frame.class, "top");
+					top.setInt(frames[eventInsn], top.getInt(frames[eventInsn + 1]));
+					frames[eventInsn + 1].merge(frames[eventInsn], interpreter);
+				}
+				// finally set a merge priority for the last instruction of the loop (before the IF expression)
+				else if (jumpModification == IFNE_MOD) {
+					((ModifiedASMFrame) frames[eventInsn + 1]).mergePriority = true;
+				}
+			}
+		}
+		catch (Exception e) {
+			throw new CodeAnalyzerException("Unable to do jump modifications.", e);
+		}
+	}
+
+	/**
+	 * Returns the field {@code name} declared directly in {@code clazz}, made accessible.
+	 *
+	 * @throws NoSuchFieldException if {@code clazz} declares no such field (presumably because the
+	 *         ASM version on the classpath has different internals). Previously this case returned
+	 *         null and only surfaced later as an uninformative NullPointerException.
+	 */
+	private static Field accessField(Class<?> clazz, String name) throws NoSuchFieldException {
+		final Field field = clazz.getDeclaredField(name);
+		field.setAccessible(true);
+		return field;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMFrame.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMFrame.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMFrame.java
new file mode 100644
index 0000000..497a15c
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMFrame.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.sca;
+
+import org.objectweb.asm.tree.AbstractInsnNode;
+import org.objectweb.asm.tree.analysis.AnalyzerException;
+import org.objectweb.asm.tree.analysis.Frame;
+import org.objectweb.asm.tree.analysis.Interpreter;
+import org.objectweb.asm.tree.analysis.Value;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+
+/**
+ * Modified version of ASM's {@link Frame}. It supports per-merge priorities and passes the
+ * frame that is currently being executed to the {@link NestedMethodAnalyzer} interpreter.
+ */
+public class ModifiedASMFrame extends Frame {
+
+	// If set on a frame that is merged into another frame, the interpreter merges with right
+	// merge priority; the flag is cleared by that merge.
+	// NOTE: intentionally without field initializer. ASM's Frame(Frame) constructor presumably
+	// calls init(Frame), which sets this flag; an initializer would overwrite it afterwards.
+	public boolean mergePriority;
+
+	public ModifiedASMFrame(int nLocals, int nStack) {
+		super(nLocals, nStack);
+	}
+
+	public ModifiedASMFrame(Frame src) {
+		super(src);
+	}
+
+	@Override
+	public Frame init(Frame src) {
+		// copy the merge priority along with the frame contents
+		mergePriority = ((ModifiedASMFrame) src).mergePriority;
+		return super.init(src);
+	}
+
+	@Override
+	public void execute(AbstractInsnNode insn, Interpreter interpreter) throws AnalyzerException {
+		// let the interpreter know on which frame the instruction is executed
+		((NestedMethodAnalyzer) interpreter).currentFrame = this;
+		super.execute(insn, interpreter);
+	}
+
+	@Override
+	public boolean merge(Frame frame, Interpreter interpreter) throws AnalyzerException {
+		final ModifiedASMFrame other = (ModifiedASMFrame) frame;
+		final NestedMethodAnalyzer nma = (NestedMethodAnalyzer) interpreter;
+		if (other.mergePriority) {
+			nma.rightMergePriority = true;
+		}
+		try {
+			return super.merge(frame, interpreter);
+		}
+		finally {
+			// priorities apply to a single merge only; reset them even if the merge fails
+			nma.rightMergePriority = false;
+			other.mergePriority = false;
+		}
+	}
+
+	@Override
+	public String toString() {
+		// FOR DEBUGGING: prints the raw local/stack value array of the frame
+		try {
+			final Field valuesField = Frame.class.getDeclaredField("values");
+			valuesField.setAccessible(true);
+			final Value[] values = (Value[]) valuesField.get(this);
+			return Arrays.toString(values);
+		}
+		catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+}
\ No newline at end of file