Posted to commits@flink.apache.org by uc...@apache.org on 2015/06/08 07:33:50 UTC

[1/3] flink git commit: [FLINK-1319] [core] Add static code analysis for user code

Repository: flink
Updated Branches:
  refs/heads/master d433ba9f0 -> c854d5260
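
FLINK-1319 introduces a static code analyzer that inspects the bytecode of
user-defined functions and infers semantic properties (such as forwarded
fields) that otherwise have to be declared manually with annotations like
@ForwardedFields. As a rough usage sketch (based on the CodeAnalysisMode /
ExecutionConfig API that this patch series introduces; exact names may
differ in the final commit):

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // HINT only prints the inferred annotations as hints;
    // OPTIMIZE would also apply them to the job automatically
    env.getConfig().setCodeAnalysisMode(CodeAnalysisMode.HINT);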


http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
new file mode 100644
index 0000000..a1d2b97
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
@@ -0,0 +1,707 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.sca;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.api.java.sca.UdfAnalyzerTest.compareAnalyzerResultWithAnnotationsDualInput;
+import static org.apache.flink.api.java.sca.UdfAnalyzerTest.compareAnalyzerResultWithAnnotationsDualInputWithKeys;
+import static org.apache.flink.api.java.sca.UdfAnalyzerTest.compareAnalyzerResultWithAnnotationsSingleInput;
+import static org.apache.flink.api.java.sca.UdfAnalyzerTest.compareAnalyzerResultWithAnnotationsSingleInputWithKeys;
+
+/**
+ * This class contains more advanced tests based on real-world examples.
+ * The examples are not exact copies; they have been modified in places to
+ * reduce dependencies on other classes.
+ */
+@SuppressWarnings("serial")
+public class UdfAnalyzerExamplesTest {
+
+	// --------------------------------------------------------------------------------------------
+	// EnumTriangles
+	// --------------------------------------------------------------------------------------------
+
+	public static class Edge extends Tuple2<Integer, Integer> {
+		private static final long serialVersionUID = 1L;
+
+		public static final int V1 = 0;
+		public static final int V2 = 1;
+
+		public Edge() {}
+
+		public Edge(final Integer v1, final Integer v2) {
+			this.setFirstVertex(v1);
+			this.setSecondVertex(v2);
+		}
+
+		public Integer getFirstVertex() { return this.getField(V1); }
+
+		public Integer getSecondVertex() { return this.getField(V2); }
+
+		public void setFirstVertex(final Integer vertex1) { this.setField(vertex1, V1); }
+
+		public void setSecondVertex(final Integer vertex2) { this.setField(vertex2, V2); }
+
+		public void copyVerticesFromTuple2(Tuple2<Integer, Integer> t) {
+			this.setFirstVertex(t.f0);
+			this.setSecondVertex(t.f1);
+		}
+
+		public void flipVertices() {
+			Integer tmp = this.getFirstVertex();
+			this.setFirstVertex(this.getSecondVertex());
+			this.setSecondVertex(tmp);
+		}
+	}
+
+	public static class Triad extends Tuple3<Integer, Integer, Integer> {
+		private static final long serialVersionUID = 1L;
+
+		public static final int V1 = 0;
+		public static final int V2 = 1;
+		public static final int V3 = 2;
+
+		public Triad() {}
+
+		public void setFirstVertex(final Integer vertex1) { this.setField(vertex1, V1); }
+
+		public void setSecondVertex(final Integer vertex2) { this.setField(vertex2, V2); }
+
+		public void setThirdVertex(final Integer vertex3) { this.setField(vertex3, V3); }
+	}
+
+	@ForwardedFields("0")
+	private static class TriadBuilder implements GroupReduceFunction<Edge, Triad> {
+		private final List<Integer> vertices = new ArrayList<Integer>();
+		private final Triad outTriad = new Triad();
+
+		@Override
+		public void reduce(Iterable<Edge> edgesIter, Collector<Triad> out) throws Exception {
+
+			final Iterator<Edge> edges = edgesIter.iterator();
+
+			// clear vertex list
+			vertices.clear();
+
+			// read first edge
+			Edge firstEdge = edges.next();
+			outTriad.setFirstVertex(firstEdge.getFirstVertex());
+
+			vertices.add(firstEdge.getSecondVertex());
+
+			// build and emit triads
+			while (edges.hasNext()) {
+				Integer higherVertexId = edges.next().getSecondVertex();
+
+				// combine vertex with all previously read vertices
+				for (Integer lowerVertexId : vertices) {
+					outTriad.setSecondVertex(lowerVertexId);
+					outTriad.setThirdVertex(higherVertexId);
+					out.collect(outTriad);
+				}
+				vertices.add(higherVertexId);
+			}
+		}
+	}
+
+	@Test
+	public void testEnumTrianglesBasicExamplesTriadBuilder() {
+		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, TriadBuilder.class,
+				"Tuple2<Integer, Integer>",
+				"Tuple3<Integer, Integer, Integer>",
+				new String[] { "0" });
+	}
+
+	@ForwardedFields("0;1")
+	public static class TupleEdgeConverter implements MapFunction<Tuple2<Integer, Integer>, Edge> {
+		private final Edge outEdge = new Edge();
+
+		@Override
+		public Edge map(Tuple2<Integer, Integer> t) throws Exception {
+			outEdge.copyVerticesFromTuple2(t);
+			return outEdge;
+		}
+	}
+
+	@Test
+	public void testEnumTrianglesBasicExamplesTupleEdgeConverter() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, TupleEdgeConverter.class,
+				"Tuple2<Integer, Integer>",
+				"Tuple2<Integer, Integer>");
+	}
+
+	private static class EdgeDuplicator implements FlatMapFunction<Edge, Edge> {
+		@Override
+		public void flatMap(Edge edge, Collector<Edge> out) throws Exception {
+			out.collect(edge);
+			edge.flipVertices();
+			out.collect(edge);
+		}
+	}
+
+	@Test
+	public void testEnumTrianglesOptExamplesEdgeDuplicator() {
+		compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, EdgeDuplicator.class,
+				"Tuple2<Integer, Integer>",
+				"Tuple2<Integer, Integer>");
+	}
+
+	private static class DegreeCounter implements GroupReduceFunction<Edge, Edge> {
+		final ArrayList<Integer> otherVertices = new ArrayList<Integer>();
+		final Edge outputEdge = new Edge();
+
+		@Override
+		public void reduce(Iterable<Edge> edgesIter, Collector<Edge> out) {
+
+			Iterator<Edge> edges = edgesIter.iterator();
+			otherVertices.clear();
+
+			// get first edge
+			Edge edge = edges.next();
+			Integer groupVertex = edge.getFirstVertex();
+			this.otherVertices.add(edge.getSecondVertex());
+
+			// get all other edges (assumes edges are sorted by second vertex)
+			while (edges.hasNext()) {
+				edge = edges.next();
+				Integer otherVertex = edge.getSecondVertex();
+				// collect unique vertices
+				if (!otherVertices.contains(otherVertex) && !otherVertex.equals(groupVertex)) {
+					this.otherVertices.add(otherVertex);
+				}
+			}
+
+			// emit edges
+			for (Integer otherVertex : this.otherVertices) {
+				if (groupVertex < otherVertex) {
+					outputEdge.setFirstVertex(groupVertex);
+					outputEdge.setSecondVertex(otherVertex);
+				} else {
+					outputEdge.setFirstVertex(otherVertex);
+					outputEdge.setSecondVertex(groupVertex);
+				}
+				out.collect(outputEdge);
+			}
+		}
+	}
+
+	@Test
+	public void testEnumTrianglesOptExamplesDegreeCounter() {
+		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, DegreeCounter.class,
+				"Tuple2<Integer, Integer>",
+				"Tuple2<Integer, Integer>",
+				new String[] { "0" });
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// KMeans
+	// --------------------------------------------------------------------------------------------
+
+	public static class Point implements Serializable {
+		public double x, y;
+
+		public Point() {}
+
+		public Point(double x, double y) {
+			this.x = x;
+			this.y = y;
+		}
+
+		public Point add(Point other) {
+			x += other.x;
+			y += other.y;
+			return this;
+		}
+
+		public Point div(long val) {
+			x /= val;
+			y /= val;
+			return this;
+		}
+
+		public void clear() {
+			x = y = 0.0;
+		}
+
+		@Override
+		public String toString() {
+			return x + " " + y;
+		}
+	}
+
+	public static class Centroid extends Point {
+		public int id;
+
+		public Centroid() {}
+
+		public Centroid(int id, double x, double y) {
+			super(x,y);
+			this.id = id;
+		}
+
+		public Centroid(int id, Point p) {
+			super(p.x, p.y);
+			this.id = id;
+		}
+
+		@Override
+		public String toString() {
+			return id + " " + super.toString();
+		}
+	}
+
+	@ForwardedFields("0")
+	public static final class CentroidAccumulator implements ReduceFunction<Tuple3<Integer, Point, Long>> {
+		@Override
+		public Tuple3<Integer, Point, Long> reduce(Tuple3<Integer, Point, Long> val1, Tuple3<Integer, Point, Long> val2) {
+			return new Tuple3<Integer, Point, Long>(val1.f0, val1.f1.add(val2.f1), val1.f2 + val2.f2);
+		}
+	}
+
+	@Test
+	public void testKMeansExamplesCentroidAccumulator() {
+		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(ReduceFunction.class, CentroidAccumulator.class,
+				"Tuple3<Integer, org.apache.flink.api.java.sca.UdfAnalyzerExamplesTest$Point<x=double,y=double>, Long>",
+				"Tuple3<Integer, org.apache.flink.api.java.sca.UdfAnalyzerExamplesTest$Point<x=double,y=double>, Long>",
+				new String[] { "0" });
+	}
+
+	@ForwardedFields("0->id")
+	public static final class CentroidAverager implements MapFunction<Tuple3<Integer, Point, Long>, Centroid> {
+		@Override
+		public Centroid map(Tuple3<Integer, Point, Long> value) {
+			return new Centroid(value.f0, value.f1.div(value.f2));
+		}
+	}
+
+	@Test
+	public void testKMeansExamplesCentroidAverager() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, CentroidAverager.class,
+				"Tuple3<Integer, org.apache.flink.api.java.sca.UdfAnalyzerExamplesTest$Point<x=double,y=double>, Long>",
+				"org.apache.flink.api.java.sca.UdfAnalyzerExamplesTest$Centroid<x=double,y=double,id=int>");
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// ConnectedComponents
+	// --------------------------------------------------------------------------------------------
+
+	public static final class UndirectEdge implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+		Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
+			invertedEdge.f0 = edge.f1;
+			invertedEdge.f1 = edge.f0;
+			out.collect(edge);
+			out.collect(invertedEdge);
+		}
+	}
+
+	@Test
+	public void testConnectedComponentsExamplesUndirectEdge() {
+		compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, UndirectEdge.class,
+				"Tuple2<Long, Long>",
+				"Tuple2<Long, Long>");
+	}
+
+	@ForwardedFieldsFirst("*")
+	public static final class ComponentIdFilter implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+		@Override
+		public void join(Tuple2<Long, Long> candidate, Tuple2<Long, Long> old, Collector<Tuple2<Long, Long>> out) {
+			if (candidate.f1 < old.f1) {
+				out.collect(candidate);
+			}
+		}
+	}
+
+	@Test
+	public void testConnectedComponentsExamplesComponentIdFilter() {
+		compareAnalyzerResultWithAnnotationsDualInput(FlatJoinFunction.class, ComponentIdFilter.class,
+				"Tuple2<Long, Long>",
+				"Tuple2<Long, Long>",
+				"Tuple2<Long, Long>");
+	}
+
+	@ForwardedFields("*->f0;*->f1")
+	public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {
+		@Override
+		public Tuple2<T, T> map(T vertex) {
+			return new Tuple2<T, T>(vertex, vertex);
+		}
+	}
+
+	@Test
+	public void testConnectedComponentsExamplesDuplicateValue() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, DuplicateValue.class,
+				"Long",
+				"Tuple2<Long, Long>");
+	}
+
+	@ForwardedFieldsFirst("f1->f1")
+	@ForwardedFieldsSecond("f1->f0")
+	public static final class NeighborWithComponentIDJoin implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+		@Override
+		public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
+			return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
+		}
+	}
+
+	@Test
+	public void testConnectedComponentsExamplesNeighborWithComponentIDJoin() {
+		compareAnalyzerResultWithAnnotationsDualInput(JoinFunction.class, NeighborWithComponentIDJoin.class,
+				"Tuple2<Long, Long>",
+				"Tuple2<Long, Long>",
+				"Tuple2<Long, Long>");
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// WebLogAnalysis
+	// --------------------------------------------------------------------------------------------
+
+	@ForwardedFieldsFirst("f1")
+	public static class AntiJoinVisits implements CoGroupFunction<Tuple3<Integer, String, Integer>, Tuple1<String>, Tuple3<Integer, String, Integer>> {
+		@Override
+		public void coGroup(Iterable<Tuple3<Integer, String, Integer>> ranks, Iterable<Tuple1<String>> visits, Collector<Tuple3<Integer, String, Integer>> out) {
+			// Check if there is an entry in the visits relation
+			if (!visits.iterator().hasNext()) {
+				for (Tuple3<Integer, String, Integer> next : ranks) {
+					// Emit all rank pairs
+					out.collect(next);
+				}
+			}
+		}
+	}
+
+	@Test
+	public void testWebLogAnalysisExamplesAntiJoinVisits() {
+		compareAnalyzerResultWithAnnotationsDualInputWithKeys(CoGroupFunction.class, AntiJoinVisits.class,
+				"Tuple3<Integer, String, Integer>",
+				"Tuple1<String>",
+				"Tuple3<Integer, String, Integer>",
+				new String[] { "1" }, new String[] { "0" });
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// PageRankBasic
+	// --------------------------------------------------------------------------------------------
+
+	@ForwardedFields("0")
+	public static class BuildOutgoingEdgeList implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> {
+		private final ArrayList<Long> neighbors = new ArrayList<Long>();
+
+		@Override
+		public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long[]>> out) {
+			neighbors.clear();
+			Long id = 0L;
+
+			for (Tuple2<Long, Long> n : values) {
+				id = n.f0;
+				neighbors.add(n.f1);
+			}
+			out.collect(new Tuple2<Long, Long[]>(id, neighbors.toArray(new Long[neighbors.size()])));
+		}
+	}
+
+	@Test
+	public void testPageRankBasicExamplesBuildOutgoingEdgeList() {
+		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, BuildOutgoingEdgeList.class,
+				"Tuple2<Long, Long>",
+				"Tuple2<Long, Long[]>",
+				new String[] { "0" });
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// LogisticRegression
+	// --------------------------------------------------------------------------------------------
+
+	public static class Vector extends Tuple1<double[]> {
+		public Vector() {
+			// default constructor needed for instantiation during serialization
+		}
+
+		public Vector(int size) {
+			double[] components = new double[size];
+			for (int i = 0; i < size; i++) {
+				components[i] = 0.0;
+			}
+			setComponents(components);
+		}
+
+		public double[] getComponents() {
+			return this.f0;
+		}
+
+		public double getComponent(int i) {
+			return this.f0[i];
+		}
+
+		public void setComponent(int i, double value) {
+			this.f0[i] = value;
+		}
+
+		public void setComponents(double[] components) {
+			this.f0 = components;
+		}
+	}
+
+	public static class Gradient extends Vector {
+		public Gradient() {
+			// default constructor needed for instantiation during serialization
+		}
+
+		public Gradient(int size) {
+			super(size);
+		}
+	}
+
+	public static class PointWithLabel extends Tuple2<Integer, double[]> {
+		public double[] getFeatures() {
+			return this.f1;
+		}
+
+		public double getFeature(int i) {
+			return this.f1[i];
+		}
+
+		public void setFeatures(double[] features) {
+			this.f1 = features;
+		}
+
+		public Integer getLabel() {
+			return this.f0;
+		}
+
+		public void setLabel(Integer label) {
+			this.f0 = label;
+		}
+	}
+
+	public static class SumGradient implements ReduceFunction<Gradient> {
+		@Override
+		public Gradient reduce(Gradient gradient1, Gradient gradient2) throws Exception {
+			// grad(i) += gradient2(i): sum the gradients component-wise
+			for (int i = 0; i < gradient1.getComponents().length; i++) {
+				gradient1.setComponent(i, gradient1.getComponent(i) + gradient2.getComponent(i));
+			}
+
+			return gradient1;
+		}
+	}
+
+	@Test
+	public void testLogisticRegressionExamplesSumGradient() {
+		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(ReduceFunction.class, SumGradient.class,
+				"Tuple1<double[]>",
+				"Tuple1<double[]>",
+				new String[] { "0" });
+	}
+
+	public static class PointParser implements MapFunction<String, PointWithLabel> {
+		@Override
+		public PointWithLabel map(String value) throws Exception {
+			PointWithLabel p = new PointWithLabel();
+
+			String[] split = value.split(",");
+			double[] features = new double[42];
+
+			int a = 0;
+			for (int i = 0; i < split.length; i++) {
+
+				if (i == 42 - 1) {
+					p.setLabel(new Integer(split[i].trim().substring(0, 1)));
+				} else {
+					if (a < 42 && !split[i].trim().isEmpty()) {
+						features[a++] = Double.parseDouble(split[i].trim());
+					}
+				}
+			}
+
+			p.setFeatures(features);
+			return p;
+		}
+	}
+
+	@Test
+	public void testLogisticRegressionExamplesPointParser() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, PointParser.class,
+				"String",
+				"Tuple2<Integer, double[]>");
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Canopy
+	// --------------------------------------------------------------------------------------------
+
+	public static class Document extends Tuple5<Integer, Boolean, Boolean, String, String> {
+		public Document() {
+			// default constructor needed for instantiation during serialization
+		}
+
+		public Document(Integer docId, Boolean isCenter, Boolean isInSomeT2, String canopyCenters, String words) {
+			super(docId, isCenter, isInSomeT2, canopyCenters, words);
+		}
+
+		public Document(Integer docId) {
+			super(docId, null, null, null, null);
+		}
+	}
+
+	public static class MessageBOW implements FlatMapFunction<String, Tuple2<Integer, String>> {
+		@Override
+		public void flatMap(String value, Collector<Tuple2<Integer, String>> out) throws Exception {
+			String[] splits = value.split(" ");
+			if (splits.length < 2) {
+				return;
+			}
+			out.collect(new Tuple2<Integer, String>(Integer.valueOf(splits[0]), splits[1]));
+		}
+	}
+
+	@Test
+	public void testCanopyExamplesMessageBOW() {
+		compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, MessageBOW.class,
+				"String",
+				"Tuple2<Integer, String>");
+	}
+
+	@ForwardedFields("0")
+	public static class DocumentReducer implements GroupReduceFunction<Tuple2<Integer, String>, Document> {
+		@Override
+		public void reduce(Iterable<Tuple2<Integer, String>> values, Collector<Document> out) throws Exception {
+			Iterator<Tuple2<Integer, String>> it = values.iterator();
+			Tuple2<Integer, String> first = it.next();
+			Integer docId = first.f0;
+			StringBuilder builder = new StringBuilder(first.f1);
+			while (it.hasNext()) {
+				builder.append("-").append(it.next().f1);
+			}
+			out.collect(new Document(docId, false, false, "", builder.toString()));
+		}
+	}
+
+	@Test
+	public void testCanopyExamplesDocumentReducer() {
+		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, DocumentReducer.class,
+				"Tuple2<Integer, String>",
+				"Tuple5<Integer, Boolean, Boolean, String, String>",
+				new String[] { "0" });
+	}
+
+	@ForwardedFields("0;4")
+	public static class MapToCenter implements MapFunction<Document, Document> {
+		private Document center;
+
+		@Override
+		public Document map(Document value) throws Exception {
+			if (center != null) {
+				final float similarity = 42f;
+				final boolean isEqual = value.f0.equals(center.f0);
+				value.f1 = isEqual;
+				value.f2 = isEqual || similarity > 42;
+				if (!value.f3.contains(center.f0.toString() + ";") && (similarity > 42 || isEqual)) {
+					value.f3 += center.f0.toString() + ";";
+				}
+			}
+			return value;
+		}
+	}
+
+	@Test
+	public void testCanopyExamplesMapToCenter() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, MapToCenter.class,
+				"Tuple5<Integer, Boolean, Boolean, String, String>",
+				"Tuple5<Integer, Boolean, Boolean, String, String>");
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// K-Means++
+	// --------------------------------------------------------------------------------------------
+
+	public static class DocumentWithFreq implements Serializable {
+		private static final long serialVersionUID = -8646398807053061675L;
+
+		public Map<String, Double> wordFreq = new HashMap<String, Double>();
+
+		public Integer id;
+
+		public DocumentWithFreq() {
+			id = -1;
+		}
+
+		public DocumentWithFreq(Integer id) {
+			this.id = id;
+		}
+
+		@Override
+		public String toString() {
+			return Integer.toString(id);
+		}
+	}
+
+	@ForwardedFields("0->id")
+	public static final class RecordToDocConverter implements GroupReduceFunction<Tuple3<Integer, Integer, Double>, DocumentWithFreq> {
+		private static final long serialVersionUID = -8476366121490468956L;
+
+		@Override
+		public void reduce(Iterable<Tuple3<Integer, Integer, Double>> values, Collector<DocumentWithFreq> out) throws Exception {
+			Iterator<Tuple3<Integer, Integer, Double>> it = values.iterator();
+			if (it.hasNext()) {
+				Tuple3<Integer, Integer, Double> elem = it.next();
+				DocumentWithFreq doc = new DocumentWithFreq(elem.f0);
+				doc.wordFreq.put(elem.f1.toString(), elem.f2);
+
+				while (it.hasNext()) {
+					elem = it.next();
+					doc.wordFreq.put(elem.f1.toString(), elem.f2);
+				}
+				out.collect(doc);
+			}
+		}
+	}
+
+	@Test
+	public void testKMeansppExamplesRecordToDocConverter() {
+		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, RecordToDocConverter.class,
+				"Tuple3<Integer, Integer, Double>",
+				"org.apache.flink.api.java.sca.UdfAnalyzerExamplesTest$DocumentWithFreq<id=Integer,wordFreq=java.util.HashMap>",
+				new String[] { "0" });
+	}
+}
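
The UdfAnalyzerTest file below defines the compareAnalyzerResultWithAnnotations*
helpers that the tests above import statically. Conceptually, each helper parses
the given type strings, runs the UdfAnalyzer over the function's bytecode, and
asserts that the inferred semantic properties match the ones derived from the
function's @ForwardedFields-style annotations. A simplified, hypothetical sketch
of the annotation side, using the SemanticPropUtil and TypeInfoParser APIs
imported below (the real helpers also handle keys and dual-input functions):

    TypeInformation<?> inType = TypeInfoParser.parse("Tuple2<String, Integer>");
    TypeInformation<?> outType = TypeInfoParser.parse("String");
    SingleInputSemanticProperties expected = new SingleInputSemanticProperties();
    // derive the expected properties from the "f0->*" forwarded-fields string
    SemanticPropUtil.getSemanticPropsSingleFromString(
            expected, new String[] { "f0->*" }, null, null, inType, outType);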

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
new file mode 100644
index 0000000..8d8f801
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
@@ -0,0 +1,1353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.sca;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
+import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.annotation.Annotation;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+@SuppressWarnings("serial")
+public class UdfAnalyzerTest {
+
+	@ForwardedFields("f0->*")
+	public static class Map1 implements MapFunction<Tuple2<String, Integer>, String> {
+		public String map(Tuple2<String, Integer> value) throws Exception {
+			return value.f0;
+		}
+	}
+
+	@Test
+	public void testSingleFieldExtract() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map1.class, "Tuple2<String,Integer>",
+				"String");
+	}
+
+	@ForwardedFields("f0->f0;f0->f1")
+	public static class Map2 implements MapFunction<Tuple2<String, Integer>, Tuple2<String, String>> {
+		public Tuple2<String, String> map(Tuple2<String, Integer> value) throws Exception {
+			return new Tuple2<String, String>(value.f0, value.f0);
+		}
+	}
+
+	@Test
+	public void testForwardIntoTuple() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map2.class, "Tuple2<String,Integer>",
+				"Tuple2<String,String>");
+	}
+
+	public static class Map3 implements MapFunction<String[], Integer> {
+		@Override
+		public Integer map(String[] value) throws Exception {
+			return value.length;
+		}
+
+	}
+
+	@Test
+	public void testForwardWithArrayAttrAccess() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map3.class, "String[]", "Integer");
+	}
+
+	public static class Map4 implements MapFunction<MyPojo, String> {
+		@Override
+		public String map(MyPojo value) throws Exception {
+			return value.field2;
+		}
+	}
+
+	@Test
+	public void testForwardWithGenericTypePublicAttrAccess() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map4.class,
+				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo", "String");
+	}
+
+	@ForwardedFields("field2->*")
+	public static class Map5 implements MapFunction<MyPojo, String> {
+		@Override
+		public String map(MyPojo value) throws Exception {
+			return value.field2;
+		}
+	}
+
+	@Test
+	public void testForwardWithPojoPublicAttrAccess() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map5.class,
+				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>", "String");
+	}
+
+	@ForwardedFields("field->*")
+	public static class Map6 implements MapFunction<MyPojo, String> {
+		@Override
+		public String map(MyPojo value) throws Exception {
+			return value.field;
+		}
+	}
+
+	@Test
+	public void testForwardWithPojoPrivateAttrAccess() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map6.class,
+				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>", "String");
+	}
+
+	@ForwardedFields("f0->f1")
+	public static class Map7 implements MapFunction<Tuple2<String, Integer>, Tuple2<String, String>> {
+		public Tuple2<String, String> map(Tuple2<String, Integer> value) throws Exception {
+			if (value.f0.equals("whatever")) {
+				return new Tuple2<String, String>(value.f0, value.f0);
+			} else {
+				return new Tuple2<String, String>("hello", value.f0);
+			}
+		}
+	}
+
+	@Test
+	public void testForwardIntoTupleWithCondition() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map7.class, "Tuple2<String,Integer>",
+				"Tuple2<String,String>");
+	}
+
+	public static class Map8 implements MapFunction<Tuple2<String, String>, String> {
+		public String map(Tuple2<String, String> value) throws Exception {
+			if (value.f0.equals("whatever")) {
+				return value.f0;
+			} else {
+				return value.f1;
+			}
+		}
+	}
+
+	@Test
+	public void testSingleFieldExtractWithCondition() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map8.class, "Tuple2<String,String>",
+				"String");
+	}
+
+	@ForwardedFields("*->f0")
+	public static class Map9 implements MapFunction<String, Tuple1<String>> {
+		private Tuple1<String> tuple = new Tuple1<String>();
+
+		public Tuple1<String> map(String value) throws Exception {
+			tuple.f0 = value;
+			return tuple;
+		}
+	}
+
+	@Test
+	public void testForwardIntoTupleWithInstanceVar() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map9.class, "String", "Tuple1<String>");
+	}
+
+	@ForwardedFields("*->f0.f0")
+	public static class Map10 implements MapFunction<String, Tuple1<Tuple1<String>>> {
+		private Tuple1<Tuple1<String>> tuple = new Tuple1<Tuple1<String>>();
+
+		public Tuple1<Tuple1<String>> map(String value) throws Exception {
+			tuple.f0.f0 = value;
+			return tuple;
+		}
+	}
+
+	@Test
+	public void testForwardIntoTupleWithInstanceVar2() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map10.class, "String",
+				"Tuple1<Tuple1<String>>");
+	}
+
+	@ForwardedFields("*->f1")
+	public static class Map11 implements MapFunction<String, Tuple2<String, String>> {
+		private Tuple2<String, String> tuple = new Tuple2<String, String>();
+
+		public Tuple2<String, String> map(String value) throws Exception {
+			tuple.f0 = value;
+			modify();
+			tuple.f1 = value;
+			return tuple;
+		}
+
+		private void modify() {
+			tuple.f0 = null;
+		}
+	}
+
+	@Test
+	public void testForwardIntoTupleWithInstanceVarChangedByOtherMethod() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map11.class, "String",
+				"Tuple2<String, String>");
+	}
+
+	@ForwardedFields("f0->f0.f0;f0->f1.f0")
+	public static class Map12 implements MapFunction<Tuple2<String, Integer>, Tuple2<Tuple1<String>, Tuple1<String>>> {
+		public Tuple2<Tuple1<String>, Tuple1<String>> map(Tuple2<String, Integer> value) throws Exception {
+			return new Tuple2<Tuple1<String>, Tuple1<String>>(new Tuple1<String>(value.f0), new Tuple1<String>(
+					value.f0));
+		}
+	}
+
+	@Test
+	public void testForwardIntoNestedTuple() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map12.class, "Tuple2<String,Integer>",
+				"Tuple2<Tuple1<String>,Tuple1<String>>");
+	}
+
+	@ForwardedFields("f0->f1.f0")
+	public static class Map13 implements MapFunction<Tuple2<String, Integer>, Tuple2<Tuple1<String>, Tuple1<String>>> {
+		@SuppressWarnings("unchecked")
+		public Tuple2<Tuple1<String>, Tuple1<String>> map(Tuple2<String, Integer> value) throws Exception {
+			Tuple2<?, ?> t = new Tuple2<Tuple1<String>, Tuple1<String>>(new Tuple1<String>(value.f0),
+					new Tuple1<String>(value.f0));
+			t.f0 = null;
+			return (Tuple2<Tuple1<String>, Tuple1<String>>) t;
+		}
+	}
+
+	@Test
+	public void testForwardIntoNestedTupleWithVarAndModification() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map13.class, "Tuple2<String,Integer>",
+				"Tuple2<Tuple1<String>,Tuple1<String>>");
+	}
+
+	@ForwardedFields("f0")
+	public static class Map14 implements MapFunction<Tuple2<String, Integer>, Tuple2<String, String>> {
+		public Tuple2<String, String> map(Tuple2<String, Integer> value) throws Exception {
+			Tuple2<String, String> t = new Tuple2<String, String>();
+			t.f0 = value.f0;
+			return t;
+		}
+	}
+
+	@Test
+	public void testForwardIntoTupleWithAssignment() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map14.class, "Tuple2<String,Integer>",
+				"Tuple2<String,String>");
+	}
+
+	@ForwardedFields("f0.f0->f0")
+	public static class Map15 implements MapFunction<Tuple2<Tuple1<String>, Integer>, Tuple2<String, String>> {
+		public Tuple2<String, String> map(Tuple2<Tuple1<String>, Integer> value) throws Exception {
+			Tuple2<String, String> t = new Tuple2<String, String>();
+			t.f0 = value.f0.f0;
+			return t;
+		}
+	}
+
+	@Test
+	public void testForwardIntoTupleWithInputPath() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map15.class,
+				"Tuple2<Tuple1<String>,Integer>", "Tuple2<String,String>");
+	}
+
+	@ForwardedFields("field->field2;field2->field")
+	public static class Map16 implements MapFunction<MyPojo, MyPojo> {
+		public MyPojo map(MyPojo value) throws Exception {
+			MyPojo p = new MyPojo();
+			p.setField(value.getField2());
+			p.setField2(value.getField());
+			return p;
+		}
+	}
+
+	@Test
+	public void testForwardIntoPojoByGettersAndSetters() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map16.class,
+				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
+				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>");
+	}
+
+	public static class Map17 implements MapFunction<String, Tuple1<String>> {
+		private Tuple1<String> tuple = new Tuple1<String>();
+
+		public Tuple1<String> map(String value) throws Exception {
+			if (!tuple.f0.equals("")) {
+				tuple.f0 = "empty";
+			} else {
+				tuple.f0 = value;
+			}
+			return tuple;
+		}
+	}
+
+	@Test
+	public void testForwardIntoTupleWithInstanceVarAndCondition() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map17.class, "String", "Tuple1<String>");
+	}
+
+	public static class Map18 implements MapFunction<Tuple1<String>, ArrayList<String>> {
+		private ArrayList<String> list = new ArrayList<String>();
+
+		public ArrayList<String> map(Tuple1<String> value) throws Exception {
+			list.add(value.f0);
+			return list;
+		}
+	}
+
+	@Test
+	public void testForwardIntoUnsupportedObject() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map18.class, "Tuple1<String>",
+				"java.util.ArrayList");
+	}
+
+	@ForwardedFields("*->f0")
+	public static class Map19 implements MapFunction<Integer, Tuple1<Integer>> {
+		@Override
+		public Tuple1<Integer> map(Integer value) throws Exception {
+			Tuple1<Integer> tuple = new Tuple1<Integer>();
+			tuple.f0 = value;
+			Tuple1<Integer> tuple2 = new Tuple1<Integer>();
+			tuple2.f0 = tuple.f0;
+			return tuple2;
+		}
+	}
+
+	@Test
+	public void testForwardWithNewTupleToNewTupleAssignment() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map19.class, "Integer", "Tuple1<Integer>");
+	}
+
+	@ForwardedFields("f0;f1")
+	public static class Map20 implements
+			MapFunction<Tuple4<Integer, Integer, Integer, Integer>, Tuple4<Integer, Integer, Integer, Integer>> {
+
+		@Override
+		public Tuple4<Integer, Integer, Integer, Integer> map(Tuple4<Integer, Integer, Integer, Integer> value)
+				throws Exception {
+			Tuple4<Integer, Integer, Integer, Integer> t = new Tuple4<Integer, Integer, Integer, Integer>();
+			t.f0 = value.getField(0);
+			t.f1 = value.getField((int) 1L);
+			return t;
+		}
+	}
+
+	@Test
+	public void testForwardWithGetMethod() {
+		System.out.println("HERE");
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map20.class,
+				"Tuple4<Integer, Integer, Integer, Integer>", "Tuple4<Integer, Integer, Integer, Integer>");
+	}
+
+	@ForwardedFields("f0->f1;f1->f0")
+	public static class Map21 implements MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
+		@Override
+		public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
+			Integer i = value.f0;
+			value.setField(value.f1, 0);
+			value.setField(i, 1);
+			return value;
+		}
+	}
+
+	@Test
+	public void testForwardWithSetMethod() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map21.class, "Tuple2<Integer, Integer>",
+				"Tuple2<Integer, Integer>");
+	}
+
+	@ForwardedFields("f0->f1;f1->f0")
+	public static class Map22 implements MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
+		@Override
+		public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
+			Tuple2<Integer, Integer> t = new Tuple2<Integer, Integer>();
+			t.setField(value.f1, 0);
+			t.setField(value.getField(0), 1);
+			return t;
+		}
+	}
+
+	@Test
+	public void testForwardIntoNewTupleWithSetMethod() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map22.class, "Tuple2<Integer, Integer>",
+				"Tuple2<Integer, Integer>");
+	}
+
+	@ForwardedFields("*")
+	public static class Map23 implements MapFunction<Tuple1<Integer>, Tuple1<Integer>> {
+		@Override
+		public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
+			if (value.f0.equals(23)) {
+				return new Tuple1<Integer>(value.<Integer> getField(0));
+			} else if (value.f0.equals(22)) {
+				Tuple1<Integer> inputContainer = new Tuple1<Integer>();
+				inputContainer.f0 = value.f0;
+				return new Tuple1<Integer>(inputContainer.<Integer> getField(0));
+			} else {
+				return value;
+			}
+		}
+	}
+
+	@Test
+	public void testForwardWithGetMethod2() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map23.class, "Tuple1<Integer>",
+				"Tuple1<Integer>");
+	}
+
+	public static class Map24 implements MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
+		@Override
+		public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
+			value.setField(2, 0);
+			int i = 5;
+			value.setField(i * i + 2, 1);
+			return value;
+		}
+	}
+
+	@Test
+	public void testForwardWithSetMethod2() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map24.class, "Tuple2<Integer, Integer>",
+				"Tuple2<Integer, Integer>");
+	}
+
+	@ForwardedFields("f1->f0;f1")
+	public static class Map25 implements MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
+		@Override
+		public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
+			value.f0 = value.f1;
+			return value;
+		}
+	}
+
+	@Test
+	public void testForwardWithModifiedInput() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map25.class, "Tuple2<Integer, Integer>",
+				"Tuple2<Integer, Integer>");
+	}
+
+	@ForwardedFields("*->1")
+	public static class Map26 implements MapFunction<Integer, Tuple2<Integer, Integer>> {
+
+		@Override
+		public Tuple2<Integer, Integer> map(Integer value) throws Exception {
+			Tuple2<Integer, Integer> tuple = new Tuple2<Integer, Integer>();
+			// non-input content
+			if (tuple.equals(new Tuple2<Integer, Integer>())) {
+				tuple.f0 = 123456;
+			}
+			if (tuple.equals(new Tuple2<Integer, Integer>())) {
+				tuple.f0 = value;
+			}
+			// forwarding
+			tuple.f1 = value;
+			return tuple;
+		}
+	}
+
+	@Test
+	public void testForwardWithTuplesGetSetFieldMethods() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map26.class, "Integer",
+				"Tuple2<Integer, Integer>");
+	}
+
+	@ForwardedFields("2->3;3->7")
+	public static class Map27 implements
+			MapFunction<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>,
+					Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> {
+		@Override
+		public Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer> map(
+				Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer> value)
+						throws Exception {
+			Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer> tuple =
+					new Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>();
+			// non-input content
+			if (tuple.f0 == null) {
+				tuple.setField(123456, 0);
+			} else {
+				tuple.setField(value.f0, 0);
+			}
+			// forwarding
+			tuple.setField(value.f2, 3);
+			tuple.setField(value.f3, 7);
+			// TODO multiple mappings are not supported yet
+			//tuple.setField(value.f1, 3);
+			return tuple;
+		}
+	}
+
+	@Test
+	public void testForwardWithTuplesGetSetFieldMethods2() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map27.class,
+				"Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>",
+				"Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>");
+	}
+
+	public static class Map28 implements MapFunction<Integer, Integer> {
+		@Override
+		public Integer map(Integer value) throws Exception {
+			if (value == null) {
+				value = 123;
+			}
+			return value;
+		}
+	}
+
+	@Test
+	public void testForwardWithBranching1() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map28.class, "Integer", "Integer");
+	}
+
+	@ForwardedFields("0")
+	public static class Map29 implements MapFunction<Tuple3<String, String, String>, Tuple3<String, String, String>> {
+		@Override
+		public Tuple3<String, String, String> map(Tuple3<String, String, String> value) throws Exception {
+			String tmp = value.f0;
+			for (int i = 0; i < 2; i++) {
+				value.setField("Test", i);
+			}
+			Tuple3<String, String, String> tuple;
+			if (value.f0.equals("x")) {
+				tuple = new Tuple3<String, String, String>(tmp, value.f0, null);
+			} else {
+				tuple = new Tuple3<String, String, String>(tmp, value.f0, "");
+			}
+			return tuple;
+		}
+	}
+
+	@Test
+	public void testForwardWithBranching2() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map29.class,
+				"Tuple3<String, String, String>", "Tuple3<String, String, String>");
+	}
+
+	public static class Map30 implements MapFunction<Tuple2<String, String>, String> {
+		@Override
+		public String map(Tuple2<String, String> value) throws Exception {
+			String tmp;
+			if (value.f0.equals("")) {
+				tmp = value.f0;
+			} else {
+				tmp = value.f1;
+			}
+			return tmp;
+		}
+	}
+
+	@Test
+	public void testForwardWithBranching3() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map30.class, "Tuple2<String,String>",
+				"String");
+	}
+
+	@ForwardedFields("1->1;1->0")
+	public static class Map31 implements MapFunction<Tuple2<String, String>, ExtendingTuple> {
+		@Override
+		public ExtendingTuple map(Tuple2<String, String> value) throws Exception {
+			ExtendingTuple t = new ExtendingTuple();
+			t.f1 = value.f1;
+			t.setFirstField();
+			t.f0 = t.getSecondField();
+			return t;
+		}
+	}
+
+	@Test
+	public void testForwardWithInheritance() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map31.class, "Tuple2<String,String>",
+				"Tuple2<String,String>");
+	}
+
+	@ForwardedFields("*")
+	public static class Map32 implements
+			MapFunction<Tuple8<Boolean, Character, Byte, Short, Integer, Long, Float, Double>,
+					Tuple8<Boolean, Character, Byte, Short, Integer, Long, Float, Double>> {
+
+		@Override
+		public Tuple8<Boolean, Character, Byte, Short, Integer, Long, Float, Double> map(
+				Tuple8<Boolean, Character, Byte, Short, Integer, Long, Float, Double> value) throws Exception {
+			boolean f0 = value.f0;
+			char f1 = value.f1;
+			byte f2 = value.f2;
+			short f3 = value.f3;
+			int f4 = value.f4;
+			long f5 = value.f5;
+			float f6 = value.f6;
+			double f7 = value.f7;
+			return new Tuple8<Boolean, Character, Byte, Short, Integer, Long, Float, Double>(
+					f0, f1, f2, f3, f4, f5, f6, f7);
+		}
+	}
+
+	@Test
+	public void testForwardWithUnboxingAndBoxing() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map32.class,
+				"Tuple8<Boolean, Character, Byte, Short, Integer, Long, Float, Double>",
+				"Tuple8<Boolean, Character, Byte, Short, Integer, Long, Float, Double>");
+	}
+
+	public static class Map33 implements MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+		@Override
+		public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
+			Tuple2<Long, Long> t = new Tuple2<Long, Long>();
+			if (value.f0 != null) {
+				t.f0 = value.f0;
+			}
+			else {
+				t.f0 = value.f1;
+			}
+			return t;
+		}
+	}
+
+	@Test
+	public void testForwardWithBranching4() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map33.class, "Tuple2<Long, Long>",
+				"Tuple2<Long, Long>");
+	}
+
+	@ForwardedFields("1")
+	public static class Map34 implements MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+		private Tuple2<Long, Long> t;
+		@Override
+		public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
+			if (value != new Object()) {
+				return value;
+			}
+			else if (value.f0 == 1L && value.f1 == 2L) {
+				t = value;
+				t.f0 = 23L;
+				return t;
+			}
+			return new Tuple2<Long, Long>(value.f0, value.f1);
+		}
+	}
+
+	@Test
+	public void testForwardWithBranching5() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map34.class, "Tuple2<Long, Long>",
+				"Tuple2<Long, Long>");
+	}
+
+	public static class Map35 implements MapFunction<String[], Tuple2<String[], String[]>> {
+		@Override
+		public Tuple2<String[], String[]> map(String[] value) throws Exception {
+			String[] tmp = value;
+			value[0] = "Hello";
+			return new Tuple2<String[], String[]>(value, tmp);
+		}
+	}
+
+	@Test
+	public void testForwardWithArrayModification() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map35.class, "String[]",
+				"Tuple2<String[], String[]>");
+	}
+
+	public static class Map36 implements MapFunction<Tuple3<String, String, String>, Tuple3<String, String, String>> {
+		@Override
+		public Tuple3<String, String, String> map(Tuple3<String, String, String> value) throws Exception {
+			int i = 0;
+			do {
+				value.setField("", i);
+				i++;
+			} while (i >= 2);
+			return value;
+		}
+	}
+
+	@Test
+	public void testForwardWithBranching6() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map36.class, "Tuple3<String, String, String>",
+				"Tuple3<String, String, String>");
+	}
+
+	public static class Map37 implements MapFunction<Tuple1<Tuple1<String>>, Tuple1<Tuple1<String>>> {
+		@SuppressWarnings("unchecked")
+		@Override
+		public Tuple1<Tuple1<String>> map(Tuple1<Tuple1<String>> value) throws Exception {
+			((Tuple1<String>) value.getField(Integer.valueOf("2."))).f0 = "Hello";
+			return value;
+		}
+	}
+
+	@Test
+	public void testForwardWithGetAndModification() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map37.class, "Tuple1<Tuple1<String>>",
+				"Tuple1<Tuple1<String>>");
+	}
+
+	@ForwardedFields("field")
+	public static class Map38 implements MapFunction<MyPojo2, MyPojo2> {
+		@Override
+		public MyPojo2 map(MyPojo2 value) throws Exception {
+			value.setField2("test");
+			return value;
+		}
+	}
+
+	@Test
+	public void testForwardWithInheritance2() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map38.class,
+				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo2<field=String,field2=String>",
+				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo2<field=String,field2=String>");
+	}
+
+	public static class Map39 implements MapFunction<MyPojo, MyPojo> {
+		@Override
+		public MyPojo map(MyPojo value) throws Exception {
+			MyPojo mp = new MyPojo();
+			mp.field = value.field2;
+			return mp;
+		}
+	}
+
+	@Test
+	public void testForwardWithGenericTypeOutput() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map39.class,
+				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
+				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo");
+	}
+
+	@ForwardedFields("field2")
+	public static class Map40 implements MapFunction<MyPojo, MyPojo> {
+		@Override
+		public MyPojo map(MyPojo value) throws Exception {
+			return recursiveFunction(value);
+		}
+
+		private MyPojo recursiveFunction(MyPojo value) {
+			if (value.field == "xyz") {
+				value.field = value.field + "x";
+				return recursiveFunction(value);
+			}
+			return value;
+		}
+	}
+
+	@Test
+	public void testForwardWithRecursion() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map40.class,
+				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
+				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>");
+	}
+
+	@ForwardedFields("field;field2")
+	public static class Map41 extends RichMapFunction<MyPojo, MyPojo> {
+		private MyPojo field;
+		@Override
+		public MyPojo map(MyPojo value) throws Exception {
+			field = value;
+			getRuntimeContext().getIntCounter("test").getLocalValue();
+			return field;
+		}
+	}
+
+	@Test
+	public void testForwardWithGetRuntimeContext() {
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map41.class,
+				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
+				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>");
+	}
+
+	@ForwardedFields("*")
+	public static class FlatMap1 implements FlatMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
+		@Override
+		public void flatMap(Tuple1<Integer> value, Collector<Tuple1<Integer>> out) throws Exception {
+			out.collect(value);
+		}
+	}
+
+	@Test
+	public void testForwardWithCollector() {
+		compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, FlatMap1.class, "Tuple1<Integer>",
+				"Tuple1<Integer>");
+	}
+
+	@ForwardedFields("0->1;1->0")
+	public static class FlatMap2 implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+		Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
+			invertedEdge.f0 = edge.f1;
+			invertedEdge.f1 = edge.f0;
+			out.collect(invertedEdge);
+			out.collect(invertedEdge);
+		}
+	}
+
+	@Test
+	public void testForwardWith2Collectors() {
+		compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, FlatMap2.class, "Tuple2<Long, Long>",
+				"Tuple2<Long, Long>");
+	}
+
+	public static class FlatMap3 implements FlatMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
+		@Override
+		public void flatMap(Tuple1<Integer> value, Collector<Tuple1<Integer>> out) throws Exception {
+			addToCollector(out);
+			out.collect(value);
+		}
+
+		private void addToCollector(Collector<Tuple1<Integer>> out) {
+			out.collect(new Tuple1<Integer>());
+		}
+	}
+
+	@Test
+	public void testForwardWithCollectorPassing() {
+		compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, FlatMap3.class, "Tuple1<Integer>",
+				"Tuple1<Integer>");
+	}
+
+	@ForwardedFieldsFirst("f1->f1")
+	@ForwardedFieldsSecond("f1->f0")
+	public static class Join1 implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+		@Override
+		public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
+			return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
+		}
+	}
+
+	@Test
+	public void testForwardWithDualInput() {
+		compareAnalyzerResultWithAnnotationsDualInput(JoinFunction.class, Join1.class, "Tuple2<Long, Long>",
+				"Tuple2<Long, Long>", "Tuple2<Long, Long>");
+	}
+
+	@ForwardedFieldsFirst("*")
+	public static class Join2 implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+		@Override
+		public void join(Tuple2<Long, Long> candidate, Tuple2<Long, Long> old, Collector<Tuple2<Long, Long>> out) {
+			if (candidate.f1 < old.f1) {
+				out.collect(candidate);
+			}
+		}
+	}
+
+	@Test
+	public void testForwardWithDualInputAndCollector() {
+		compareAnalyzerResultWithAnnotationsDualInput(FlatJoinFunction.class, Join2.class, "Tuple2<Long, Long>",
+				"Tuple2<Long, Long>", "Tuple2<Long, Long>");
+	}
+
+	@ForwardedFields("0")
+	public static class GroupReduce1 implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+		@Override
+		public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
+			out.collect(values.iterator().next());
+		}
+	}
+
+	@Test
+	public void testForwardWithIterable() {
+		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce1.class,
+				"Tuple2<Long, Long>", "Tuple2<Long, Long>", new String[] { "0" });
+	}
+
+	@ForwardedFields("1->0")
+	public static class GroupReduce2 implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+		@Override
+		public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
+			final Iterator<Tuple2<Long, Long>> it = values.iterator();
+			Tuple2<Long, Long> outTuple = new Tuple2<Long, Long>();
+			Tuple2<Long, Long> first = it.next();
+			outTuple.f0 = first.f1;
+			outTuple.f1 = first.f0;
+
+			while (it.hasNext()) {
+				Tuple2<Long, Long> t = it.next();
+				if (t.f0 == 42) {
+					outTuple.f1 += t.f0;
+				}
+			}
+			out.collect(outTuple);
+		}
+	}
+
+	@Test
+	public void testForwardWithIterable2() {
+		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce2.class,
+				"Tuple2<Long, Long>", "Tuple2<Long, Long>", new String[] { "0", "1" });
+	}
+
+	@ForwardedFields("field2")
+	public static class GroupReduce3 implements GroupReduceFunction<MyPojo, MyPojo> {
+		@Override
+		public void reduce(Iterable<MyPojo> values, Collector<MyPojo> out) throws Exception {
+			for (MyPojo value : values) {
+				out.collect(value);
+			}
+		}
+	}
+
+	@Test
+	public void testForwardWithIterable3() {
+		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce3.class,
+				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
+				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>", new String[] { "field2" });
+	}
+
+	@ForwardedFields("f0->*")
+	public static class GroupReduce4 implements GroupReduceFunction<Tuple2<Long, Long>, Long> {
+		@Override
+		public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Long> out) throws Exception {
+			Long id = 0L;
+			for (Tuple2<Long, Long> value : values) {
+				id = value.f0;
+			}
+			out.collect(id);
+		}
+	}
+
+	@Test
+	public void testForwardWithAtLeastOneIterationAssumption() {
+		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce4.class,
+				"Tuple2<Long, Long>", "Long", new String[] { "f0" });
+	}
+
+	@ForwardedFields("f0->*")
+	public static class GroupReduce4_Javac implements GroupReduceFunction<Tuple2<Long, Long>, Long> {
+		@SuppressWarnings("unchecked")
+		@Override
+		public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Long> out) throws Exception {
+			Long id = 0L;
+			@SuppressWarnings("rawtypes")
+			Iterator it = values.iterator();
+			if (it.hasNext()) {
+				id = ((Tuple2<Long, Long>) it.next()).f0;
+			}
+			else {
+				System.out.println("hello world");
+			}
+			out.collect(id);
+		}
+	}
+
+	@Test
+	public void testForwardWithAtLeastOneIterationAssumptionForJavac() {
+		// this test simulates the compiler behaviour observed in the Eclipse IDE
+		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce4_Javac.class,
+				"Tuple2<Long, Long>", "Long", new String[] { "f0" });
+	}
+
+	public static class GroupReduce5 implements GroupReduceFunction<Tuple2<Long, Long>, Long> {
+		@Override
+		public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Long> out) throws Exception {
+			Long id = 0L;
+			for (Tuple2<Long, Long> value : values) {
+				id = value.f0;
+				if (value != null) {
+					id = value.f1;
+				}
+			}
+			out.collect(id);
+		}
+	}
+
+	@Test
+	public void testForwardWithAtLeastOneIterationAssumption2() {
+		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce5.class,
+				"Tuple2<Long, Long>", "Long", new String[] { "f1" });
+	}
+
+	public static class GroupReduce6 implements GroupReduceFunction<Tuple2<Long, Long>, Long> {
+		@Override
+		public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Long> out) throws Exception {
+			Long id = 0L;
+			for (Tuple2<Long, Long> value : values) {
+				id = value.f0;
+			}
+			id = 0L;
+			out.collect(id);
+		}
+	}
+
+	@Test
+	public void testForwardWithAtLeastOneIterationAssumption3() {
+		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce6.class,
+				"Tuple2<Long, Long>", "Long", new String[] { "f0" });
+	}
+
+	public static class GroupReduce7 implements GroupReduceFunction<Tuple2<Long, Long>, Long> {
+		@Override
+		public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Long> out) throws Exception {
+			Long id = 0L;
+			for (Tuple2<Long, Long> value : values) {
+				id = value.f0;
+			}
+			id = 0L;
+			out.collect(id);
+		}
+	}
+
+	@Test
+	public void testForwardWithAtLeastOneIterationAssumption4() {
+		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce7.class,
+				"Tuple2<Long, Long>", "Long", new String[] { "f0" });
+	}
+
+	@ForwardedFields("f0->*")
+	public static class GroupReduce8 implements GroupReduceFunction<Tuple2<Long, Long>, Long> {
+		@Override
+		public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Long> out) throws Exception {
+			Long id = 0L;
+			Iterator<Tuple2<Long, Long>> it = values.iterator();
+			while (it.hasNext()) {
+				id = it.next().f0;
+			}
+			out.collect(id);
+		}
+	}
+
+	@Test
+	public void testForwardWithAtLeastOneIterationAssumption5() {
+		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce8.class,
+				"Tuple2<Long, Long>", "Long", new String[] { "f0" });
+	}
+
+	@ForwardedFields("f0")
+	public static class GroupReduce9 implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+		@Override
+		public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
+			Tuple2<Long, Long> rv = null;
+			Iterator<Tuple2<Long, Long>> it = values.iterator();
+			while (it.hasNext()) {
+				rv = it.next();
+			}
+			out.collect(rv);
+		}
+	}
+
+	@Test
+	public void testForwardWithAtLeastOneIterationAssumption6() {
+		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce9.class,
+				"Tuple2<Long, Long>", "Tuple2<Long, Long>", new String[] { "f0" });
+	}
+
+	public static class GroupReduce10 implements GroupReduceFunction<Tuple2<Long, Long>, Boolean> {
+		@Override
+		public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Boolean> out) throws Exception {
+			Iterator<Tuple2<Long, Long>> it = values.iterator();
+			boolean f = it.hasNext();
+			if (!f) {
+				System.out.println();
+			}
+			if (f) {
+				System.out.println();
+			}
+			out.collect(f);
+		}
+	}
+
+	@Test
+	public void testForwardWithAtLeastOneIterationAssumption7() {
+		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce10.class,
+				"Tuple2<Long, Long>", "Boolean", new String[] { "f0" });
+	}
+
+	@ForwardedFields("field")
+	public static class Reduce1 implements ReduceFunction<MyPojo> {
+		@Override
+		public MyPojo reduce(MyPojo value1, MyPojo value2) throws Exception {
+			return new MyPojo(value1.getField(), value2.getField2());
+		}
+	}
+
+	@Test
+	public void testForwardWithReduce() {
+		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(ReduceFunction.class, Reduce1.class,
+				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
+				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
+				new String[] { "field" });
+	}
+
+	@ForwardedFields("field")
+	public static class Reduce2 implements ReduceFunction<MyPojo> {
+		@Override
+		public MyPojo reduce(MyPojo value1, MyPojo value2) throws Exception {
+			if (value1.field == "") {
+				return value2;
+			}
+			return value1;
+		}
+	}
+
+	@Test
+	public void testForwardWithBranchingReduce() {
+		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(ReduceFunction.class, Reduce2.class,
+				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
+				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
+				new String[] { "field" });
+	}
+
+	public static class NullReturnMapper1 implements MapFunction<String, String> {
+		@Override
+		public String map(String value) throws Exception {
+			return null;
+		}
+	}
+
+	public static class NullReturnMapper2 implements MapFunction<String, String> {
+		@Override
+		public String map(String value) throws Exception {
+			if (value.equals("test")) {
+				return null;
+			}
+			return "";
+		}
+	}
+
+	public static class NullReturnFlatMapper implements FlatMapFunction<String, String> {
+		@Override
+		public void flatMap(String value, Collector<String> out) throws Exception {
+			String s = null;
+			if ("dd".equals("")) {
+				s = "";
+			}
+			out.collect(s);
+		}
+	}
+
+	@Test
+	public void testNullReturnException() {
+		try {
+			final UdfAnalyzer ua = new UdfAnalyzer(MapFunction.class, NullReturnMapper1.class, "operator",
+					BasicTypeInfo.STRING_TYPE_INFO, null, BasicTypeInfo.STRING_TYPE_INFO, null, null, true);
+			ua.analyze();
+			Assert.fail();
+		}
+		catch (CodeErrorException e) {
+			// ok
+		}
+		try {
+			final UdfAnalyzer ua = new UdfAnalyzer(MapFunction.class, NullReturnMapper2.class, "operator",
+					BasicTypeInfo.STRING_TYPE_INFO, null, BasicTypeInfo.STRING_TYPE_INFO, null, null, true);
+			ua.analyze();
+			Assert.fail();
+		}
+		catch (CodeErrorException e) {
+			// ok
+		}
+		try {
+			final UdfAnalyzer ua = new UdfAnalyzer(FlatMapFunction.class, NullReturnFlatMapper.class, "operator",
+					BasicTypeInfo.STRING_TYPE_INFO, null, BasicTypeInfo.STRING_TYPE_INFO, null, null, true);
+			ua.analyze();
+			Assert.fail();
+		}
+		catch (CodeErrorException e) {
+			// ok
+		}
+	}
+
+	public static class PutStaticMapper implements MapFunction<String, String> {
+		public static String test = "";
+
+		@Override
+		public String map(String value) throws Exception {
+			test = "test";
+			return "";
+		}
+	}
+
+	@Test
+	public void testPutStaticException() {
+		try {
+			final UdfAnalyzer ua = new UdfAnalyzer(MapFunction.class, PutStaticMapper.class, "operator",
+					BasicTypeInfo.STRING_TYPE_INFO, null, BasicTypeInfo.STRING_TYPE_INFO, null, null, true);
+			ua.analyze();
+			Assert.fail();
+		}
+		catch (CodeErrorException e) {
+			// ok
+		}
+	}
+
+	public static class FilterMod1 implements FilterFunction<Tuple2<String, String>> {
+
+		@Override
+		public boolean filter(Tuple2<String, String> value) throws Exception {
+			value.f0 = value.f1;
+			return false;
+		}
+	}
+
+	@Test
+	public void testFilterModificationException1() {
+		try {
+			final UdfAnalyzer ua = new UdfAnalyzer(FilterFunction.class, FilterMod1.class, "operator",
+					TypeInfoParser.parse("Tuple2<String, String>"), null, null, null, null, true);
+			ua.analyze();
+			Assert.fail();
+		}
+		catch (CodeErrorException e) {
+			// ok
+		}
+	}
+
+	public static class FilterMod2 implements FilterFunction<Tuple2<String, String>> {
+
+		@Override
+		public boolean filter(Tuple2<String, String> value) throws Exception {
+			value.f0 = "";
+			return false;
+		}
+	}
+
+	@Test
+	public void testFilterModificationException2() {
+		try {
+			final UdfAnalyzer ua = new UdfAnalyzer(FilterFunction.class, FilterMod2.class, "operator",
+					TypeInfoParser.parse("Tuple2<String, String>"), null, null, null, null, true);
+			ua.analyze();
+			Assert.fail();
+		}
+		catch (CodeErrorException e) {
+			// ok
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Utils
+	// --------------------------------------------------------------------------------------------
+
+	public static class MyPojo {
+		private String field;
+		public String field2;
+
+		public MyPojo() {
+			// default constructor
+		}
+
+		public MyPojo(String field, String field2) {
+			this.field = field;
+			this.field2 = field2;
+		}
+
+		public String getField() {
+			return field;
+		}
+
+		public void setField(String field) {
+			this.field = field;
+		}
+
+		public String getField2() {
+			return field2;
+		}
+
+		public void setField2(String field2) {
+			this.field2 = field2;
+		}
+	}
+
+	public static class MyPojo2 extends MyPojo {
+
+		public MyPojo2() {
+			// default constructor
+		}
+	}
+
+	public static class ExtendingTuple extends Tuple2<String, String> {
+		public void setFirstField() {
+			setField("Hello", 0);
+		}
+
+		public String getSecondField() {
+			return getField(1);
+		}
+	}
+
+	public static void compareAnalyzerResultWithAnnotationsSingleInput(Class<?> baseClass, Class<?> clazz, String in,
+			String out) {
+		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(baseClass, clazz, in, out, null);
+	}
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	public static void compareAnalyzerResultWithAnnotationsSingleInputWithKeys(Class<?> baseClass, Class<?> clazz,
+			String in, String out, String[] keys) {
+		final TypeInformation<?> inType = TypeInfoParser.parse(in);
+		final TypeInformation<?> outType = TypeInfoParser.parse(out);
+
+		// expected
+		final Set<Annotation> annotations = FunctionAnnotation.readSingleForwardAnnotations(clazz);
+		SingleInputSemanticProperties expected = SemanticPropUtil.getSemanticPropsSingle(annotations, inType,
+				outType);
+		if (expected == null) {
+			expected = new SingleInputSemanticProperties();
+		}
+
+		// actual
+		final UdfAnalyzer ua = new UdfAnalyzer(baseClass, clazz, "operator", inType, null, outType, (keys == null) ? null
+				: new Keys.ExpressionKeys(keys, inType), null, true);
+		ua.analyze();
+		final SingleInputSemanticProperties actual = (SingleInputSemanticProperties) ua.getSemanticProperties();
+
+		assertEquals(expected.toString(), actual.toString());
+	}
+
+	public static void compareAnalyzerResultWithAnnotationsDualInput(Class<?> baseClass, Class<?> clazz, String in1,
+			String in2, String out) {
+		compareAnalyzerResultWithAnnotationsDualInputWithKeys(baseClass, clazz, in1, in2, out, null, null);
+	}
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	public static void compareAnalyzerResultWithAnnotationsDualInputWithKeys(Class<?> baseClass, Class<?> clazz,
+			String in1, String in2, String out, String[] keys1, String[] keys2) {
+		final TypeInformation<?> in1Type = TypeInfoParser.parse(in1);
+		final TypeInformation<?> in2Type = TypeInfoParser.parse(in2);
+		final TypeInformation<?> outType = TypeInfoParser.parse(out);
+
+		// expected
+		final Set<Annotation> annotations = FunctionAnnotation.readDualForwardAnnotations(clazz);
+		final DualInputSemanticProperties expected = SemanticPropUtil.getSemanticPropsDual(annotations, in1Type,
+				in2Type, outType);
+
+		// actual
+		final UdfAnalyzer ua = new UdfAnalyzer(baseClass, clazz, "operator", in1Type, in2Type, outType, (keys1 == null) ? null
+				: new Keys.ExpressionKeys(keys1, in1Type), (keys2 == null) ? null : new Keys.ExpressionKeys(
+						keys2, in2Type), true);
+		ua.analyze();
+		final DualInputSemanticProperties actual = (DualInputSemanticProperties) ua.getSemanticProperties();
+
+		assertEquals(expected.toString(), actual.toString());
+	}
+
+}

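As a point of reference for the comparison helpers above, here is a minimal sketch of how a custom UDF would be verified with them. The IdentityMapper class is hypothetical and not part of this commit; it merely illustrates the contract that the statically derived semantic properties must match the manual @ForwardedFields annotation:

    // hypothetical UDF: field f0 passes through unchanged, f1 is modified
    @ForwardedFields("f0")
    public static class IdentityMapper implements MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
        @Override
        public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
            return new Tuple2<Long, Long>(value.f0, value.f1 + 1);
        }
    }

    // parses the type strings, runs the UdfAnalyzer, and asserts that the
    // derived semantic properties equal those of the annotation
    compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, IdentityMapper.class,
            "Tuple2<Long, Long>", "Tuple2<Long, Long>");
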
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index cd10cc6..80df0f8 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -20,6 +20,7 @@ package org.apache.flink.test.util;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.CodeAnalysisMode;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.optimizer.DataStatistics;
@@ -29,7 +30,6 @@ import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.client.SerializedJobExecutionResult;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-
 import org.junit.Assert;
 
 public class TestEnvironment extends ExecutionEnvironment {
@@ -39,6 +39,8 @@ public class TestEnvironment extends ExecutionEnvironment {
 	public TestEnvironment(ForkableFlinkMiniCluster executor, int parallelism) {
 		this.executor = executor;
 		setParallelism(parallelism);
+		// code analysis is disabled in tests to improve build time
+		getConfig().setCodeAnalysisMode(CodeAnalysisMode.DISABLE);
 	}
 
 	@Override

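The change above hard-disables the analysis for test jobs. On a regular ExecutionEnvironment the mode is set the same way; a minimal sketch, assuming the CodeAnalysisMode enum added by this commit also provides an enabled mode such as HINT:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // HINT is an assumed enabled mode; DISABLE (used above) switches the analysis off
    env.getConfig().setCodeAnalysisMode(CodeAnalysisMode.HINT);
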
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 11a9ed1..f1e0231 100644
--- a/pom.xml
+++ b/pom.xml
@@ -85,8 +85,8 @@ under the License.
 		<kryoserialization.version>0.3.2</kryoserialization.version>
 		<protobuf.version>2.5.0</protobuf.version>
 		<chill.version>0.5.2</chill.version>
-		<asm.version>5.0.3</asm.version>
-        <tez.version>0.6.1</tez.version>
+		<asm.version>5.0.4</asm.version>
+		<tez.version>0.6.1</tez.version>
 	</properties>
 
 	<dependencies>


[2/3] flink git commit: [FLINK-1319] [core] Add static code analysis for user code

Posted by uc...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/sca/NestedMethodAnalyzer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/NestedMethodAnalyzer.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/NestedMethodAnalyzer.java
new file mode 100644
index 0000000..687e249
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/NestedMethodAnalyzer.java
@@ -0,0 +1,730 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.sca;
+
+import static org.apache.flink.api.java.sca.UdfAnalyzerUtils.findMethodNode;
+import static org.apache.flink.api.java.sca.UdfAnalyzerUtils.hasImportantDependencies;
+import static org.apache.flink.api.java.sca.UdfAnalyzerUtils.isTagged;
+import static org.apache.flink.api.java.sca.UdfAnalyzerUtils.mergeReturnValues;
+import static org.apache.flink.api.java.sca.UdfAnalyzerUtils.removeUngroupedInputs;
+import static org.apache.flink.api.java.sca.UdfAnalyzerUtils.tagged;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.java.sca.TaggedValue.Tag;
+import org.objectweb.asm.Type;
+import org.objectweb.asm.tree.AbstractInsnNode;
+import org.objectweb.asm.tree.FieldInsnNode;
+import org.objectweb.asm.tree.IntInsnNode;
+import org.objectweb.asm.tree.LdcInsnNode;
+import org.objectweb.asm.tree.MethodInsnNode;
+import org.objectweb.asm.tree.MethodNode;
+import org.objectweb.asm.tree.TypeInsnNode;
+import org.objectweb.asm.tree.analysis.AnalyzerException;
+import org.objectweb.asm.tree.analysis.BasicInterpreter;
+import org.objectweb.asm.tree.analysis.BasicValue;
+
+/**
+ * Extends ASM's BasicInterpreter. Instead of ASM's BasicValues, it introduces
+ * TaggedValues, which extend BasicValue and allow interesting information
+ * to be appended to values. The NestedMethodAnalyzer follows interesting values
+ * and analyzes nested methods (control/method flow) if necessary.
+ */
+public class NestedMethodAnalyzer extends BasicInterpreter {
+
+	// reference to current UDF analysis context
+	private final UdfAnalyzer analyzer;
+
+	// flag that indicates whether the current method is still the Function's SAM method
+	private final boolean topLevelMethod;
+
+	// method description and arguments
+	private final String owner;
+	private final MethodNode methodNode;
+	private final List<BasicValue> argumentValues;
+
+	// remaining nesting level
+	private final int remainingNesting;
+
+	// ASM analyzer which analyzes this methodNode
+	private ModifiedASMAnalyzer modifiedAsmAnalyzer;
+
+	public NestedMethodAnalyzer(UdfAnalyzer analyzer, String owner, MethodNode methodNode,
+			List<BasicValue> argumentValues, int remainingNesting, boolean topLevelMethod) {
+		this.analyzer = analyzer;
+		this.topLevelMethod = topLevelMethod;
+		this.owner = owner;
+		this.methodNode = methodNode;
+		this.argumentValues = argumentValues;
+
+		this.remainingNesting = remainingNesting;
+		if (remainingNesting < 0) {
+			throw new CodeAnalyzerException("Maximum nesting level reached.");
+		}
+	}
+
+	public TaggedValue analyze() throws AnalyzerException {
+		modifiedAsmAnalyzer = new ModifiedASMAnalyzer(this);
+
+		// FOR DEBUGGING
+		//		final Printer printer = new Textifier();
+		//		final TraceMethodVisitor mp = new TraceMethodVisitor(printer);
+		//		System.out.println(methodNode.name + " " + methodNode.desc);
+		//		Iterator<AbstractInsnNode> it = methodNode.instructions.iterator();
+		//		while (it.hasNext()) {
+		//			it.next().accept(mp);
+		//			StringWriter sw = new StringWriter();
+		//			printer.print(new PrintWriter(sw));
+		//			printer.getText().clear();
+		//			System.out.println(sw.toString());
+		//		}
+		modifiedAsmAnalyzer.analyze(owner, methodNode);
+		return mergeReturnValues(returnValues);
+	}
+
+	@SuppressWarnings("unchecked")
+	private TaggedValue invokeNestedMethod(List<? extends BasicValue> values,
+			final MethodInsnNode methodInsn) throws AnalyzerException {
+		final Object[] mn = findMethodNode(methodInsn.owner, methodInsn.name, methodInsn.desc);
+		MethodNode methodNode = (MethodNode) mn[0];
+		// recursion
+		if (methodNode.name.equals(this.methodNode.name)
+				&& methodNode.desc.equals(this.methodNode.desc)) {
+			// TODO recursion is not supported perfectly yet
+			// recursion only works if the nested call follows at least one return statement
+			// return the values that are present so far
+			return mergeReturnValues(returnValues);
+		}
+		final NestedMethodAnalyzer nma = new NestedMethodAnalyzer(analyzer, (String) mn[1],
+				(MethodNode) mn[0],
+				(List<BasicValue>) values, remainingNesting - 1,
+				topLevelMethod && isBridgeMethod());
+		return nma.analyze();
+	}
+
+	private boolean isBridgeMethod() {
+		return (methodNode.access & ACC_BRIDGE) == ACC_BRIDGE;
+	}
+
+	private boolean isGetRuntimeContext(MethodInsnNode methodInsnNode) {
+		return methodInsnNode.name.equals("getRuntimeContext")
+				&& findMethodNode(methodInsnNode.owner, methodInsnNode.name, methodInsnNode.desc)[1]
+				.equals("org/apache/flink/api/common/functions/AbstractRichFunction");
+	}
+
+	private Type checkForUnboxing(String name, String methodOwner) {
+		// for performance improvement
+		if (!methodOwner.startsWith("java/lang/")
+				|| !name.endsWith("Value")) {
+			return null;
+		}
+
+		final String actualType = methodOwner.substring(10);
+		final String convertedType = name.substring(0, name.length() - 5);
+
+		if (convertedType.equals("byte") && actualType.equals("Byte")) {
+			return Type.BYTE_TYPE;
+		}
+		else if (convertedType.equals("short") && actualType.equals("Short")) {
+			return Type.SHORT_TYPE;
+		}
+		else if (convertedType.equals("int") && actualType.equals("Integer")) {
+			return Type.INT_TYPE;
+		}
+		else if (convertedType.equals("long") && actualType.equals("Long")) {
+			return Type.LONG_TYPE;
+		}
+		else if (convertedType.equals("boolean") && actualType.equals("Boolean")) {
+			return Type.BOOLEAN_TYPE;
+		}
+		else if (convertedType.equals("char") && actualType.equals("Character")	) {
+			return Type.CHAR_TYPE;
+		}
+		else if (convertedType.equals("float") && actualType.equals("Float")) {
+			return Type.FLOAT_TYPE;
+		}
+		else if (convertedType.equals("double") && actualType.equals("Double")) {
+			return Type.DOUBLE_TYPE;
+		}
+		return null;
+	}
+
+	private Type checkForBoxing(String name, String desc, String methodOwner) {
+		// for performance improvement
+		if (!methodOwner.startsWith("java/lang/")
+				|| !name.equals("valueOf")) {
+			return null;
+		}
+
+		final String convertedType = methodOwner.substring(10);
+
+		if (convertedType.equals("Byte") && desc.equals("(B)Ljava/lang/Byte;")) {
+			return Type.BYTE_TYPE;
+		}
+		else if (convertedType.equals("Short") && desc.equals("(S)Ljava/lang/Short;")) {
+			return Type.SHORT_TYPE;
+		}
+		else if (convertedType.equals("Integer") && desc.equals("(I)Ljava/lang/Integer;")) {
+			return Type.INT_TYPE;
+		}
+		else if (convertedType.equals("Long") && desc.equals("(J)Ljava/lang/Long;")) {
+			return Type.LONG_TYPE;
+		}
+		else if (convertedType.equals("Boolean") && desc.equals("(Z)Ljava/lang/Boolean;")) {
+			return Type.BOOLEAN_TYPE;
+		}
+		else if (convertedType.equals("Character") && desc.equals("(C)Ljava/lang/Character;")) {
+			return Type.CHAR_TYPE;
+		}
+		else if (convertedType.equals("Float") && desc.equals("(F)Ljava/lang/Float;")) {
+			return Type.FLOAT_TYPE;
+		}
+		else if (convertedType.equals("Double") && desc.equals("(D)Ljava/lang/Double;")) {
+			return Type.DOUBLE_TYPE;
+		}
+		return null;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Interpreter
+	// --------------------------------------------------------------------------------------------
+
+	// variable that maps the method's arguments to ASM values
+	private int curArgIndex = -1;
+	private List<TaggedValue> returnValues = new ArrayList<TaggedValue>();
+
+	// see ModifiedASMFrame
+	ModifiedASMFrame currentFrame;
+	boolean rightMergePriority;
+
+	@Override
+	public BasicValue newValue(Type type) {
+		TaggedValue tv;
+		switch (analyzer.getState()) {
+			// skip "return"
+			case UdfAnalyzer.STATE_CAPTURE_RETURN:
+				analyzer.setState(UdfAnalyzer.STATE_CAPTURE_THIS);
+				curArgIndex++;
+				return super.newValue(type);
+			// tag "this"
+			case UdfAnalyzer.STATE_CAPTURE_THIS:
+				analyzer.setState(UdfAnalyzer.STATE_CAPTURE_INPUT1);
+				tv = new TaggedValue(type, Tag.THIS);
+				curArgIndex++;
+				return tv;
+			// tag input 1
+			case UdfAnalyzer.STATE_CAPTURE_INPUT1:
+				if (analyzer.isUdfBinary() || analyzer.isUdfReduceFunction()) {
+					analyzer.setState(UdfAnalyzer.STATE_CAPTURE_INPUT2);
+				} else if (analyzer.hasUdfCollector()) {
+					analyzer.setState(UdfAnalyzer.STATE_CAPTURE_COLLECTOR);
+				} else {
+					analyzer.setState(UdfAnalyzer.STATE_END_OF_CAPTURING);
+				}
+
+				// input is iterable
+				if (analyzer.hasUdfIterableInput()) {
+					tv = new TaggedValue(type, Tag.INPUT_1_ITERABLE);
+				}
+				else {
+					tv = analyzer.getInput1AsTaggedValue();
+				}
+				curArgIndex++;
+				return tv;
+			// tag input 2
+			case UdfAnalyzer.STATE_CAPTURE_INPUT2:
+				if (analyzer.hasUdfCollector()) {
+					analyzer.setState(UdfAnalyzer.STATE_CAPTURE_COLLECTOR);
+				} else {
+					analyzer.setState(UdfAnalyzer.STATE_END_OF_CAPTURING);
+				}
+
+				// input is iterable
+				if (analyzer.hasUdfIterableInput()) {
+					tv = new TaggedValue(type, Tag.INPUT_2_ITERABLE);
+				}
+				// special case: reduce function
+				else if (analyzer.isUdfReduceFunction()) {
+					tv = analyzer.getInput1AsTaggedValue();
+				}
+				else {
+					tv = analyzer.getInput2AsTaggedValue();
+				}
+				curArgIndex++;
+				return tv;
+			// tag collector
+			case UdfAnalyzer.STATE_CAPTURE_COLLECTOR:
+				analyzer.setState(UdfAnalyzer.STATE_END_OF_CAPTURING);
+				tv = new TaggedValue(type, Tag.COLLECTOR);
+				curArgIndex++;
+				return tv;
+			// if capturing has finished, do not tag the arguments anymore
+			// but copy values where necessary (for nested methods)
+			case UdfAnalyzer.STATE_END_OF_CAPTURING:
+			default:
+				// skip return type
+				if (curArgIndex < 0) {
+					curArgIndex++;
+				}
+				// return method's arguments
+				else if (argumentValues != null && curArgIndex < argumentValues.size()) {
+					return argumentValues.get(curArgIndex++);
+				}
+				return super.newValue(type);
+		}
+	}
+
+	@Override
+	public BasicValue newOperation(AbstractInsnNode insn) throws AnalyzerException {
+		switch (insn.getOpcode()) {
+			case ACONST_NULL:
+				return new TaggedValue(Type.getObjectType("null"), Tag.NULL);
+			case NEW:
+				analyzer.incrNewOperationCounters(topLevelMethod);
+				// make new objects tagged values to have the possibility to tag an
+				// input container later
+				return new TaggedValue(Type.getObjectType(((TypeInsnNode) insn).desc));
+			// tag "int"-like constants
+			case BIPUSH:
+			case SIPUSH:
+				final IntInsnNode intInsn = (IntInsnNode) insn;
+				return new TaggedValue(intInsn.operand);
+			case LDC:
+				final Object cst = ((LdcInsnNode) insn).cst;
+				if (cst instanceof Integer) {
+					return new TaggedValue((Integer) cst);
+				}
+				return super.newOperation(insn);
+			case ICONST_M1:
+				return new TaggedValue(-1);
+			case ICONST_0:
+				return new TaggedValue(0);
+			case ICONST_1:
+				return new TaggedValue(1);
+			case ICONST_2:
+				return new TaggedValue(2);
+			case ICONST_3:
+				return new TaggedValue(3);
+			case ICONST_4:
+				return new TaggedValue(4);
+			case ICONST_5:
+				return new TaggedValue(5);
+			default:
+				return super.newOperation(insn);
+		}
+	}
+
+	@Override
+	public BasicValue copyOperation(AbstractInsnNode insn, BasicValue value) throws AnalyzerException {
+		switch (insn.getOpcode()) {
+			case ILOAD:
+				// int constants are only supported if they are used in the subsequent operation
+				// otherwise we cannot guarantee that it isn't modified elsewhere (e.g. in a do-while loop)
+				if (isTagged(value) && tagged(value).isIntConstant()) {
+					tagged(value).makeRegular();
+				}
+				return super.copyOperation(insn, value);
+			// if input is stored "call by value", it will be copied
+			case ISTORE:
+			case LSTORE:
+			case FSTORE:
+			case DSTORE:
+			case ASTORE:
+			case DUP:
+			case DUP_X1:
+			case DUP_X2:
+			case DUP2:
+			case DUP2_X1:
+			case DUP2_X2:
+				if (isTagged(value) && tagged(value).isInput() && tagged(value).isCallByValue()) {
+					return tagged(value).copy();
+				}
+			default:
+				return super.copyOperation(insn, value);
+		}
+	}
+
+	@Override
+	public BasicValue unaryOperation(AbstractInsnNode insn, BasicValue value)
+			throws AnalyzerException {
+		switch (insn.getOpcode()) {
+			// modify jump instructions if we can assume that the hasNext operation will always
+			// be true at the first call
+			case IFEQ:
+				if (isTagged(value) && tagged(value).isIteratorTrueAssumption()) {
+					modifiedAsmAnalyzer.requestIFEQLoopModification();
+				}
+				return super.unaryOperation(insn, value);
+			case IFNE:
+				if (isTagged(value) && tagged(value).isIteratorTrueAssumption()) {
+					modifiedAsmAnalyzer.requestIFNELoopModification();
+				}
+				return super.unaryOperation(insn, value);
+
+			case CHECKCAST:
+				return value;
+			case PUTSTATIC:
+				analyzer.handlePutStatic();
+				return super.unaryOperation(insn, value);
+			case GETFIELD:
+				final FieldInsnNode field = (FieldInsnNode) insn;
+				// skip untagged values
+				if (!isTagged(value)) {
+					return super.unaryOperation(insn, value);
+				}
+				final TaggedValue taggedValue = (TaggedValue) value;
+
+				// inputs are atomic; a GETFIELD results in an undefined state
+				if (taggedValue.isInput()) {
+					return super.unaryOperation(insn, value);
+				}
+				// access of input container field
+				// or access of a KNOWN UDF instance variable
+				else if (taggedValue.canContainFields()
+						&& taggedValue.containerContains(field.name)) {
+					final TaggedValue tv = taggedValue.getContainerMapping().get(field.name);
+					if (tv != null) {
+						return tv;
+					}
+				}
+				// access of a yet UNKNOWN UDF instance variable
+				else if (taggedValue.isThis()
+						&& !taggedValue.containerContains(field.name)) {
+					final TaggedValue tv = new TaggedValue(Type.getType(field.desc));
+					taggedValue.addContainerMapping(field.name, tv, currentFrame);
+					return tv;
+				}
+				// access of a yet unknown container, mark it as a container
+				else if (taggedValue.isRegular()) {
+					taggedValue.setTag(Tag.CONTAINER);
+					final TaggedValue tv = new TaggedValue(Type.getType(field.desc));
+					taggedValue.addContainerMapping(field.name, tv, currentFrame);
+					return tv;
+				}
+				return super.unaryOperation(insn, value);
+			case IINC:
+				// modification of a local variable or input
+				if (isTagged(value) && (tagged(value).isIntConstant() || tagged(value).isInput())) {
+					tagged(value).makeRegular();
+				}
+				return super.unaryOperation(insn, value);
+			default:
+				return super.unaryOperation(insn, value);
+		}
+	}
+
+	@Override
+	public BasicValue ternaryOperation(AbstractInsnNode insn, BasicValue value1, BasicValue value2,
+			BasicValue value3) throws AnalyzerException {
+		// if array is an input, make it regular since the input is modified
+		if (isTagged(value1) && tagged(value1).isInput()) {
+			tagged(value1).makeRegular();
+		}
+		return super.ternaryOperation(insn, value1, value2, value3);
+	}
+
+	@Override
+	public BasicValue binaryOperation(AbstractInsnNode insn, BasicValue value1,
+			BasicValue value2) throws AnalyzerException {
+		switch (insn.getOpcode()) {
+			case PUTFIELD: // put value2 into value1
+				// skip untagged values
+				if (!isTagged(value1)) {
+					return null;
+				}
+
+				final TaggedValue taggedValue = (TaggedValue) value1;
+				final FieldInsnNode field = (FieldInsnNode) insn;
+				final boolean value2HasInputDependency = hasImportantDependencies(value2);
+
+				// if value1 is not an input, make value1 a container and add value2 to it
+				// PUTFIELD on inputs is not allowed
+				if (!taggedValue.isInput() && value2HasInputDependency) {
+					if (!taggedValue.canContainFields()) {
+						taggedValue.setTag(Tag.CONTAINER);
+					}
+					taggedValue.addContainerMapping(field.name, tagged(value2), currentFrame);
+				}
+				// if value1 is filled with non-input, make it a container and mark the field
+				// PUTFIELD on inputs is not allowed
+				else if (!taggedValue.isInput() && !value2HasInputDependency) {
+					if (!taggedValue.canContainFields()) {
+						taggedValue.setTag(Tag.CONTAINER);
+					}
+					taggedValue.addContainerMapping(field.name, null, currentFrame);
+				}
+				// PUTFIELD on input leads to input modification
+				// make input regular
+				else if (taggedValue.isInput()) {
+					taggedValue.makeRegular();
+				}
+				return null;
+			default:
+				return super.binaryOperation(insn, value1, value2);
+		}
+	}
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Override
+	public BasicValue naryOperation(AbstractInsnNode insn, List rawValues) throws AnalyzerException {
+		final List<BasicValue> values = (List<BasicValue>) rawValues;
+		boolean isStatic = false;
+		switch (insn.getOpcode()) {
+			case INVOKESTATIC:
+				isStatic = true;
+			case INVOKESPECIAL:
+			case INVOKEVIRTUAL:
+			case INVOKEINTERFACE:
+				final MethodInsnNode method = (MethodInsnNode) insn;
+				String methodOwner = method.owner;
+
+				// special case: if the class extends Tuple we need to find the class
+				// that contains the actual implementation to determine the tuple size
+				if (method.name.equals("getField") || method.name.equals("setField")) {
+					try {
+						final String newMethodOwner = (String) findMethodNode(methodOwner, method.name, method.desc)[1];
+						if (newMethodOwner.startsWith("org/apache/flink/api/java/tuple/Tuple")) {
+							methodOwner = newMethodOwner;
+						}
+					}
+					catch (IllegalStateException e) {
+						// proceed with the known method owner
+					}
+				}
+
+				// special case: collect method of Collector
+				if (method.name.equals("collect")
+						&& methodOwner.equals("org/apache/flink/util/Collector")
+						&& isTagged(values.get(0))
+						&& tagged(values.get(0)).isCollector()) {
+					// check for invalid return value
+					if (isTagged(values.get(1)) && tagged(values.get(1)).isNull()) {
+						analyzer.handleNullReturn();
+					}
+					// valid return value with input dependencies
+					else if (hasImportantDependencies(values.get(1))){
+						// add a copy and a reference
+						// to capture the current state and future changes in alternative paths
+						analyzer.getCollectorValues().add(tagged(values.get(1)));
+						analyzer.getCollectorValues().add(tagged(values.get(1)).copy());
+					}
+					// valid return value without input dependencies
+					else {
+						analyzer.getCollectorValues().add(null);
+					}
+				}
+				// special case: iterator method of Iterable
+				else if (method.name.equals("iterator")
+						&& methodOwner.equals("java/lang/Iterable")
+						&& isTagged(values.get(0))
+						&& tagged(values.get(0)).isInputIterable()) {
+					return new TaggedValue(Type.getObjectType("java/util/Iterator"),
+							(tagged(values.get(0)).isInput1Iterable()) ? Tag.INPUT_1_ITERATOR : Tag.INPUT_2_ITERATOR);
+				}
+				// special case: hasNext method of Iterator
+				else if (method.name.equals("hasNext")
+						&& methodOwner.equals("java/util/Iterator")
+						&& isTagged(values.get(0))
+						&& tagged(values.get(0)).isInputIterator()
+						&& !analyzer.isUdfBinary()
+						&& !analyzer.isIteratorTrueAssumptionApplied()) {
+					return new TaggedValue(Type.BOOLEAN_TYPE, Tag.ITERATOR_TRUE_ASSUMPTION);
+				}
+				// special case: next method of Iterator
+				else if (method.name.equals("next")
+						&& methodOwner.equals("java/util/Iterator")
+						&& isTagged(values.get(0))
+						&& tagged(values.get(0)).isInputIterator()) {
+					// after this call it is no longer possible to assume "TRUE" for "hasNext()"
+					analyzer.applyIteratorTrueAssumption();
+					if (tagged(values.get(0)).isInput1Iterator()) {
+						return analyzer.getInput1AsTaggedValue();
+					}
+					else {
+						return analyzer.getInput2AsTaggedValue();
+					}
+				}
+				// if the UDF class contains instance variables that contain input,
+				// we also need to analyze methods without input-dependent arguments
+				// special case: do not follow the getRuntimeContext method of RichFunctions
+				else if (!isStatic
+						&& isTagged(values.get(0))
+						&& tagged(values.get(0)).isThis()
+						&& hasImportantDependencies(values.get(0))
+						&& !isGetRuntimeContext(method)) {
+					TaggedValue tv = invokeNestedMethod(values, method);
+					if (tv != null) {
+						return tv;
+					}
+				}
+				// the arguments have input dependencies ("THIS" does not count as an argument)
+				// we can assume that the method has at least one argument
+				else if ((!isStatic && isTagged(values.get(0)) && tagged(values.get(0)).isThis() && hasImportantDependencies(values, true))
+						|| (!isStatic && (!isTagged(values.get(0)) || !tagged(values.get(0)).isThis()) && hasImportantDependencies(values, false))
+						|| (isStatic && hasImportantDependencies(values, false))) {
+					// special case: Java unboxing/boxing methods on input
+					Type newType;
+					if (isTagged(values.get(0))
+							&& tagged(values.get(0)).isInput()
+							&& (!isStatic && (newType = checkForUnboxing(method.name, methodOwner)) != null
+							|| (isStatic && (newType = checkForBoxing(method.name, method.desc, methodOwner))
+									!= null))) {
+						return tagged(values.get(0)).copy(newType);
+					}
+					// special case: setField method of TupleXX
+					else if (method.name.equals("setField")
+							&& methodOwner.startsWith("org/apache/flink/api/java/tuple/Tuple")
+							&& isTagged(values.get(0))
+							) {
+						final TaggedValue tuple = tagged(values.get(0));
+						tuple.setTag(Tag.CONTAINER);
+
+						// check if fieldPos is constant
+						// if not, we can not determine a state for the tuple
+						if (!isTagged(values.get(2)) || !tagged(values.get(2)).isIntConstant()) {
+							tuple.makeRegular();
+						}
+						else {
+							final int constant = tagged(values.get(2)).getIntConstant();
+
+							if (constant < 0 || Integer.valueOf(methodOwner.split("Tuple")[1]) <= constant) {
+								analyzer.handleInvalidTupleAccess();
+							}
+
+							// if it is at least tagged, add it anyway
+							if (isTagged(values.get(1))) {
+								tuple.addContainerMapping("f" + constant, tagged(values.get(1)), currentFrame);
+							}
+							// mark the field as it has an undefined state
+							else {
+								tuple.addContainerMapping("f" + constant, null, currentFrame);
+							}
+						}
+					}
+					// special case: getField method of TupleXX
+					else if (method.name.equals("getField")
+							&& methodOwner.startsWith("org/apache/flink/api/java/tuple/Tuple")) {
+						final TaggedValue tuple = tagged(values.get(0)); // we can assume that 0 is an input-dependent tuple
+						// constant field index
+						if (isTagged(values.get(1)) // constant
+								&& tagged(values.get(1)).isIntConstant()) {
+							final int constant = tagged(values.get(1)).getIntConstant();
+
+							if (constant < 0 || Integer.valueOf(methodOwner.split("Tuple")[1]) <= constant) {
+								analyzer.handleInvalidTupleAccess();
+							}
+
+							if (tuple.containerContains("f" + constant)) {
+								final TaggedValue tupleField = tuple.getContainerMapping().get("f" + constant);
+								if (tupleField != null) {
+									return tupleField;
+								}
+							}
+						}
+						// unknown field index
+						else {
+							// we need to make the tuple regular as we cannot track modifications of fields
+							tuple.makeRegular();
+							return new TaggedValue(Type.getObjectType("java/lang/Object"));
+						}
+					}
+					// nested method invocation
+					else {
+						TaggedValue tv = invokeNestedMethod(values, method);
+						if (tv != null) {
+							return tv;
+						}
+					}
+				}
+				return super.naryOperation(insn, values);
+			default:
+				// TODO support for INVOKEDYNAMIC instructions
+				return super.naryOperation(insn, values);
+		}
+	}
+
+	@Override
+	public void returnOperation(AbstractInsnNode insn, BasicValue value,
+			BasicValue expected) throws AnalyzerException {
+		// return of a null value in the top level UDF method
+		if (isTagged(value) && tagged(value).isNull() && topLevelMethod) {
+			analyzer.handleNullReturn();
+		}
+		else if (hasImportantDependencies(value)){
+			// add a copy and a reference
+			// to capture the current state and future changes in alternative paths
+			returnValues.add(tagged(value));
+			returnValues.add(tagged(value).copy());
+		}
+		else {
+			returnValues.add(null);
+		}
+	}
+
+	@Override
+	public BasicValue merge(BasicValue v, BasicValue w) {
+		// values are not equal
+		// BasicValue's equals method is too general
+		if ((!isTagged(v) && !w.equals(v)) || (isTagged(v) && !v.equals(w))) {
+			// w is a BasicValue
+			if (isTagged(v) && !isTagged(w)) {
+				return new TaggedValue(w.getType());
+			}
+			// v is a BasicValue or uninteresting, and w is an interesting TaggedValue
+			else if ((!isTagged(v) || tagged(v).canNotContainInput())
+					&& isTagged(w) && tagged(w).canContainInput()) {
+				final TaggedValue taggedW = tagged(w);
+				if (hasImportantDependencies(taggedW)
+						&& rightMergePriority) {
+					// w has merge priority; its grouped inputs will be returned
+					final TaggedValue returnValue = removeUngroupedInputs(taggedW.copy());
+					if (returnValue != null) {
+						return returnValue;
+					}
+					else {
+						return new TaggedValue(v.getType());
+					}
+				}
+				return new TaggedValue(v.getType());
+			}
+			// v is a BasicValue and w is an uninteresting TaggedValue
+			else if (!isTagged(v) && isTagged(w) && tagged(w).canNotContainInput()) {
+				return v;
+			}
+			// merge v and w (both TaggedValues), v is interesting
+			else if (isTagged(v) && isTagged(w) && tagged(v).canContainInput()) {
+				final List<TaggedValue> list = Arrays.asList(tagged(v), tagged(w));
+				final TaggedValue returnValue = mergeReturnValues(list);
+				if (returnValue != null) {
+					return returnValue;
+				}
+			}
+			// v is a TaggedValue and uninteresting
+			else if (isTagged(v) && tagged(v).canNotContainInput()) {
+				return v;
+			}
+			// v and w are BasicValues and not equal
+			return BasicValue.UNINITIALIZED_VALUE;
+		}
+		// v and w are equal, return one of them
+		return v;
+	}
+}

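For context: an ASM interpreter such as NestedMethodAnalyzer does not run on its own, it is driven by an analysis engine that feeds it one instruction at a time. A minimal sketch with ASM's stock classes (ModifiedASMAnalyzer, referenced above but part of another file in this commit, plays the Analyzer role; the class name com.example.MyUdf is a placeholder, the generified tree API of ASM 4+ is assumed, and exception handling is omitted):

    import org.objectweb.asm.ClassReader;
    import org.objectweb.asm.tree.ClassNode;
    import org.objectweb.asm.tree.MethodNode;
    import org.objectweb.asm.tree.analysis.Analyzer;
    import org.objectweb.asm.tree.analysis.BasicInterpreter;
    import org.objectweb.asm.tree.analysis.BasicValue;

    ClassNode classNode = new ClassNode();
    // reads the bytecode of the (placeholder) UDF class from the classpath
    new ClassReader("com.example.MyUdf").accept(classNode, ClassReader.SKIP_DEBUG);
    for (MethodNode method : classNode.methods) {
        // analyze() walks every instruction and calls back into the interpreter's
        // newValue/newOperation/unaryOperation/... methods seen above
        new Analyzer<BasicValue>(new BasicInterpreter()).analyze(classNode.name, method);
    }
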
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/sca/TaggedValue.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/TaggedValue.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/TaggedValue.java
new file mode 100644
index 0000000..c54ecf3
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/TaggedValue.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.sca;
+
+import org.objectweb.asm.Type;
+import org.objectweb.asm.tree.analysis.BasicValue;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Extension of ASM's BasicValue that allows "tags" to be assigned to values
+ * and additional, tag-dependent information to be attached to them.
+ */
+public class TaggedValue extends BasicValue {
+
+	public static enum Tag {
+		REGULAR, // regular object with no special meaning
+		THIS, // a special container which is the instance of the UDF
+		INPUT, // atomic input field
+		COLLECTOR, // collector of UDF
+		CONTAINER, // container that contains fields
+		INT_CONSTANT, // int constant
+		INPUT_1_ITERABLE, INPUT_2_ITERABLE, INPUT_1_ITERATOR, INPUT_2_ITERATOR, // input iterables/iterators
+		ITERATOR_TRUE_ASSUMPTION, // boolean value that is "true" at least once
+		NULL // null
+	};
+
+	public static enum Input {
+		INPUT_1(0), INPUT_2(1);
+
+		private int id;
+
+		private Input(int id) {
+			this.id = id;
+		}
+
+		public int getId() {
+			return id;
+		}
+	};
+
+	private Tag tag;
+	// only inputs can set this to true
+	private boolean callByValue = false;
+
+	// for inputs
+	private Input input;
+	private String flatFieldExpr; // empty string for non-composite types
+	private boolean grouped;
+
+	// for input containers & this
+	// key=field / value=input or container
+	// undefined state => value=null
+	private Map<String, TaggedValue> containerMapping;
+	private Map<String, ModifiedASMFrame> containerFrameMapping;
+
+	// for int constants
+	private int intConstant;
+
+	public TaggedValue(Type type) {
+		this(type, Tag.REGULAR);
+	}
+
+	public TaggedValue(Type type, Tag tag) {
+		super(type);
+		this.tag = tag;
+	}
+
+	public TaggedValue(Type type, Input input, String flatFieldExpr, boolean grouped, boolean callByValue) {
+		super(type);
+		tag = Tag.INPUT;
+		this.input = input;
+		this.flatFieldExpr = flatFieldExpr;
+		this.grouped = grouped;
+		this.callByValue = callByValue;
+	}
+
+	public TaggedValue(Type type, Map<String, TaggedValue> containerMapping) {
+		super(type);
+		tag = Tag.CONTAINER;
+		this.containerMapping = containerMapping;
+	}
+
+	public TaggedValue(int constant) {
+		super(Type.INT_TYPE);
+		tag = Tag.INT_CONSTANT;
+		this.intConstant = constant;
+	}
+
+	public boolean isInput() {
+		return tag == Tag.INPUT;
+	}
+
+	public boolean isThis() {
+		return tag == Tag.THIS;
+	}
+
+	public boolean isContainer() {
+		return tag == Tag.CONTAINER;
+	}
+
+	public boolean isRegular() {
+		return tag == Tag.REGULAR;
+	}
+
+	public boolean isIntConstant() {
+		return tag == Tag.INT_CONSTANT;
+	}
+
+	public boolean isCollector() {
+		return tag == Tag.COLLECTOR;
+	}
+
+	public boolean isInputIterable() {
+		return tag == Tag.INPUT_1_ITERABLE || tag == Tag.INPUT_2_ITERABLE;
+	}
+
+	public boolean isInputIterator() {
+		return tag == Tag.INPUT_1_ITERATOR || tag == Tag.INPUT_2_ITERATOR;
+	}
+
+	public boolean isInput1Iterable() {
+		return tag == Tag.INPUT_1_ITERABLE;
+	}
+
+	public boolean isInput1Iterator() {
+		return tag == Tag.INPUT_1_ITERATOR;
+	}
+
+	public boolean isIteratorTrueAssumption() {
+		return tag == Tag.ITERATOR_TRUE_ASSUMPTION;
+	}
+
+	public boolean isNull() {
+		return tag == Tag.NULL;
+	}
+
+	public boolean canNotContainInput() {
+		return tag != Tag.INPUT && tag != Tag.CONTAINER && tag != Tag.THIS;
+	}
+
+	public boolean canContainInput() {
+		return tag == Tag.INPUT || tag == Tag.CONTAINER || tag == Tag.THIS;
+	}
+
+	public boolean canContainFields() {
+		return tag == Tag.CONTAINER || tag == Tag.THIS;
+	}
+
+	public boolean isCallByValue() {
+		return callByValue;
+	}
+
+	public Tag getTag() {
+		return tag;
+	}
+
+	public void setTag(Tag tag) {
+		this.tag = tag;
+		if (tag == Tag.CONTAINER || tag == Tag.THIS) {
+			input = null;
+			flatFieldExpr = null;
+		}
+		else if (tag == Tag.INPUT) {
+			containerMapping = null;
+		}
+		else {
+			input = null;
+			containerMapping = null;
+			flatFieldExpr = null;
+		}
+	}
+
+	public String toForwardedFieldsExpression(Input input) {
+		// input not relevant
+		if (isInput() && this.input != input) {
+			return null;
+		}
+		// equivalent to semantic annotation "*" for non-composite types
+		else if (isInput() && flatFieldExpr.length() == 0) {
+			return "*";
+		}
+		// equivalent to "f0.f0->*"
+		else if (isInput()) {
+			return flatFieldExpr + "->*";
+		}
+		// equivalent to "f3;f0.f0->f0.f1;f1->f2;..."
+		else if (canContainFields() && containerMapping != null) {
+			final StringBuilder sb = new StringBuilder();
+			traverseContainer(input, containerMapping, sb, "");
+			final String returnValue = sb.toString();
+			if (returnValue != null && returnValue.length() > 0) {
+				return returnValue;
+			}
+		}
+		return null;
+	}
+
+	private void traverseContainer(Input input, Map<String, TaggedValue> containerMapping, StringBuilder sb,
+			String prefix) {
+		for (Map.Entry<String,TaggedValue> entry : containerMapping.entrySet()) {
+			// skip undefined states
+			if (entry.getValue() == null) {
+				continue;
+			}
+			// input
+			else if (entry.getValue().isInput() && entry.getValue().input == input) {
+				final String flatFieldExpr = entry.getValue().getFlatFieldExpr();
+				if (flatFieldExpr.length() == 0) {
+					sb.append("*");
+				}
+				else {
+					sb.append(flatFieldExpr);
+				}
+				sb.append("->");
+				if (prefix.length() > 0) {
+					sb.append(prefix);
+					sb.append('.');
+				}
+				sb.append(entry.getKey());
+				sb.append(';');
+			}
+			// input containers
+			else if (entry.getValue().canContainFields()) {
+				traverseContainer(input, entry.getValue().containerMapping, sb,
+						((prefix.length() > 0)? prefix + "." : "") + entry.getKey());
+			}
+		}
+	}
+
+	@Override
+	public boolean equals(Object value) {
+		if (!(value instanceof TaggedValue) || !super.equals(value)) {
+			return false;
+		}
+		final TaggedValue other = (TaggedValue) value;
+		if (other.tag != tag) {
+			return false;
+		}
+
+		if (isInput()) {
+			return input == other.input && flatFieldExpr.equals(other.flatFieldExpr)
+					&& grouped == other.grouped && callByValue == other.callByValue;
+		}
+		else if (canContainFields()) {
+			if ((containerMapping == null && other.containerMapping != null)
+					|| (containerMapping != null && other.containerMapping == null)) {
+				return false;
+			}
+			if (containerMapping == null) {
+				return true;
+			}
+			return containerMapping.equals(other.containerMapping);
+		}
+		return tag == other.tag;
+	}
+
+	@Override
+	public String toString() {
+		if (isInput()) {
+			return "TaggedValue(" + tag + ":" + flatFieldExpr + ")";
+		}
+		else if (canContainFields()) {
+			return "TaggedValue(" + tag + ":" + containerMapping + ")";
+		}
+		else if (isIntConstant()) {
+			return "TaggedValue(" + tag + ":" + intConstant + ")";
+		}
+		return "TaggedValue(" + tag + ")";
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Input
+	// --------------------------------------------------------------------------------------------
+
+	public Input getInput() {
+		return input;
+	}
+
+	public String getFlatFieldExpr() {
+		return flatFieldExpr;
+	}
+
+	public boolean isGrouped() {
+		return grouped;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Container & This
+	// --------------------------------------------------------------------------------------------
+
+	public Map<String, TaggedValue> getContainerMapping() {
+		return containerMapping;
+	}
+
+	public boolean containerContains(String field) {
+		if (containerMapping == null) {
+			return false;
+		}
+		return containerMapping.containsKey(field);
+	}
+
+	public boolean containerHasReferences() {
+		if (containerMapping == null) {
+			return false;
+		}
+		for (TaggedValue value : containerMapping.values()) {
+			if (value == null || !value.isCallByValue()) {
+				return true;
+			}
+		}
+		return false;
+	}
+
+	public void addContainerMapping(String field, TaggedValue mapping, ModifiedASMFrame frame) {
+		if (containerMapping == null) {
+			containerMapping = new HashMap<String, TaggedValue>(4);
+		}
+		if (containerFrameMapping == null) {
+			containerFrameMapping = new HashMap<String, ModifiedASMFrame>(4);
+		}
+		if (containerMapping.containsKey(field)
+				&& containerMapping.get(field) != null
+				&& frame == containerFrameMapping.get(field)) {
+			containerMapping.put(field, null);
+			containerFrameMapping.remove(field);
+		}
+		else {
+			containerMapping.put(field, mapping);
+			containerFrameMapping.put(field, frame);
+		}
+	}
+
+	public void clearContainerMappingMarkedFields() {
+		if (containerMapping != null) {
+			final Iterator<Entry<String, TaggedValue>> it = containerMapping.entrySet().iterator();
+			while (it.hasNext()) {
+				final Entry<String, TaggedValue> entry = it.next();
+				if (entry.getValue() == null) {
+					it.remove();
+				}
+			}
+		}
+	}
+
+	public void makeRegular() {
+		if (canContainFields() && containerMapping != null) {
+			for (TaggedValue value : containerMapping.values()) {
+				value.makeRegular();
+			}
+		}
+		setTag(Tag.REGULAR);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// IntConstant
+	// --------------------------------------------------------------------------------------------
+
+	public int getIntConstant() {
+		return intConstant;
+	}
+
+	public TaggedValue copy() {
+		return copy(getType());
+	}
+
+	public TaggedValue copy(Type type) {
+		final TaggedValue newValue = new TaggedValue(type);
+		newValue.tag = this.tag;
+		if (isInput()) {
+			newValue.input = this.input;
+			newValue.flatFieldExpr = this.flatFieldExpr;
+			newValue.grouped = this.grouped;
+			newValue.callByValue = this.callByValue;
+		}
+		else if (canContainFields()) {
+			final HashMap<String, TaggedValue> containerMapping = new HashMap<String, TaggedValue>(this.containerMapping.size());
+			final HashMap<String, ModifiedASMFrame> containerFrameMapping;
+			if (this.containerFrameMapping != null) {
+				containerFrameMapping = new HashMap<String, ModifiedASMFrame>(this.containerFrameMapping.size());
+			} else {
+				containerFrameMapping = null;
+			}
+			for (Entry<String, TaggedValue> entry : this.containerMapping.entrySet()) {
+				if (entry.getValue() != null) {
+					containerMapping.put(entry.getKey(), entry.getValue().copy());
+					if (containerFrameMapping != null) {
+						containerFrameMapping.put(entry.getKey(), this.containerFrameMapping.get(entry.getKey()));
+					}
+				} else {
+					containerMapping.put(entry.getKey(), null);
+				}
+			}
+			newValue.containerMapping = containerMapping;
+			newValue.containerFrameMapping = containerFrameMapping;
+		}
+		else if (isIntConstant()) {
+			newValue.intConstant = this.intConstant;
+		}
+		return newValue;
+	}
+}

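The strings built by toForwardedFieldsExpression() above are exactly the expressions a user would otherwise write by hand in a semantic annotation. A minimal sketch of the manual counterpart (the Rearrange class is hypothetical, not part of this commit):

    // input f0 stays at output position f0; input f1 moves to output position f2
    @ForwardedFields("f0;f1->f2")
    public static class Rearrange implements MapFunction<Tuple2<Long, Long>, Tuple3<Long, String, Long>> {
        @Override
        public Tuple3<Long, String, Long> map(Tuple2<Long, Long> value) throws Exception {
            return new Tuple3<Long, String, Long>(value.f0, "const", value.f1);
        }
    }
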
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java
new file mode 100644
index 0000000..1f4b44a
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.sca;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
+import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.java.sca.TaggedValue.Input;
+import org.objectweb.asm.Type;
+import org.objectweb.asm.tree.MethodNode;
+import org.slf4j.Logger;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.api.java.sca.UdfAnalyzerUtils.convertTypeInfoToTaggedValue;
+import static org.apache.flink.api.java.sca.UdfAnalyzerUtils.findMethodNode;
+import static org.apache.flink.api.java.sca.UdfAnalyzerUtils.mergeReturnValues;
+import static org.apache.flink.api.java.sca.UdfAnalyzerUtils.removeUngroupedInputsFromContainer;
+
+/**
+ * Implements a Static Code Analyzer (SCA) that uses the ASM framework
+ * for interpreting the Java bytecode of Flink UDFs. The analyzer is built
+ * on top of ASM's BasicInterpreter. Instead of ASM's BasicValues, it
+ * introduces TaggedValues, which extend BasicValue and allow interesting
+ * information to be attached to values. Interesting values such as inputs,
+ * collectors, or constants are tagged such that atomic input fields can be
+ * tracked through the entire UDF (until the function returns or calls collect()).
+ *
+ * The implementation is as conservative as possible, meaning that for cases
+ * or bytecode instructions that have not been considered, the analyzer
+ * falls back to the ASM library (which removes TaggedValues).
+ */
+public class UdfAnalyzer {
+	// necessary to prevent endless loops and stack overflows
+	private static final int MAX_NESTING = 20;
+
+	// general information about the UDF that is available before analysis takes place
+	private final Method baseClassMethod;
+	private final boolean hasCollector;
+	private final boolean isBinary;
+	private final boolean isIterableInput;
+	private final boolean isReduceFunction;
+	private final boolean isFilterFunction;
+	private final Class<?> udfClass;
+	private final String externalUdfName;
+	private final String internalUdfClassName;
+	private final TypeInformation<?> in1Type;
+	private final TypeInformation<?> in2Type;
+	private final TypeInformation<?> outType;
+	private final Keys<?> keys1;
+	private final Keys<?> keys2;
+
+	// flag indicating whether code errors should throw a CodeErrorException
+	private final boolean throwErrorExceptions;
+
+	// list of all values added with a "collect()" call
+	private final List<TaggedValue> collectorValues;
+
+	// list of all hints that can be returned after analysis
+	private final List<String> hints;
+	private boolean warning = false;
+
+	// stages for capturing and tagging the initial BasicValues
+	private int state = STATE_CAPTURE_RETURN;
+
+	static final int STATE_CAPTURE_RETURN = 0;
+	static final int STATE_CAPTURE_THIS = 1;
+	static final int STATE_CAPTURE_INPUT1 = 2;
+	static final int STATE_CAPTURE_INPUT2 = 3;
+	static final int STATE_CAPTURE_COLLECTOR = 4;
+	static final int STATE_END_OF_CAPTURING = 5;
+	static final int STATE_END_OF_ANALYZING = 6;
+
+	// flag that indicates whether the "hasNext()" call of an input iterator has already been
+	// assumed to return "TRUE"; once that assumption has been applied, "FALSE" may be returned
+	private boolean iteratorTrueAssumptionApplied;
+
+	// merged return value of all return statements in the UDF
+	private TaggedValue returnValue;
+
+	// statistics for object creation hinting
+	private int newOperationCounterOverall;
+	private int newOperationCounterTopLevel;
+
+	// stored FilterFunction input for later modification checking
+	private TaggedValue filterInputCopy;
+	private TaggedValue filterInputRef;
+
+	public UdfAnalyzer(Class<?> baseClass, Class<?> udfClass, String externalUdfName,
+			TypeInformation<?> in1Type, TypeInformation<?> in2Type,
+			TypeInformation<?> outType, Keys<?> keys1, Keys<?> keys2,
+			boolean throwErrorExceptions) {
+
+		baseClassMethod = baseClass.getDeclaredMethods()[0];
+		this.udfClass = udfClass;
+		this.externalUdfName = externalUdfName;
+		this.internalUdfClassName = Type.getInternalName(udfClass);
+		this.in1Type = in1Type;
+		this.in2Type = in2Type;
+		this.outType = outType;
+		this.keys1 = keys1;
+		this.keys2 = keys2;
+		this.throwErrorExceptions = throwErrorExceptions;
+
+		if (baseClass == CoGroupFunction.class) {
+			hasCollector = true;
+			isBinary = true;
+			isIterableInput = true;
+			isReduceFunction = false;
+			isFilterFunction = false;
+			iteratorTrueAssumptionApplied = true;
+		}
+		else if (baseClass == CrossFunction.class) {
+			hasCollector = false;
+			isBinary = true;
+			isIterableInput = false;
+			isReduceFunction = false;
+			isFilterFunction = false;
+			iteratorTrueAssumptionApplied = true;
+		}
+		else if (baseClass == FlatJoinFunction.class) {
+			hasCollector = true;
+			isBinary = true;
+			isIterableInput = false;
+			isReduceFunction = false;
+			isFilterFunction = false;
+			iteratorTrueAssumptionApplied = true;
+		}
+		else if (baseClass == FlatMapFunction.class) {
+			hasCollector = true;
+			isBinary = false;
+			isIterableInput = false;
+			isReduceFunction = false;
+			isFilterFunction = false;
+			iteratorTrueAssumptionApplied = true;
+		}
+		else if (baseClass == GroupReduceFunction.class) {
+			hasCollector = true;
+			isBinary = false;
+			isIterableInput = true;
+			isReduceFunction = false;
+			isFilterFunction = false;
+			iteratorTrueAssumptionApplied = false;
+		}
+		else if (baseClass == JoinFunction.class) {
+			hasCollector = false;
+			isBinary = true;
+			isIterableInput = false;
+			isReduceFunction = false;
+			isFilterFunction = false;
+			iteratorTrueAssumptionApplied = true;
+		}
+		else if (baseClass == MapFunction.class) {
+			hasCollector = false;
+			isBinary = false;
+			isIterableInput = false;
+			isReduceFunction = false;
+			isFilterFunction = false;
+			iteratorTrueAssumptionApplied = true;
+		}
+		else if (baseClass == ReduceFunction.class) {
+			hasCollector = false;
+			isBinary = false;
+			isIterableInput = false;
+			isReduceFunction = true;
+			isFilterFunction = false;
+			iteratorTrueAssumptionApplied = true;
+		}
+		else if (baseClass == FilterFunction.class) {
+			hasCollector = false;
+			isBinary = false;
+			isIterableInput = false;
+			isReduceFunction = false;
+			isFilterFunction = true;
+			iteratorTrueAssumptionApplied = true;
+		}
+		// TODO MapPartitionFunction, GroupCombineFunction and CombineFunction not implemented yet
+		else {
+			throw new UnsupportedOperationException("Unsupported operator.");
+		}
+		if (hasCollector) {
+			collectorValues = new ArrayList<TaggedValue>();
+		}
+		else {
+			collectorValues = null;
+		}
+		hints = new ArrayList<String>();
+	}
+
+	public int getState() {
+		return state;
+	}
+
+	public void setState(int state) {
+		this.state = state;
+	}
+
+	public boolean isUdfBinary() {
+		return isBinary;
+	}
+
+	public boolean isIteratorTrueAssumptionApplied() {
+		return iteratorTrueAssumptionApplied;
+	}
+
+	public void applyIteratorTrueAssumption() {
+		iteratorTrueAssumptionApplied = true;
+	}
+
+	public void incrNewOperationCounters(boolean topLevel) {
+		newOperationCounterOverall++;
+		if (topLevel) {
+			newOperationCounterTopLevel++;
+		}
+	}
+
+	public boolean hasUdfCollector() {
+		return hasCollector;
+	}
+
+	public boolean hasUdfIterableInput() {
+		return isIterableInput;
+	}
+
+	public boolean isUdfReduceFunction() {
+		return isReduceFunction;
+	}
+
+	public String getInternalUdfClassName() {
+		return internalUdfClassName;
+	}
+
+	public List<TaggedValue> getCollectorValues() {
+		return collectorValues;
+	}
+
+	public boolean analyze() throws CodeAnalyzerException {
+		if (state == STATE_END_OF_ANALYZING) {
+			throw new IllegalStateException("Analyzing is already done.");
+		}
+		
+		boolean discardReturnValues = false;
+
+		if (isIterableInput) {
+			if (keys1 == null || (keys2 == null && isBinary)) {
+				throw new IllegalArgumentException("This type of function requires key information for analysis.");
+			}
+			else if (!(keys1 instanceof ExpressionKeys) || (!(keys2 instanceof ExpressionKeys) && isBinary)) {
+				// TODO currently only ExpressionKeys are supported as keys
+				discardReturnValues = true;
+			}
+		}
+
+		try {
+			final Object[] mn = findMethodNode(internalUdfClassName, baseClassMethod);
+			final NestedMethodAnalyzer nma = new NestedMethodAnalyzer(this, (String) mn[1],
+					(MethodNode) mn[0], null, MAX_NESTING, true);
+			final TaggedValue result = nma.analyze();
+			setState(STATE_END_OF_ANALYZING);
+
+			// special case: FilterFunction
+			if (isFilterFunction) {
+				discardReturnValues = true;
+				// check for input modification
+				if (!filterInputCopy.equals(filterInputRef)) {
+					addHintOrThrowException("Function modifies the input. This can lead to unexpected behaviour at runtime.");
+				}
+			}
+
+			if (!discardReturnValues) {
+				// merge return values of a collector
+				if (hasCollector) {
+					returnValue = mergeReturnValues(collectorValues);
+				}
+				else {
+					returnValue = result;
+				}
+				// remove ungrouped inputs from result if UDF has iterators
+				// or is a reduce function
+				if ((isIterableInput || isReduceFunction) && returnValue != null) {
+					if (returnValue.canContainFields()) {
+						removeUngroupedInputsFromContainer(returnValue);
+					}
+					else if (returnValue.isInput() && !returnValue.isGrouped()) {
+						returnValue = null;
+					}
+				}
+			}
+			// any return value is invalid
+			else {
+				returnValue = null;
+			}
+		}
+		catch (Exception e) {
+			Throwable cause = e.getCause();
+			while (cause != null && !(cause instanceof CodeErrorException)) {
+				cause = cause.getCause();
+			}
+			if (cause instanceof CodeErrorException || e instanceof CodeErrorException) {
+				throw new CodeErrorException("Function code contains obvious errors. " +
+						"If you think the code analysis is wrong at this point you can " +
+						"disable the entire code analyzer in ExecutionConfig or add " +
+						"@SkipCodeAnalysis to your function to disable the analysis.",
+						(cause != null) ? cause : e);
+			}
+			throw new CodeAnalyzerException("Exception occurred during code analysis.", e);
+		}
+		return true;
+	}
+
+	public SemanticProperties getSemanticProperties() {
+		final SemanticProperties sp;
+		if (isBinary) {
+			sp = new DualInputSemanticProperties();
+			if (returnValue != null) {
+				String[] ff1Array = null;
+				final String ff1 = returnValue.toForwardedFieldsExpression(Input.INPUT_1);
+				if (ff1 != null && ff1.length() > 0) {
+					ff1Array = new String[] { ff1 };
+				}
+				String[] ff2Array = null;
+				final String ff2 = returnValue.toForwardedFieldsExpression(Input.INPUT_2);
+				if (ff2 != null && ff2.length() > 0) {
+					ff2Array = new String[] { ff2 };
+				}
+				SemanticPropUtil.getSemanticPropsDualFromString((DualInputSemanticProperties) sp,
+						ff1Array, ff2Array, null, null, null, null, in1Type, in2Type, outType, true);
+			}
+		}
+		else {
+			sp = new SingleInputSemanticProperties();
+			if (returnValue != null) {
+				String[] ffArray = null;
+				final String ff = returnValue.toForwardedFieldsExpression(Input.INPUT_1);
+				if (ff != null && ff.length() > 0) {
+					ffArray = new String[] { ff };
+				}
+				SemanticPropUtil.getSemanticPropsSingleFromString((SingleInputSemanticProperties) sp,
+						ffArray, null, null, in1Type, outType, true);
+			}
+		}
+		return sp;
+	}
+
+	public void addSemanticPropertiesHints() {
+		boolean added = false;
+		if (returnValue != null) {
+			if (isBinary) {
+				final String ff1 = returnValue.toForwardedFieldsExpression(Input.INPUT_1);
+				if (ff1 != null && ff1.length() > 0) {
+					added = true;
+					hints.add("Possible annotation: "
+							+ "@ForwardedFieldsFirst(\"" + ff1 + "\")");
+				}
+				final String ff2 = returnValue.toForwardedFieldsExpression(Input.INPUT_2);
+				if (ff2 != null && ff2.length() > 0) {
+					added = true;
+					hints.add("Possible annotation: "
+							+ "@ForwardedFieldsSecond(\"" + ff2 + "\")");
+				}
+			} else {
+				final String ff = returnValue.toForwardedFieldsExpression(Input.INPUT_1);
+				if (ff != null && ff.length() > 0) {
+					added = true;
+					hints.add("Possible annotation: "
+							+ "@ForwardedFields(\"" + ff + "\")");
+				}
+			}
+		}
+		if (!added) {
+			hints.add("Possible annotations: none.");
+		}
+	}
+
+	public void printToLogger(Logger log) {
+		StringBuilder sb = new StringBuilder();
+		sb.append("Code analysis result for '").append(externalUdfName)
+				.append(" (").append(udfClass.getName()).append(")':");
+		sb.append("\nNumber of object creations: ").append(newOperationCounterTopLevel)
+				.append(" in method / ").append(newOperationCounterOverall).append(" transitively");
+
+		for (String hint : hints) {
+			sb.append('\n');
+			sb.append(hint);
+		}
+
+		if (warning) {
+			log.warn(sb.toString());
+		}
+		else {
+			log.info(sb.toString());
+		}
+	}
+
+	public TaggedValue getInput1AsTaggedValue() {
+		final int[] groupedKeys;
+		if (keys1 != null) {
+			groupedKeys = keys1.computeLogicalKeyPositions();
+		}
+		else {
+			groupedKeys = null;
+		}
+		final TaggedValue input1 = convertTypeInfoToTaggedValue(Input.INPUT_1, in1Type, "", null, groupedKeys);
+		// store the input and a copy of it to check for modification afterwards
+		if (isFilterFunction) {
+			filterInputRef = input1;
+			filterInputCopy = input1.copy();
+		}
+		return input1;
+	}
+
+	public TaggedValue getInput2AsTaggedValue() {
+		final int[] groupedKeys;
+		if (keys2 != null) {
+			groupedKeys = keys2.computeLogicalKeyPositions();
+		}
+		else {
+			groupedKeys = null;
+		}
+		return convertTypeInfoToTaggedValue(Input.INPUT_2, in2Type, "", null, groupedKeys);
+	}
+
+	private void addHintOrThrowException(String msg) {
+		if (throwErrorExceptions) {
+			throw new CodeErrorException(externalUdfName + ": " + msg);
+		}
+		else {
+			warning = true;
+			hints.add(msg);
+		}
+	}
+
+	public void handleNullReturn() {
+		addHintOrThrowException("Function returns 'null' values. This can lead to errors at runtime.");
+	}
+
+	public void handlePutStatic() {
+		addHintOrThrowException("Function modifies static fields. This can lead to unexpected behaviour at runtime.");
+	}
+
+	public void handleInvalidTupleAccess() {
+		addHintOrThrowException("Function contains tuple accesses with invalid indexes. This can lead to errors at runtime.");
+	}
+}
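
For illustration, a hedged sketch of driving the analyzer directly, mirroring the UdfOperatorUtils call sites added in this commit; the IdentityMapper class and the external name are placeholders:

	import org.apache.flink.api.common.functions.MapFunction;
	import org.apache.flink.api.common.operators.SemanticProperties;
	import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
	import org.apache.flink.api.java.sca.UdfAnalyzer;
	import org.apache.flink.api.java.tuple.Tuple2;
	import org.apache.flink.api.java.typeutils.TupleTypeInfo;

	public class UdfAnalyzerSketch {

		// hypothetical UDF that forwards both tuple fields unchanged
		public static class IdentityMapper
				implements MapFunction<Tuple2<Long, String>, Tuple2<Long, String>> {
			@Override
			public Tuple2<Long, String> map(Tuple2<Long, String> value) {
				return new Tuple2<Long, String>(value.f0, value.f1);
			}
		}

		public static void main(String[] args) throws Exception {
			TupleTypeInfo<Tuple2<Long, String>> type = new TupleTypeInfo<Tuple2<Long, String>>(
					BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
			// single-input function: in2Type and both keys are null; collect hints instead of throwing
			UdfAnalyzer analyzer = new UdfAnalyzer(MapFunction.class, IdentityMapper.class,
					"Map at UdfAnalyzerSketch", type, null, type, null, null, false);
			if (analyzer.analyze()) {
				SemanticProperties props = analyzer.getSemanticProperties();
				analyzer.addSemanticPropertiesHints(); // e.g. "Possible annotation: @ForwardedFields(...)"
				System.out.println(props);
			}
		}
	}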

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
new file mode 100644
index 0000000..df1e421
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.sca;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.objectweb.asm.ClassReader;
+import org.objectweb.asm.Type;
+import org.objectweb.asm.tree.ClassNode;
+import org.objectweb.asm.tree.MethodNode;
+import org.objectweb.asm.tree.analysis.BasicValue;
+import org.objectweb.asm.tree.analysis.Value;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public final class UdfAnalyzerUtils {
+
+	public static TaggedValue convertTypeInfoToTaggedValue(TaggedValue.Input input, TypeInformation<?> typeInfo,
+			String flatFieldExpr, List<CompositeType.FlatFieldDescriptor> flatFieldDesc, int[] groupedKeys) {
+		// java tuples & scala tuples
+		if (typeInfo instanceof TupleTypeInfoBase) {
+			final TupleTypeInfoBase<?> tupleTypeInfo = (TupleTypeInfoBase<?>) typeInfo;
+			HashMap<String, TaggedValue> containerMapping = new HashMap<String, TaggedValue>();
+			for (int i = 0; i < tupleTypeInfo.getArity(); i++) {
+				final String fieldName;
+				// java
+				if (typeInfo instanceof TupleTypeInfo) {
+					fieldName = "f" + i;
+				}
+				// scala
+				else {
+					fieldName = "_" + (i+1);
+				}
+				containerMapping.put(fieldName,
+						convertTypeInfoToTaggedValue(input,
+								tupleTypeInfo.getTypeAt(i),
+								(flatFieldExpr.length() > 0 ? flatFieldExpr + "." : "") + fieldName,
+								tupleTypeInfo.getFlatFields(fieldName),
+								groupedKeys));
+			}
+			return new TaggedValue(Type.getObjectType("java/lang/Object"), containerMapping);
+		}
+		// pojos
+		else if (typeInfo instanceof PojoTypeInfo) {
+			final PojoTypeInfo<?> pojoTypeInfo = (PojoTypeInfo<?>) typeInfo;
+			HashMap<String, TaggedValue> containerMapping = new HashMap<String, TaggedValue>();
+			for (int i = 0; i < pojoTypeInfo.getArity(); i++) {
+				final String fieldName = pojoTypeInfo.getPojoFieldAt(i).field.getName();
+				containerMapping.put(fieldName,
+						convertTypeInfoToTaggedValue(input,
+								pojoTypeInfo.getTypeAt(i),
+								(flatFieldExpr.length() > 0 ? flatFieldExpr + "." : "") + fieldName,
+								pojoTypeInfo.getFlatFields(fieldName),
+								groupedKeys));
+			}
+			return new TaggedValue(Type.getObjectType("java/lang/Object"), containerMapping);
+		}
+		// atomic
+		boolean groupedField = false;
+		if (groupedKeys != null && flatFieldDesc != null) {
+			int flatFieldPos = flatFieldDesc.get(0).getPosition();
+			for (int groupedKey : groupedKeys) {
+				if (groupedKey == flatFieldPos) {
+					groupedField = true;
+					break;
+				}
+			}
+		}
+
+		return new TaggedValue(Type.getType(typeInfo.getTypeClass()), input, flatFieldExpr, groupedField,
+				typeInfo.isBasicType() && typeInfo != BasicTypeInfo.DATE_TYPE_INFO);
+	}
+
+	/**
+	 * @return an array containing the method node and the name of the class in
+	 * which the method node was found
+	 */
+	public static Object[] findMethodNode(String internalClassName, Method method) {
+		return findMethodNode(internalClassName, method.getName(), Type.getMethodDescriptor(method));
+	}
+
+	/**
+	 * @return an array containing the method node and the name of the class in
+	 * which the method node was found
+	 */
+	@SuppressWarnings("unchecked")
+	public static Object[] findMethodNode(String internalClassName, String name, String desc) {
+		try {
+			// iterate through hierarchy and search for method node /
+			// class that really implements the method
+			while (internalClassName != null) {
+				ClassReader cr = new ClassReader(Thread.currentThread().getContextClassLoader()
+						.getResourceAsStream(internalClassName.replace('.', '/') + ".class"));
+				final ClassNode cn = new ClassNode();
+				cr.accept(cn, 0);
+				for (MethodNode mn : (List<MethodNode>) cn.methods) {
+					if (mn.name.equals(name) && mn.desc.equals(desc)) {
+						return new Object[] { mn, cr.getClassName() };
+					}
+				}
+				internalClassName = cr.getSuperName();
+			}
+		} catch (IOException e) {
+			throw new IllegalStateException("Method '" + name + "' could not be found", e);
+		}
+		throw new IllegalStateException("Method '" + name + "' could not be found");
+	}
+
+	public static boolean isTagged(Value value) {
+		return value instanceof TaggedValue;
+	}
+
+	public static TaggedValue tagged(Value value) {
+		return (TaggedValue) value;
+	}
+
+	/**
+	 * @return whether any value in the list of values is or contains important
+	 * dependencies (inputs or collectors) that require special analysis (e.g.
+	 * digging into a nested method). The first value can be skipped, e.g. in
+	 * order to skip the "this" of non-static method calls.
+	 */
+	public static boolean hasImportantDependencies(List<? extends BasicValue> values, boolean skipFirst) {
+		for (BasicValue value : values) {
+			if (skipFirst) {
+				skipFirst = false;
+				continue;
+			}
+			if (hasImportantDependencies(value)) {
+				return true;
+			}
+		}
+		return false;
+	}
+
+	/**
+	 * @return whether a value is or contains important dependencies (inputs or collectors)
+	 * that require special analysis (e.g. digging into a nested method)
+	 */
+	public static boolean hasImportantDependencies(BasicValue bv) {
+		if (!isTagged(bv)) {
+			return false;
+		}
+		final TaggedValue value = tagged(bv);
+
+		if (value.isInput() || value.isCollector()) {
+			return true;
+		}
+		else if (value.canContainFields() && value.getContainerMapping() != null) {
+			for (TaggedValue tv : value.getContainerMapping().values()) {
+				if (hasImportantDependencies(tv)) {
+					return true;
+				}
+			}
+		}
+		return false;
+	}
+
+	public static TaggedValue mergeInputs(List<TaggedValue> returnValues) {
+		TaggedValue first = null;
+		for (TaggedValue tv : returnValues) {
+			if (first == null) {
+				first = tv;
+			}
+			else if (!first.equals(tv)) {
+				return null;
+			}
+		}
+		return first;
+	}
+
+	public static TaggedValue mergeContainers(List<TaggedValue> returnValues) {
+		if (returnValues.size() == 0) {
+			return null;
+		}
+
+		Type returnType = null;
+
+		// do intersections of field names
+		Set<String> keys = null;
+		for (TaggedValue tv : returnValues) {
+			if (keys == null) {
+				keys = new HashSet<String>(tv.getContainerMapping().keySet());
+				returnType = tv.getType();
+			}
+			else {
+				keys.retainAll(tv.getContainerMapping().keySet());
+			}
+		}
+
+		// filter mappings with undefined state
+		final HashMap<String, TaggedValue> resultMapping = new HashMap<String, TaggedValue>(keys.size());
+		final List<String> filteredMappings = new ArrayList<String>(keys.size());
+		for (TaggedValue tv : returnValues) {
+			final Map<String, TaggedValue> cm = tv.getContainerMapping();
+			for (String key : keys) {
+				if (cm.containsKey(key)) {
+					// add mapping with undefined state to filter
+					if (!filteredMappings.contains(key) && cm.get(key) == null) {
+						filteredMappings.add(key);
+					}
+					// add mapping to result mapping
+					else if (!resultMapping.containsKey(key) && !filteredMappings.contains(key)) {
+						resultMapping.put(key, cm.get(key));
+					}
+					// if mapping is present in result and filter,
+					// remove it from result
+					else if (resultMapping.containsKey(key)
+							&& filteredMappings.contains(key)) {
+						resultMapping.remove(key);
+					}
+					// if mapping is already present in result,
+					// remove it and mark it as mapping with undefined state in filter
+					else if (resultMapping.containsKey(key)
+							&& !filteredMappings.contains(key)
+							&& !cm.get(key).equals(resultMapping.get(key))) {
+						filteredMappings.add(key);
+						resultMapping.remove(key);
+					}
+				}
+			}
+		}
+
+		// recursively merge contained mappings
+		Iterator<String> it = resultMapping.keySet().iterator();
+		while (it.hasNext()) {
+			String key = it.next();
+			TaggedValue value = mergeReturnValues(Collections.singletonList(resultMapping.get(key)));
+			if (value == null) {
+				it.remove();
+			}
+			else {
+				resultMapping.put(key, value);
+			}
+		}
+
+		if (resultMapping.size() > 0) {
+			return new TaggedValue(returnType, resultMapping);
+		}
+		return null;
+	}
+
+	public static TaggedValue mergeReturnValues(List<TaggedValue> returnValues) {
+		if (returnValues.size() == 0 || returnValues.get(0) == null) {
+			return null;
+		}
+
+		// check if either all inputs or all containers
+		boolean allInputs = returnValues.get(0).isInput();
+		for (TaggedValue tv : returnValues) {
+			if (tv == null || tv.isInput() != allInputs) {
+				return null;
+			}
+			// check if there are uninteresting values
+			if (tv.canNotContainInput()) {
+				return null;
+			}
+		}
+
+		if (allInputs) {
+			return mergeInputs(returnValues);
+		}
+		return mergeContainers(returnValues);
+	}
+
+	public static void removeUngroupedInputsFromContainer(TaggedValue value) {
+		if (value.getContainerMapping() != null) {
+			Iterator<Map.Entry<String, TaggedValue>> it = value.getContainerMapping().entrySet().iterator();
+			while (it.hasNext()) {
+				Map.Entry<String, TaggedValue> entry = it.next();
+				if (entry.getValue() == null) {
+					continue;
+				}
+				else if (entry.getValue().isInput() && !entry.getValue().isGrouped()) {
+					it.remove();
+				}
+				else if (entry.getValue().canContainFields()) {
+					removeUngroupedInputsFromContainer(entry.getValue());
+				}
+			}
+		}
+	}
+
+	public static TaggedValue removeUngroupedInputs(TaggedValue value) {
+		if (value.isInput()) {
+			if (value.isGrouped()) {
+				return value;
+			}
+		}
+		else if (value.canContainFields()) {
+			removeUngroupedInputsFromContainer(value);
+			if (value.getContainerMapping() != null && value.getContainerMapping().size() > 0) {
+				return value;
+			}
+		}
+		return null;
+	}
+}
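
To make the merge semantics above concrete, a small sketch under the assumption that TaggedValue equality compares the tagged input and field expression (constructor as used in convertTypeInfoToTaggedValue): return paths that forward the same input field keep it, conflicting paths discard all forwarding information:

	import java.util.Arrays;

	import org.apache.flink.api.java.sca.TaggedValue;
	import org.objectweb.asm.Type;

	import static org.apache.flink.api.java.sca.UdfAnalyzerUtils.mergeReturnValues;

	public class MergeReturnValuesSketch {
		public static void main(String[] args) {
			TaggedValue a = new TaggedValue(Type.getType(String.class),
					TaggedValue.Input.INPUT_1, "f0", false, true);
			TaggedValue b = new TaggedValue(Type.getType(String.class),
					TaggedValue.Input.INPUT_1, "f0", false, true);
			TaggedValue c = new TaggedValue(Type.getType(String.class),
					TaggedValue.Input.INPUT_1, "f1", false, true);

			// both return paths forward input field "f0" -> the field survives the merge
			System.out.println(mergeReturnValues(Arrays.asList(a, b)));
			// conflicting field expressions -> merge yields null, nothing is forwarded
			System.out.println(mergeReturnValues(Arrays.asList(a, c)));
		}
	}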

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesPrecedenceTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesPrecedenceTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesPrecedenceTest.java
new file mode 100644
index 0000000..014a3ca
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesPrecedenceTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.functions;
+
+import org.apache.flink.api.common.CodeAnalysisMode;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.GenericDataSinkBase;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the precedence of semantic properties: annotation > API > static code analyzer.
+ */
+public class SemanticPropertiesPrecedenceTest {
+
+	@Test
+	public void testFunctionForwardedAnnotationPrecedence() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().setCodeAnalysisMode(CodeAnalysisMode.OPTIMIZE);
+
+		@SuppressWarnings("unchecked")
+		DataSet<Tuple3<Long, String, Integer>> input = env.fromElements(new Tuple3<Long, String, Integer>(3L, "test", 42));
+		input
+				.map(new WildcardForwardedMapperWithForwardAnnotation<Tuple3<Long, String, Integer>>())
+				.output(new DiscardingOutputFormat<Tuple3<Long, String, Integer>>());
+		Plan plan = env.createProgramPlan();
+
+		GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
+		MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput();
+
+		SingleInputSemanticProperties semantics = mapper.getSemanticProperties();
+
+		FieldSet fw1 = semantics.getForwardingTargetFields(0, 0);
+		FieldSet fw2 = semantics.getForwardingTargetFields(0, 1);
+		FieldSet fw3 = semantics.getForwardingTargetFields(0, 2);
+		assertNotNull(fw1);
+		assertNotNull(fw2);
+		assertNotNull(fw3);
+		assertTrue(fw1.contains(0));
+		assertFalse(fw2.contains(1));
+		assertFalse(fw3.contains(2));
+	}
+
+	@Test
+	public void testFunctionSkipCodeAnalysisAnnotationPrecedence() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().setCodeAnalysisMode(CodeAnalysisMode.OPTIMIZE);
+
+		@SuppressWarnings("unchecked")
+		DataSet<Tuple3<Long, String, Integer>> input = env.fromElements(new Tuple3<Long, String, Integer>(3L, "test", 42));
+		input
+				.map(new WildcardForwardedMapperWithSkipAnnotation<Tuple3<Long, String, Integer>>())
+				.output(new DiscardingOutputFormat<Tuple3<Long, String, Integer>>());
+		Plan plan = env.createProgramPlan();
+
+		GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
+		MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput();
+
+		SingleInputSemanticProperties semantics = mapper.getSemanticProperties();
+
+		FieldSet fw1 = semantics.getForwardingTargetFields(0, 0);
+		FieldSet fw2 = semantics.getForwardingTargetFields(0, 1);
+		FieldSet fw3 = semantics.getForwardingTargetFields(0, 2);
+		assertNotNull(fw1);
+		assertNotNull(fw2);
+		assertNotNull(fw3);
+		assertFalse(fw1.contains(0));
+		assertFalse(fw2.contains(1));
+		assertFalse(fw3.contains(2));
+	}
+
+	@Test
+	public void testFunctionApiPrecedence() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().setCodeAnalysisMode(CodeAnalysisMode.OPTIMIZE);
+
+		@SuppressWarnings("unchecked")
+		DataSet<Tuple3<Long, String, Integer>> input = env.fromElements(new Tuple3<Long, String, Integer>(3L, "test", 42));
+		input
+				.map(new WildcardForwardedMapper<Tuple3<Long, String, Integer>>())
+				.withForwardedFields("f0")
+				.output(new DiscardingOutputFormat<Tuple3<Long, String, Integer>>());
+		Plan plan = env.createProgramPlan();
+
+		GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
+		MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput();
+
+		SingleInputSemanticProperties semantics = mapper.getSemanticProperties();
+
+		FieldSet fw1 = semantics.getForwardingTargetFields(0, 0);
+		FieldSet fw2 = semantics.getForwardingTargetFields(0, 1);
+		FieldSet fw3 = semantics.getForwardingTargetFields(0, 2);
+		assertNotNull(fw1);
+		assertNotNull(fw2);
+		assertNotNull(fw3);
+		assertTrue(fw1.contains(0));
+		assertFalse(fw2.contains(1));
+		assertFalse(fw3.contains(2));
+	}
+
+	@Test
+	public void testFunctionAnalyzerPrecedence() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().setCodeAnalysisMode(CodeAnalysisMode.OPTIMIZE);
+
+		@SuppressWarnings("unchecked")
+		DataSet<Tuple3<Long, String, Integer>> input = env.fromElements(new Tuple3<Long, String, Integer>(3L, "test", 42));
+		input
+				.map(new WildcardForwardedMapper<Tuple3<Long, String, Integer>>())
+				.output(new DiscardingOutputFormat<Tuple3<Long, String, Integer>>());
+		Plan plan = env.createProgramPlan();
+
+		GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
+		MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput();
+
+		SingleInputSemanticProperties semantics = mapper.getSemanticProperties();
+
+		FieldSet fw1 = semantics.getForwardingTargetFields(0, 0);
+		FieldSet fw2 = semantics.getForwardingTargetFields(0, 1);
+		FieldSet fw3 = semantics.getForwardingTargetFields(0, 2);
+		assertNotNull(fw1);
+		assertNotNull(fw2);
+		assertNotNull(fw3);
+		assertTrue(fw1.contains(0));
+		assertTrue(fw2.contains(1));
+		assertTrue(fw3.contains(2));
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@FunctionAnnotation.ForwardedFields("f0")
+	public static class WildcardForwardedMapperWithForwardAnnotation<T> implements MapFunction<T, T> {
+
+		@Override
+		public T map(T value) {
+			return value;
+		}
+	}
+
+	@FunctionAnnotation.SkipCodeAnalysis
+	public static class WildcardForwardedMapperWithSkipAnnotation<T> implements MapFunction<T, T> {
+
+		@Override
+		public T map(T value) {
+			return value;
+		}
+	}
+
+	public static class WildcardForwardedMapper<T> implements MapFunction<T, T> {
+
+		@Override
+		public T map(T value) {
+			return value;
+		}
+	}
+}
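
In condensed form, the precedence these tests establish (sketch only; "input" and the mapper classes are the ones defined in this file):

	// 1. An annotation on the function class always wins:
	//        @FunctionAnnotation.ForwardedFields("f0")
	// 2. Without an annotation, the API hint wins over the analyzer:
	input.map(new WildcardForwardedMapper<Tuple3<Long, String, Integer>>())
			.withForwardedFields("f0");
	// 3. With neither, CodeAnalysisMode.OPTIMIZE lets the analyzer infer on its own
	//    that the identity mapper forwards all fields.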


[3/3] flink git commit: [FLINK-1319] [core] Add static code analysis for user code

Posted by uc...@apache.org.
[FLINK-1319] [core] Add static code analysis for user code

This closes #729.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c854d526
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c854d526
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c854d526

Branch: refs/heads/master
Commit: c854d5260c20b0926c4347c7c9dd7d0f4f11d620
Parents: d433ba9
Author: twalthr <tw...@apache.org>
Authored: Tue May 26 20:22:03 2015 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Jun 8 07:29:49 2015 +0200

----------------------------------------------------------------------
 .../flink/api/common/CodeAnalysisMode.java      |   52 +
 .../flink/api/common/ExecutionConfig.java       |   26 +
 flink-java/pom.xml                              |    4 +-
 .../flink/api/java/ExecutionEnvironment.java    |    3 +
 .../java/org/apache/flink/api/java/Utils.java   |    4 +
 .../api/java/functions/FunctionAnnotation.java  |   17 +-
 .../api/java/functions/SemanticPropUtil.java    |  133 +-
 .../api/java/operators/CoGroupOperator.java     |    6 +-
 .../flink/api/java/operators/CrossOperator.java |    2 +
 .../api/java/operators/FilterOperator.java      |    2 +
 .../api/java/operators/FlatMapOperator.java     |    4 +-
 .../java/operators/GroupCombineOperator.java    |    4 +-
 .../api/java/operators/GroupReduceOperator.java |    6 +-
 .../flink/api/java/operators/JoinOperator.java  |    8 +-
 .../flink/api/java/operators/MapOperator.java   |    2 +
 .../api/java/operators/ReduceOperator.java      |    6 +-
 .../java/operators/SingleInputUdfOperator.java  |   31 +-
 .../api/java/operators/TwoInputUdfOperator.java |   43 +-
 .../flink/api/java/operators/UdfOperator.java   |    1 -
 .../api/java/operators/UdfOperatorUtils.java    |  103 ++
 .../api/java/sca/CodeAnalyzerException.java     |   42 +
 .../flink/api/java/sca/CodeErrorException.java  |   42 +
 .../flink/api/java/sca/ModifiedASMAnalyzer.java |  169 +++
 .../flink/api/java/sca/ModifiedASMFrame.java    |   84 ++
 .../api/java/sca/NestedMethodAnalyzer.java      |  730 ++++++++++
 .../apache/flink/api/java/sca/TaggedValue.java  |  421 ++++++
 .../apache/flink/api/java/sca/UdfAnalyzer.java  |  474 ++++++
 .../flink/api/java/sca/UdfAnalyzerUtils.java    |  329 +++++
 .../SemanticPropertiesPrecedenceTest.java       |  183 +++
 .../api/java/sca/UdfAnalyzerExamplesTest.java   |  707 +++++++++
 .../flink/api/java/sca/UdfAnalyzerTest.java     | 1353 ++++++++++++++++++
 .../apache/flink/test/util/TestEnvironment.java |    4 +-
 pom.xml                                         |    4 +-
 33 files changed, 4916 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-core/src/main/java/org/apache/flink/api/common/CodeAnalysisMode.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/CodeAnalysisMode.java b/flink-core/src/main/java/org/apache/flink/api/common/CodeAnalysisMode.java
new file mode 100644
index 0000000..e9d8541
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/CodeAnalysisMode.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common;
+
+/**
+ * Specifies to what extent user-defined functions are analyzed in order
+ * to give the Flink optimizer insight into UDF internals and to inform
+ * the user about common implementation mistakes.
+ *
+ * The analyzer gives hints about:
+ *  - ForwardedFields semantic properties
+ *  - Warnings if static fields are modified by a Function
+ *  - Warnings if a FilterFunction modifies its input objects
+ *  - Warnings if a Function returns null
+ *  - Warnings if a tuple access uses a wrong index
+ *  - Information about the number of object creations (for manual optimization)
+ */
+public enum CodeAnalysisMode {
+
+	/**
+	 * Code analysis does not take place.
+	 */
+	DISABLE,
+
+	/**
+	 * Hints for improvement of the program are printed to the log.
+	 */
+	HINT,
+
+	/**
+	 * The program will be automatically optimized with knowledge from code
+	 * analysis.
+	 */
+	OPTIMIZE;
+
+}
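
A minimal sketch of selecting a mode; per the ExecutionConfig change below, DISABLE is the default:

	import org.apache.flink.api.common.CodeAnalysisMode;
	import org.apache.flink.api.java.ExecutionEnvironment;

	public class EnableAnalysisSketch {
		public static void main(String[] args) {
			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
			// log hints such as possible @ForwardedFields annotations ...
			env.getConfig().setCodeAnalysisMode(CodeAnalysisMode.HINT);
			// ... or additionally apply the inferred semantic properties:
			// env.getConfig().setCodeAnalysisMode(CodeAnalysisMode.OPTIMIZE);
		}
	}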

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 04a518e..4974295 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -44,6 +44,10 @@ import java.util.Map;
  *         handling <i>generic types</i> and <i>POJOs</i>. This is usually only needed
  *         when the functions return not only the types declared in their signature, but
  *         also subclasses of those types.</li>
+ *     <li>The {@link CodeAnalysisMode} of the program: Enable hinting/optimizing or disable
+ *         the "static code analyzer". The static code analyzer pre-interprets user-defined
+ *         functions in order to gain implementation insights for program improvements, which
+ *         can be printed to the log or applied automatically.</li>
  * </ul>
  */
 public class ExecutionConfig implements Serializable {
@@ -78,6 +82,8 @@ public class ExecutionConfig implements Serializable {
 
 	private boolean forceAvro = false;
 
+	private CodeAnalysisMode codeAnalysisMode = CodeAnalysisMode.DISABLE;
+
 	/** If set to true, progress updates are printed to System.out during execution */
 	private boolean printProgressDuringExecution = true;
 
@@ -316,6 +322,26 @@ public class ExecutionConfig implements Serializable {
 	public boolean isObjectReuseEnabled() {
 		return objectReuse;
 	}
+	
+	/**
+	 * Sets the {@link CodeAnalysisMode} of the program. Specifies to what extent user-defined
+	 * functions are analyzed in order to give the Flink optimizer insight into UDF internals
+	 * and to inform the user about common implementation mistakes. The static code analyzer
+	 * pre-interprets user-defined functions in order to gain implementation insights for
+	 * program improvements, which can be printed to the log as hints or applied automatically;
+	 * the analysis can also be disabled entirely.
+	 *
+	 * @param codeAnalysisMode see {@link CodeAnalysisMode}
+	 */
+	public void setCodeAnalysisMode(CodeAnalysisMode codeAnalysisMode) {
+		this.codeAnalysisMode = codeAnalysisMode;
+	}
+	
+	/**
+	 * Returns the {@link CodeAnalysisMode} of the program.
+	 */
+	public CodeAnalysisMode getCodeAnalysisMode() {
+		return codeAnalysisMode;
+	}
 
 	/**
 	 * Enables the printing of progress update messages to {@code System.out}

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index 6196e82..8879803 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -60,10 +60,10 @@ under the License.
 
 		<dependency>
 			<groupId>org.ow2.asm</groupId>
-			<artifactId>asm</artifactId>
+			<artifactId>asm-all</artifactId>
 			<version>${asm.version}</version>
 		</dependency>
-		
+
 		<dependency>
 			<groupId>com.twitter</groupId>
 			<artifactId>chill_${scala.binary.version}</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 3a5b04f..d50ddb4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -991,6 +991,9 @@ public abstract class ExecutionEnvironment {
 			LOG.debug("Registered Kryo default Serializers: {}", Joiner.on(',').join(config.getDefaultKryoSerializers()));
 			LOG.debug("Registered Kryo default Serializers Classes {}", Joiner.on(',').join(config.getDefaultKryoSerializerClasses()));
 			LOG.debug("Registered POJO types: {}", Joiner.on(',').join(config.getRegisteredPojoTypes()));
+
+			// print information about static code analysis
+			LOG.debug("Static code analysis mode: {}", config.getCodeAnalysisMode());
 		}
 
 		return plan;

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
index 38b24a2..dd1d6d2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
@@ -29,8 +29,10 @@ import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.util.List;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
+
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
+import static org.apache.flink.api.java.functions.FunctionAnnotation.SkipCodeAnalysis;
 
 
 public class Utils {
@@ -70,6 +72,7 @@ public class Utils {
 		}
 	}
 
+	@SkipCodeAnalysis
 	public static class CountHelper<T> extends RichFlatMapFunction<T, Long> {
 
 		private static final long serialVersionUID = 1L;
@@ -93,6 +96,7 @@ public class Utils {
 		}
 	}
 
+	@SkipCodeAnalysis
 	public static class CollectHelper<T> extends RichFlatMapFunction<T, T> {
 
 		private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
index 09678fd..bfc1bf0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
@@ -369,7 +369,22 @@ public class FunctionAnnotation {
 	public @interface ReadFieldsSecond {
 		String[] value();
 	}
-	
+
+	/**
+	 * The SkipCodeAnalysis annotation declares that a function will not be analyzed by Flink's
+	 * code analysis capabilities, independent of the configured {@link org.apache.flink.api.common.CodeAnalysisMode}.
+	 *
+	 * If this annotation is not present, the static code analyzer pre-interprets user-defined
+	 * functions in order to gain implementation insights for program improvements, which can be
+	 * printed to the log as hints or applied automatically; the analysis can also be disabled
+	 * entirely (see {@link org.apache.flink.api.common.ExecutionConfig}).
+	 */
+	@Target(ElementType.TYPE)
+	@Retention(RetentionPolicy.RUNTIME)
+	public @interface SkipCodeAnalysis {
+	}
+
 	/**
 	 * Private constructor to prevent instantiation. This class is intended only as a container.
 	 */
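
Usage mirrors the CountHelper/CollectHelper classes in the Utils diff above; the mapper below is a hypothetical example of a function that should be excluded from analysis:

	import org.apache.flink.api.common.functions.MapFunction;
	import org.apache.flink.api.java.functions.FunctionAnnotation.SkipCodeAnalysis;

	// never analyzed, regardless of the configured CodeAnalysisMode
	@SkipCodeAnalysis
	public class NoisyMapper implements MapFunction<Long, Long> {
		@Override
		public Long map(Long value) {
			return value + (long) (Math.random() * 10); // non-deterministic logic
		}
	}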

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
index 4569be3..7640e2c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
@@ -19,14 +19,6 @@
 
 package org.apache.flink.api.java.functions;
 
-import java.lang.annotation.Annotation;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
 import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.SemanticProperties.InvalidSemanticAnnotationException;
@@ -34,13 +26,13 @@ import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.common.typeutils.CompositeType.InvalidFieldReferenceException;
 import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeType.InvalidFieldReferenceException;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFields;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
-import org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFieldsFirst;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFieldsFirst;
 import org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFieldsSecond;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFields;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFieldsFirst;
@@ -48,6 +40,14 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFieldsSecond;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 
+import java.lang.annotation.Annotation;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 public class SemanticPropUtil {
 
 	private final static String REGEX_WILDCARD = "[\\"+ Keys.ExpressionKeys.SELECT_ALL_CHAR+"\\"+ Keys.ExpressionKeys.SELECT_ALL_CHAR_SCALA+"]";
@@ -235,7 +235,7 @@ public class SemanticPropUtil {
 	public static SingleInputSemanticProperties getSemanticPropsSingle(
 			Set<Annotation> set, TypeInformation<?> inType, TypeInformation<?> outType) {
 		if (set == null) {
-			return new SingleInputSemanticProperties();
+			return null;
 		}
 		Iterator<Annotation> it = set.iterator();
 
@@ -264,15 +264,14 @@ public class SemanticPropUtil {
 			SingleInputSemanticProperties result = new SingleInputSemanticProperties();
 			getSemanticPropsSingleFromString(result, forwarded, nonForwarded, read, inType, outType);
 			return result;
-		} else {
-			return new SingleInputSemanticProperties();
 		}
+		return null;
 	}
 
 	public static DualInputSemanticProperties getSemanticPropsDual(
 			Set<Annotation> set, TypeInformation<?> inType1, TypeInformation<?> inType2, TypeInformation<?> outType) {
 		if (set == null) {
-			return new DualInputSemanticProperties();
+			return null;
 		}
 		Iterator<Annotation> it = set.iterator();
 
@@ -309,15 +308,20 @@ public class SemanticPropUtil {
 			getSemanticPropsDualFromString(result, forwardedFirst, forwardedSecond,
 					nonForwardedFirst, nonForwardedSecond, readFirst, readSecond, inType1, inType2, outType);
 			return result;
-		} else {
-			return new DualInputSemanticProperties();
 		}
+		return null;
+	}
+
+	public static void getSemanticPropsSingleFromString(SingleInputSemanticProperties result,
+														String[] forwarded, String[] nonForwarded, String[] readSet,
+														TypeInformation<?> inType, TypeInformation<?> outType) {
+		getSemanticPropsSingleFromString(result, forwarded, nonForwarded, readSet, inType, outType, false);
 	}
 
 	public static void getSemanticPropsSingleFromString(SingleInputSemanticProperties result,
 			String[] forwarded, String[] nonForwarded, String[] readSet,
-			TypeInformation<?> inType, TypeInformation<?> outType)
-	{
+			TypeInformation<?> inType, TypeInformation<?> outType,
+			boolean skipIncompatibleTypes) {
 
 		boolean hasForwardedAnnotation = false;
 		boolean hasNonForwardedAnnotation = false;
@@ -334,9 +338,9 @@ public class SemanticPropUtil {
 			throw new InvalidSemanticAnnotationException("Either ForwardedFields OR " +
 					"NonForwardedFields annotation permitted, NOT both.");
 		} else if(hasForwardedAnnotation) {
-			parseForwardedFields(result, forwarded, inType, outType, 0);
+			parseForwardedFields(result, forwarded, inType, outType, 0, skipIncompatibleTypes);
 		} else if(hasNonForwardedAnnotation) {
-			parseNonForwardedFields(result, nonForwarded, inType, outType, 0);
+			parseNonForwardedFields(result, nonForwarded, inType, outType, 0, skipIncompatibleTypes);
 		}
 		parseReadFields(result, readSet, inType, 0);
 	}
@@ -345,8 +349,18 @@ public class SemanticPropUtil {
 			String[] forwardedFirst, String[] forwardedSecond,
 			String[] nonForwardedFirst, String[] nonForwardedSecond, String[]
 			readFieldsFirst, String[] readFieldsSecond,
-			TypeInformation<?> inType1, TypeInformation<?> inType2, TypeInformation<?> outType)
-	{
+			TypeInformation<?> inType1, TypeInformation<?> inType2, TypeInformation<?> outType) {
+		getSemanticPropsDualFromString(result, forwardedFirst, forwardedSecond, nonForwardedFirst,
+				nonForwardedSecond, readFieldsFirst, readFieldsSecond, inType1, inType2,  outType,
+				false);
+	}
+
+	public static void getSemanticPropsDualFromString(DualInputSemanticProperties result,
+			String[] forwardedFirst, String[] forwardedSecond,
+			String[] nonForwardedFirst, String[] nonForwardedSecond, String[]
+			readFieldsFirst, String[] readFieldsSecond,
+			TypeInformation<?> inType1, TypeInformation<?> inType2, TypeInformation<?> outType,
+			boolean skipIncompatibleTypes) {
 
 		boolean hasForwardedFirstAnnotation = false;
 		boolean hasForwardedSecondAnnotation = false;
@@ -377,15 +391,15 @@ public class SemanticPropUtil {
 		}
 
 		if(hasForwardedFirstAnnotation) {
-			parseForwardedFields(result, forwardedFirst, inType1, outType, 0);
+			parseForwardedFields(result, forwardedFirst, inType1, outType, 0, skipIncompatibleTypes);
 		} else if(hasNonForwardedFirstAnnotation) {
-			parseNonForwardedFields(result, nonForwardedFirst, inType1, outType, 0);
+			parseNonForwardedFields(result, nonForwardedFirst, inType1, outType, 0, skipIncompatibleTypes);
 		}
 
 		if(hasForwardedSecondAnnotation) {
-			parseForwardedFields(result, forwardedSecond, inType2, outType, 1);
+			parseForwardedFields(result, forwardedSecond, inType2, outType, 1, skipIncompatibleTypes);
 		} else if(hasNonForwardedSecondAnnotation) {
-			parseNonForwardedFields(result, nonForwardedSecond, inType2, outType, 1);
+			parseNonForwardedFields(result, nonForwardedSecond, inType2, outType, 1, skipIncompatibleTypes);
 		}
 
 		parseReadFields(result, readFieldsFirst, inType1, 0);
@@ -393,7 +407,8 @@ public class SemanticPropUtil {
 	}
 
 
-	private static void parseForwardedFields(SemanticProperties sp, String[] forwardedStr, TypeInformation<?> inType, TypeInformation<?> outType, int input) {
+	private static void parseForwardedFields(SemanticProperties sp, String[] forwardedStr,
+			TypeInformation<?> inType, TypeInformation<?> outType, int input, boolean skipIncompatibleTypes) {
 
 		if (forwardedStr == null) {
 			return;
@@ -412,8 +427,13 @@ public class SemanticPropUtil {
 			if (wcMatcher.matches()) {
 
 				if (!inType.equals(outType)) {
-					throw new InvalidSemanticAnnotationException("Forwarded field annotation \"" + s +
-							"\" with wildcard only allowed for identical input and output types.");
+					if (skipIncompatibleTypes) {
+						continue;
+					}
+					else {
+						throw new InvalidSemanticAnnotationException("Forwarded field annotation \"" + s +
+								"\" with wildcard only allowed for identical input and output types.");
+					}
 				}
 
 				for (int i = 0; i < inType.getTotalFields(); i++) {
@@ -440,8 +460,13 @@ public class SemanticPropUtil {
 
 				try {
 					// check type compatibility
-					if (!areFieldsCompatible(sourceStr, inType, targetStr, outType)) {
-						throw new InvalidSemanticAnnotationException("Referenced fields of forwarded field annotation \"" + s + "\" do not match.");
+					if (!areFieldsCompatible(sourceStr, inType, targetStr, outType, !skipIncompatibleTypes)) {
+						if (skipIncompatibleTypes) {
+							continue;
+						}
+						else {
+							throw new InvalidSemanticAnnotationException("Referenced fields of forwarded field annotation \"" + s + "\" do not match.");
+						}
 					}
 					List<FlatFieldDescriptor> inFFDs = getFlatFields(sourceStr, inType);
 					List<FlatFieldDescriptor> outFFDs = getFlatFields(targetStr, outType);
@@ -478,8 +503,13 @@ public class SemanticPropUtil {
 					String fieldStr = fieldMatcher.group();
 					try {
 						// check if field is compatible in input and output type
-						if (!areFieldsCompatible(fieldStr, inType, fieldStr, outType)) {
-							throw new InvalidSemanticAnnotationException("Referenced fields of forwarded field annotation \"" + s + "\" do not match.");
+						if (!areFieldsCompatible(fieldStr, inType, fieldStr, outType, !skipIncompatibleTypes)) {
+							if (skipIncompatibleTypes) {
+								continue;
+							}
+							else {
+								throw new InvalidSemanticAnnotationException("Referenced fields of forwarded field annotation \"" + s + "\" do not match.");
+							}
 						}
 						// add flat field positions
 						List<FlatFieldDescriptor> inFFDs = getFlatFields(fieldStr, inType);
@@ -503,8 +533,8 @@ public class SemanticPropUtil {
 		}
 	}
 
-	private static void parseNonForwardedFields(
-			SemanticProperties sp, String[] nonForwardedStr, TypeInformation<?> inType, TypeInformation<?> outType, int input) {
+	private static void parseNonForwardedFields(SemanticProperties sp, String[] nonForwardedStr,
+			TypeInformation<?> inType, TypeInformation<?> outType, int input, boolean skipIncompatibleTypes) {
 
 		if(nonForwardedStr == null) {
 			return;
@@ -521,7 +551,12 @@ public class SemanticPropUtil {
 			}
 
 			if(!inType.equals(outType)) {
-				throw new InvalidSemanticAnnotationException("Non-forwarded fields annotation only allowed for identical input and output types.");
+				if (skipIncompatibleTypes) {
+					continue;
+				}
+				else {
+					throw new InvalidSemanticAnnotationException("Non-forwarded fields annotation only allowed for identical input and output types.");
+				}
 			}
 
 			Matcher matcher = PATTERN_LIST.matcher(s);
@@ -613,14 +648,24 @@ public class SemanticPropUtil {
 
 	////////////////////// UTIL METHODS ///////////////////////////////
 
-	private static boolean areFieldsCompatible(String sourceField, TypeInformation<?> inType, String targetField, TypeInformation<?> outType) {
-
-		// get source type information
-		TypeInformation<?> sourceType = getExpressionTypeInformation(sourceField, inType);
-		// get target type information
-		TypeInformation<?> targetType = getExpressionTypeInformation(targetField, outType);
+	private static boolean areFieldsCompatible(String sourceField, TypeInformation<?> inType, String targetField,
+			TypeInformation<?> outType, boolean throwException) {
 
-		return (sourceType.equals(targetType));
+		try {
+			// get source type information
+			TypeInformation<?> sourceType = getExpressionTypeInformation(sourceField, inType);
+			// get target type information
+			TypeInformation<?> targetType = getExpressionTypeInformation(targetField, outType);
+			return sourceType.equals(targetType);
+		}
+		catch (InvalidFieldReferenceException e) {
+			if (throwException) {
+				throw e;
+			}
+			else {
+				return false;
+			}
+		}
 	}
 
 	private static TypeInformation<?> getExpressionTypeInformation(String fieldStr, TypeInformation<?> typeInfo) {

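For context, a minimal sketch of the annotations this parser consumes; the UDF below is hypothetical and not part of this commit. User-declared annotations keep failing fast on incompatible types (skipIncompatibleTypes == false), whereas candidates generated by the static code analyzer are silently dropped (skipIncompatibleTypes == true):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
    import org.apache.flink.api.java.tuple.Tuple2;

    // "f1" is forwarded unchanged to the output, "f0" is modified by the UDF.
    // Source and target field types are identical, so strict parsing accepts it.
    @ForwardedFields("f1")
    public class IncrementMapper implements MapFunction<Tuple2<Long, String>, Tuple2<Long, String>> {
        @Override
        public Tuple2<Long, String> map(Tuple2<Long, String> value) {
            return new Tuple2<Long, String>(value.f0 + 1L, value.f1);
        }
    }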
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index 115a238..36378b9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -124,6 +124,8 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 
 		this.keys1 = keys1;
 		this.keys2 = keys2;
+
+		UdfOperatorUtils.analyzeDualInputUdf(this, CoGroupFunction.class, defaultName, function, keys1, keys2);
 	}
 	
 	@Override
@@ -144,9 +146,9 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 			int numFields1 = this.getInput1Type().getTotalFields();
 			int numFields2 = this.getInput2Type().getTotalFields();
 			int offset1 = (this.keys1 instanceof Keys.SelectorFunctionKeys) ?
-					((Keys.SelectorFunctionKeys) this.keys1).getKeyType().getTotalFields() : 0;
+					((Keys.SelectorFunctionKeys<?,?>) this.keys1).getKeyType().getTotalFields() : 0;
 			int offset2 = (this.keys2 instanceof Keys.SelectorFunctionKeys) ?
-					((Keys.SelectorFunctionKeys) this.keys2).getKeyType().getTotalFields() : 0;
+					((Keys.SelectorFunctionKeys<?,?>) this.keys2).getKeyType().getTotalFields() : 0;
 
 			props = SemanticPropUtil.addSourceFieldOffsets(props, numFields1, numFields2, offset1, offset2);
 		}

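The raw-type casts fixed above sit right next to the reason for addSourceFieldOffsets(): with a key selector, the key fields are prepended to the operator's physical input, so every source field index taken from the UDF must be shifted by the key arity. A hypothetical selector illustrating an offset of 1:

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;

    // The key type Long has one flat field, so getKeyType().getTotalFields()
    // returns 1 and all UDF source field indices are offset by one position.
    public class FirstFieldKey implements KeySelector<Tuple2<Long, String>, Long> {
        @Override
        public Long getKey(Tuple2<Long, String> value) {
            return value.f0;
        }
    }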
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
index 5ed3e40..ae990ce 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
@@ -67,6 +67,8 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 		this.function = function;
 		this.defaultName = defaultName;
 		this.hint = hint;
+
+		UdfOperatorUtils.analyzeDualInputUdf(this, CrossFunction.class, defaultName, function, null, null);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
index f55de1c..70bfa93 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
@@ -41,6 +41,8 @@ public class FilterOperator<T> extends SingleInputUdfOperator<T, T, FilterOperat
 		
 		this.function = function;
 		this.defaultName = defaultName;
+
+		UdfOperatorUtils.analyzeSingleInputUdf(this, FilterFunction.class, defaultName, function, null);
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
index 8caacae..10bb286 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
@@ -43,13 +43,15 @@ public class FlatMapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, Fl
 		
 		this.function = function;
 		this.defaultName = defaultName;
+
+		UdfOperatorUtils.analyzeSingleInputUdf(this, FlatMapFunction.class, defaultName, function, null);
 	}
 	
 	@Override
 	protected FlatMapFunction<IN, OUT> getFunction() {
 		return function;
 	}
-	
+
 	@Override
 	protected FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN,OUT>> translateToDataFlow(Operator<IN> input) {
 		String name = getName() != null ? getName() : "FlatMap at "+defaultName;

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
index dc26fec..30cb0be 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
@@ -98,9 +98,9 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 				this.grouper != null &&
 				this.grouper.keys instanceof Keys.SelectorFunctionKeys) {
 
-			int offset = ((Keys.SelectorFunctionKeys) this.grouper.keys).getKeyType().getTotalFields();
+			int offset = ((Keys.SelectorFunctionKeys<?,?>) this.grouper.keys).getKeyType().getTotalFields();
 			if(this.grouper instanceof SortedGrouping) {
-				offset += ((SortedGrouping) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
+				offset += ((SortedGrouping<?>) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
 			}
 
 			props = SemanticPropUtil.addSourceFieldOffset(props, this.getInputType().getTotalFields(), offset);

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index bc4413f..fcbb888 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -87,6 +87,8 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 		this.defaultName = defaultName;
 
 		checkCombinability();
+
+		UdfOperatorUtils.analyzeSingleInputUdf(this, GroupReduceFunction.class, defaultName, function, grouper.keys);
 	}
 
 	private void checkCombinability() {
@@ -132,9 +134,9 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 				this.grouper != null &&
 				this.grouper.keys instanceof Keys.SelectorFunctionKeys) {
 
-			int offset = ((Keys.SelectorFunctionKeys) this.grouper.keys).getKeyType().getTotalFields();
+			int offset = ((Keys.SelectorFunctionKeys<?,?>) this.grouper.keys).getKeyType().getTotalFields();
 			if(this.grouper instanceof SortedGrouping) {
-				offset += ((SortedGrouping) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
+				offset += ((SortedGrouping<?>) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
 			}
 			props = SemanticPropUtil.addSourceFieldOffset(props, this.getInputType().getTotalFields(), offset);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index 4adf6b3..1e5baab 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -202,6 +202,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			
 			this.function = function;
 			this.joinLocationName = joinLocationName;
+
+			UdfOperatorUtils.analyzeDualInputUdf(this, FlatJoinFunction.class, joinLocationName, function, keys1, keys2);
 		}
 
 		public EquiJoin(DataSet<I1> input1, DataSet<I2> input2,
@@ -217,6 +219,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			}
 
 			this.function = generatedFunction;
+
+			UdfOperatorUtils.analyzeDualInputUdf(this, JoinFunction.class, joinLocationName, function, keys1, keys2);
 		}
 		
 		@Override
@@ -237,9 +241,9 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 				int numFields1 = this.getInput1Type().getTotalFields();
 				int numFields2 = this.getInput2Type().getTotalFields();
 				int offset1 = (this.keys1 instanceof Keys.SelectorFunctionKeys) ?
-						((Keys.SelectorFunctionKeys) this.keys1).getKeyType().getTotalFields() : 0;
+						((Keys.SelectorFunctionKeys<?,?>) this.keys1).getKeyType().getTotalFields() : 0;
 				int offset2 = (this.keys2 instanceof Keys.SelectorFunctionKeys) ?
-						((Keys.SelectorFunctionKeys) this.keys2).getKeyType().getTotalFields() : 0;
+						((Keys.SelectorFunctionKeys<?,?>) this.keys2).getKeyType().getTotalFields() : 0;
 
 				props = SemanticPropUtil.addSourceFieldOffsets(props, numFields1, numFields2, offset1, offset2);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
index 2663a2a..eaaeb38 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
@@ -45,6 +45,8 @@ public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOpe
 		
 		this.defaultName = defaultName;
 		this.function = function;
+
+		UdfOperatorUtils.analyzeSingleInputUdf(this, MapFunction.class, defaultName, function, null);
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
index e770278..1193da5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
@@ -72,6 +72,8 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 		this.function = function;
 		this.grouper = input;
 		this.defaultName = defaultName;
+
+		UdfOperatorUtils.analyzeSingleInputUdf(this, ReduceFunction.class, defaultName, function, grouper.keys);
 	}
 	
 	@Override
@@ -89,9 +91,9 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 				this.grouper != null &&
 				this.grouper.keys instanceof Keys.SelectorFunctionKeys) {
 
-			int offset = ((Keys.SelectorFunctionKeys) this.grouper.keys).getKeyType().getTotalFields();
+			int offset = ((Keys.SelectorFunctionKeys<?,?>) this.grouper.keys).getKeyType().getTotalFields();
 			if(this.grouper instanceof SortedGrouping) {
-				offset += ((SortedGrouping) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
+				offset += ((SortedGrouping<?>) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
 			}
 			props = SemanticPropUtil.addSourceFieldOffset(props, this.getInputType().getTotalFields(), offset);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
index f55489f..9301e1a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
@@ -54,8 +54,11 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
 
 	private Map<String, DataSet<?>> broadcastVariables;
 
+	// NOTE: only set this variable via setSemanticProperties()
 	private SingleInputSemanticProperties udfSemantics;
 
+	private boolean analyzedUdfSemantics;
+
 	// --------------------------------------------------------------------------------------------
 
 	/**
@@ -157,11 +160,12 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
 
 		if(this.udfSemantics == null) {
 			// extract semantic properties from function annotations
-			this.udfSemantics = extractSemanticAnnotations(getFunction().getClass());
+			setSemanticProperties(extractSemanticAnnotations(getFunction().getClass()));
 		}
 
-		if(this.udfSemantics == null) {
-			this.udfSemantics = new SingleInputSemanticProperties();
+		if(this.udfSemantics == null
+				|| this.analyzedUdfSemantics) { // discard analyzed semantic properties
+			setSemanticProperties(new SingleInputSemanticProperties());
 			SemanticPropUtil.getSemanticPropsSingleFromString(this.udfSemantics, forwardedFields, null, null, this.getInputType(), this.getResultType());
 		} else {
 			if(udfWithForwardedFieldsAnnotation(getFunction().getClass())) {
@@ -311,11 +315,15 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
 
 	@Override
 	public SingleInputSemanticProperties getSemanticProperties() {
-		if (this.udfSemantics == null) {
+		if (this.udfSemantics == null || analyzedUdfSemantics) {
 			SingleInputSemanticProperties props = extractSemanticAnnotations(getFunction().getClass());
-			this.udfSemantics = props != null ? props : new SingleInputSemanticProperties();
+			if (props != null) {
+				setSemanticProperties(props);
+			}
+		}
+		if (this.udfSemantics == null) {
+			setSemanticProperties(new SingleInputSemanticProperties());
 		}
-		
 		return this.udfSemantics;
 	}
 
@@ -329,8 +337,17 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
 	 */
 	public void setSemanticProperties(SingleInputSemanticProperties properties) {
 		this.udfSemantics = properties;
+		this.analyzedUdfSemantics = false;
 	}
-	
+
+	protected boolean getAnalyzedUdfSemanticsFlag() {
+		return this.analyzedUdfSemantics;
+	}
+
+	protected void setAnalyzedUdfSemanticsFlag() {
+		this.analyzedUdfSemantics = true;
+	}
+
 	protected SingleInputSemanticProperties extractSemanticAnnotations(Class<?> udfClass) {
 		Set<Annotation> annotations = FunctionAnnotation.readSingleForwardAnnotations(udfClass);
 		return SemanticPropUtil.getSemanticPropsSingle(annotations, getInputType(), getResultType());

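The new flag makes analyzer results strictly weaker than user input: every call to setSemanticProperties(), including the ones issued by withForwardedFields(), resets analyzedUdfSemantics, and getSemanticProperties() re-extracts annotations whenever only analyzed properties are present. A hypothetical call chain (input is an assumed DataSet, IncrementMapper the mapper sketched earlier):

    // The analyzer may have inferred semantic properties for the map UDF in
    // OPTIMIZE mode, but the explicit hint below replaces them and clears the flag.
    DataSet<Tuple2<Long, String>> result = input
        .map(new IncrementMapper())
        .withForwardedFields("f1");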
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
index 91f9f7e..d23dd56 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
@@ -29,12 +29,12 @@ import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.java.DataSet;
 
 /**
  * The <tt>TwoInputUdfOperator</tt> is the base class of all binary operators that execute
@@ -56,8 +56,11 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
 
 	private Map<String, DataSet<?>> broadcastVariables;
 
+	// NOTE: only set this variable via setSemanticProperties()
 	private DualInputSemanticProperties udfSemantics;
 
+	private boolean analyzedUdfSemantics;
+
 	// --------------------------------------------------------------------------------------------
 
 	/**
@@ -157,13 +160,13 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
 	 */
 	@SuppressWarnings("unchecked")
 	public O withForwardedFieldsFirst(String... forwardedFieldsFirst) {
-		if (this.udfSemantics == null) {
+		if (this.udfSemantics == null || this.analyzedUdfSemantics) {
 			// extract semantic properties from function annotations
-			this.udfSemantics = extractSemanticAnnotationsFromUdf(getFunction().getClass());
+			setSemanticProperties(extractSemanticAnnotationsFromUdf(getFunction().getClass()));
 		}
 
-		if(this.udfSemantics == null) {
-			this.udfSemantics = new DualInputSemanticProperties();
+		if(this.udfSemantics == null || this.analyzedUdfSemantics) {
+			setSemanticProperties(new DualInputSemanticProperties());
 			SemanticPropUtil.getSemanticPropsDualFromString(this.udfSemantics, forwardedFieldsFirst, null,
 					null, null, null, null, getInput1Type(), getInput2Type(), getResultType());
 		} else {
@@ -232,13 +235,13 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
 	 */
 	@SuppressWarnings("unchecked")
 	public O withForwardedFieldsSecond(String... forwardedFieldsSecond) {
-		if (this.udfSemantics == null) {
+		if (this.udfSemantics == null || this.analyzedUdfSemantics) {
 			// extract semantic properties from function annotations
-			this.udfSemantics = extractSemanticAnnotationsFromUdf(getFunction().getClass());
+			setSemanticProperties(extractSemanticAnnotationsFromUdf(getFunction().getClass()));
 		}
 
-		if(this.udfSemantics == null) {
-			this.udfSemantics = new DualInputSemanticProperties();
+		if(this.udfSemantics == null || this.analyzedUdfSemantics) {
+			setSemanticProperties(new DualInputSemanticProperties());
 			SemanticPropUtil.getSemanticPropsDualFromString(this.udfSemantics, null, forwardedFieldsSecond,
 					null, null, null, null, getInput1Type(), getInput2Type(), getResultType());
 		} else {
@@ -390,11 +393,15 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
 
 	@Override
 	public DualInputSemanticProperties getSemanticProperties() {
-		if (this.udfSemantics == null) {
+		if (this.udfSemantics == null || analyzedUdfSemantics) {
 			DualInputSemanticProperties props = extractSemanticAnnotationsFromUdf(getFunction().getClass());
-			this.udfSemantics = props != null ? props : new DualInputSemanticProperties();
+			if (props != null) {
+				setSemanticProperties(props);
+			}
+		}
+		if (this.udfSemantics == null) {
+			setSemanticProperties(new DualInputSemanticProperties());
 		}
-		
 		return this.udfSemantics;
 	}
 
@@ -408,9 +415,17 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
 	 */
 	public void setSemanticProperties(DualInputSemanticProperties properties) {
 		this.udfSemantics = properties;
+		this.analyzedUdfSemantics = false;
 	}
-	
-	
+
+	protected boolean getAnalyzedUdfSemanticsFlag() {
+		return this.analyzedUdfSemantics;
+	}
+
+	protected void setAnalyzedUdfSemanticsFlag() {
+		this.analyzedUdfSemantics = true;
+	}
+
 	protected DualInputSemanticProperties extractSemanticAnnotationsFromUdf(Class<?> udfClass) {
 		Set<Annotation> annotations = FunctionAnnotation.readDualForwardAnnotations(udfClass);
 		return SemanticPropUtil.getSemanticPropsDual(annotations, getInput1Type(), getInput2Type(), getResultType());

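On binary operators the same override rule applies per input through withForwardedFieldsFirst() and withForwardedFieldsSecond(), both shown above. A hypothetical join (ds1 and ds2 are assumed DataSet<Tuple2<Long, String>> inputs):

    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class CombineJoin implements JoinFunction<Tuple2<Long, String>, Tuple2<Long, String>, Tuple2<Long, String>> {
        @Override
        public Tuple2<Long, String> join(Tuple2<Long, String> first, Tuple2<Long, String> second) {
            // first.f0 becomes out.f0, second.f1 stays at position f1
            return new Tuple2<Long, String>(first.f0, second.f1);
        }
    }

    // Declaring the forwards explicitly discards whatever the analyzer found:
    DataSet<Tuple2<Long, String>> joined = ds1.join(ds2)
        .where(0).equalTo(0)
        .with(new CombineJoin())
        .withForwardedFieldsFirst("f0")
        .withForwardedFieldsSecond("f1");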
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
index 026cc61..924c84f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
@@ -57,7 +57,6 @@ public interface UdfOperator<O extends UdfOperator<O>> {
 	
 	/**
 	 * Gets the semantic properties that have been set for the user-defined functions (UDF).
-	 * This method may return null, if no semantic properties have been set so far.
 	 * 
 	 * @return The semantic properties of the UDF.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java
new file mode 100644
index 0000000..52a0d08
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators;
+
+import org.apache.flink.api.common.CodeAnalysisMode;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.sca.CodeAnalyzerException;
+import org.apache.flink.api.java.sca.UdfAnalyzer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
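+/** Utility class that wires the static code analysis into the operator constructors. */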
+public abstract class UdfOperatorUtils {
+
+	private static final Logger LOG = LoggerFactory.getLogger(UdfOperatorUtils.class);
+
+	public static void analyzeSingleInputUdf(SingleInputUdfOperator<?, ?, ?> operator, Class<?> udfBaseClass,
+			String defaultName, Function udf, Keys<?> key) {
+		final CodeAnalysisMode mode = operator.getExecutionEnvironment().getConfig().getCodeAnalysisMode();
+		if (mode != CodeAnalysisMode.DISABLE
+				&& !udf.getClass().isAnnotationPresent(FunctionAnnotation.SkipCodeAnalysis.class)) {
+			final String operatorName = operator.getName() != null ? operator.getName()
+					: udfBaseClass.getSimpleName() + " at " + defaultName;
+			try {
+				final UdfAnalyzer analyzer = new UdfAnalyzer(udfBaseClass, udf.getClass(), operatorName, operator.getInputType(), null,
+						operator.getResultType(), key, null, mode == CodeAnalysisMode.OPTIMIZE);
+				final boolean success = analyzer.analyze();
+				if (success) {
+					if (mode == CodeAnalysisMode.OPTIMIZE
+							&& !operator.udfWithForwardedFieldsAnnotation(udf.getClass())) {
+						analyzer.addSemanticPropertiesHints();
+						operator.setSemanticProperties((SingleInputSemanticProperties) analyzer.getSemanticProperties());
+						operator.setAnalyzedUdfSemanticsFlag();
+					}
+					else if (mode == CodeAnalysisMode.HINT) {
+						analyzer.addSemanticPropertiesHints();
+					}
+					analyzer.printToLogger(LOG);
+				}
+			}
+			catch (InvalidTypesException e) {
+				LOG.debug("Unable to do code analysis due to missing type information.", e);
+			}
+			catch (CodeAnalyzerException e) {
+				LOG.debug("Code analysis failed.", e);
+			}
+		}
+	}
+
+	public static void analyzeDualInputUdf(TwoInputUdfOperator<?, ?, ?, ?> operator, Class<?> udfBaseClass,
+			String defaultName, Function udf, Keys<?> key1, Keys<?> key2) {
+		final CodeAnalysisMode mode = operator.getExecutionEnvironment().getConfig().getCodeAnalysisMode();
+		if (mode != CodeAnalysisMode.DISABLE
+				&& !udf.getClass().isAnnotationPresent(FunctionAnnotation.SkipCodeAnalysis.class)) {
+			final String operatorName = operator.getName() != null ? operator.getName()
+					: udfBaseClass.getSimpleName() + " at " + defaultName;
+			try {
+				final UdfAnalyzer analyzer = new UdfAnalyzer(udfBaseClass, udf.getClass(), operatorName, operator.getInput1Type(),
+						operator.getInput2Type(), operator.getResultType(), key1, key2,
+						mode == CodeAnalysisMode.OPTIMIZE);
+				final boolean success = analyzer.analyze();
+				if (success) {
+					if (mode == CodeAnalysisMode.OPTIMIZE
+							&& !(operator.udfWithForwardedFieldsFirstAnnotation(udf.getClass())
+							|| operator.udfWithForwardedFieldsSecondAnnotation(udf.getClass()))) {
+						analyzer.addSemanticPropertiesHints();
+						operator.setSemanticProperties((DualInputSemanticProperties) analyzer.getSemanticProperties());
+						operator.setAnalyzedUdfSemanticsFlag();
+					}
+					else if (mode == CodeAnalysisMode.HINT) {
+						analyzer.addSemanticPropertiesHints();
+					}
+					analyzer.printToLogger(LOG);
+				}
+			}
+			catch (InvalidTypesException e) {
+				LOG.debug("Unable to do code analysis due to missing type information.", e);
+			}
+			catch (CodeAnalyzerException e) {
+				LOG.debug("Code analysis failed.", e);
+			}
+		}
+	}
+}

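Both methods are gated on the ExecutionConfig's CodeAnalysisMode and on the @SkipCodeAnalysis marker annotation. A hypothetical way to switch the analysis on from user code; note that the setter name is assumed here, only getCodeAnalysisMode() appears in this diff:

    import org.apache.flink.api.common.CodeAnalysisMode;
    import org.apache.flink.api.java.ExecutionEnvironment;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // HINT only logs the findings; OPTIMIZE additionally registers the derived
    // semantic properties, unless the UDF already carries explicit annotations.
    env.getConfig().setCodeAnalysisMode(CodeAnalysisMode.HINT);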
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeAnalyzerException.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeAnalyzerException.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeAnalyzerException.java
new file mode 100644
index 0000000..e42ae67
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeAnalyzerException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.sca;
+
+/**
+ * Exception that is thrown if code analysis could not run properly.
+ */
+public class CodeAnalyzerException extends RuntimeException {
+	private static final long serialVersionUID = 1L;
+
+	public CodeAnalyzerException() {
+		super();
+	}
+
+	public CodeAnalyzerException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public CodeAnalyzerException(String message) {
+		super(message);
+	}
+
+	public CodeAnalyzerException(Throwable cause) {
+		super(cause);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeErrorException.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeErrorException.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeErrorException.java
new file mode 100644
index 0000000..9afe5d8
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/CodeErrorException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.sca;
+
+/**
+ * Exception that is thrown if code errors are found during analysis.
+ */
+public class CodeErrorException extends RuntimeException {
+	private static final long serialVersionUID = 1L;
+
+	public CodeErrorException() {
+		super();
+	}
+
+	public CodeErrorException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public CodeErrorException(String message) {
+		super(message);
+	}
+
+	public CodeErrorException(Throwable cause) {
+		super(cause);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMAnalyzer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMAnalyzer.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMAnalyzer.java
new file mode 100644
index 0000000..4c0d020
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMAnalyzer.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.sca;
+
+import org.objectweb.asm.tree.AbstractInsnNode;
+import org.objectweb.asm.tree.InsnList;
+import org.objectweb.asm.tree.JumpInsnNode;
+import org.objectweb.asm.tree.analysis.Analyzer;
+import org.objectweb.asm.tree.analysis.Frame;
+import org.objectweb.asm.tree.analysis.Interpreter;
+
+import java.lang.reflect.Field;
+
+/**
+ * Modified version of ASM's Analyzer. It defines a custom ASM Frame
+ * and allows jump modifications, which are necessary for UDFs with
+ * one iterable input, e.g. GroupReduce.
+ * (see also UdfAnalyzer's field "iteratorTrueAssumptionApplied")
+ */
+public class ModifiedASMAnalyzer extends Analyzer {
+
+	private NestedMethodAnalyzer interpreter;
+
+	public ModifiedASMAnalyzer(Interpreter interpreter) {
+		super(interpreter);
+		this.interpreter = (NestedMethodAnalyzer) interpreter;
+	}
+
+	protected Frame newFrame(int nLocals, int nStack) {
+		return new ModifiedASMFrame(nLocals, nStack);
+	}
+
+	protected Frame newFrame(Frame src) {
+		return new ModifiedASMFrame(src);
+	}
+
+	// type of jump modification
+	private int jumpModification = NO_MOD;
+	private static final int NO_MOD = -1;
+	private static final int IFEQ_MOD = 0;
+	private static final int IFNE_MOD = 1;
+	private int eventInsn;
+
+	// current state of modification
+	private int jumpModificationState = DO_NOTHING;
+	private static final int DO_NOTHING = -1;
+	private static final int PRE_STATE = 0;
+	private static final int MOD_STATE = 1;
+	private static final int WAIT_FOR_INSN_STATE = 2;
+
+	public void requestIFEQLoopModification() {
+		if (jumpModificationState != DO_NOTHING) {
+			throw new CodeAnalyzerException("Unable to do jump modifications (unsupported nested jumping).");
+		}
+		jumpModification = IFEQ_MOD;
+		jumpModificationState = PRE_STATE;
+	}
+
+	public void requestIFNELoopModification() {
+		if (jumpModificationState != DO_NOTHING) {
+			throw new CodeAnalyzerException("Unable to do jump modifications (unsupported nested jumping).");
+		}
+		jumpModification = IFNE_MOD;
+		jumpModificationState = PRE_STATE;
+	}
+
+	@Override
+	protected void newControlFlowEdge(int insn, int successor) {
+		try {
+			if (jumpModificationState == PRE_STATE) {
+				jumpModificationState = MOD_STATE;
+			}
+			else if (jumpModificationState == MOD_STATE) {
+				// this modification swaps the top two entries of the analyzer's instruction queue;
+				// it ensures that the "TRUE" path will be executed first, which is important
+				// for a later merge
+				if (jumpModification == IFEQ_MOD) {
+					final int top = accessField(Analyzer.class, "top").getInt(this);
+					final int[] queue = (int[]) accessField(Analyzer.class, "queue").get(this);
+
+					final int tmp = queue[top - 2];
+					queue[top - 2] = queue[top - 1];
+					queue[top - 1] = tmp;
+
+					eventInsn = queue[top - 2] - 1;
+					final InsnList insns = (InsnList) accessField(Analyzer.class, "insns").get(this);
+					// check if the instruction is a goto instruction
+					// if so, this is a loop structure
+					if (insns.get(eventInsn) instanceof JumpInsnNode) {
+						jumpModificationState = WAIT_FOR_INSN_STATE;
+					}
+					// no loop -> end of modification
+					else {
+						jumpModificationState = DO_NOTHING;
+					}
+				}
+				// this modification changes the merge priority of certain frames (the expression part of the IF)
+				else if (jumpModification == IFNE_MOD) {
+					final Frame[] frames = (Frame[]) accessField(Analyzer.class, "frames").get(this);
+					final Field indexField = accessField(AbstractInsnNode.class, "index");
+
+					final InsnList insns = (InsnList) accessField(Analyzer.class, "insns").get(this);
+					final AbstractInsnNode gotoInsn = insns.get(successor - 1);
+					// check for a loop
+					if (gotoInsn instanceof JumpInsnNode) {
+						jumpModificationState = WAIT_FOR_INSN_STATE;
+						// set a merge priority for all instructions of the IF expression,
+						// from the label the goto instruction points to until the evaluation with IFEQ
+						final int idx = indexField.getInt(accessField(JumpInsnNode.class, "label").get(gotoInsn));
+
+						for (int i=idx; i <= insn; i++) {
+							((ModifiedASMFrame) frames[i]).mergePriority = true;
+						}
+						eventInsn = idx - 2;
+					}
+					else {
+						jumpModificationState = DO_NOTHING;
+					}
+				}
+			}
+			// wait for the goto instruction
+			else if (jumpModificationState == WAIT_FOR_INSN_STATE && insn == eventInsn) {
+				jumpModificationState = DO_NOTHING;
+				final Frame[] frames = (Frame[]) accessField(Analyzer.class, "frames").get(this);
+				// merge the goto instruction frame with the frame that follows
+				// this ensures that local variables are kept
+				if (jumpModification == IFEQ_MOD) {
+					interpreter.rightMergePriority = true;
+					final Field top = accessField(Frame.class, "top");
+					top.setInt(frames[eventInsn], top.getInt(frames[eventInsn + 1]));
+					frames[eventInsn + 1].merge(frames[eventInsn], interpreter);
+				}
+				// finally set a merge priority for the last instruction of the loop (before the IF expression)
+				else if (jumpModification == IFNE_MOD) {
+					((ModifiedASMFrame) frames[eventInsn + 1]).mergePriority = true;
+				}
+			}
+		}
+		catch (Exception e) {
+			throw new CodeAnalyzerException("Unable to do jump modifications.", e);
+		}
+	}
+
+	private Field accessField(Class<?> clazz, String name) {
+		for (Field f : clazz.getDeclaredFields()) {
+			if (f.getName().equals(name)) {
+				f.setAccessible(true);
+				return f;
+			}
+		}
+		throw new CodeAnalyzerException("Field '" + name + "' not found in class '" + clazz.getName() + "'.");
+	}
+}

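The IFEQ/IFNE handling above targets the jump pattern javac emits for iterator loops in UDFs such as the hypothetical GroupReduce function below. The analyzer assumes that hasNext() answers true at least once, so the loop body, and with it the field accesses relevant for forwarded fields, is visited before the frames are merged:

    import org.apache.flink.api.common.functions.GroupReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    public class SumReducer implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
        @Override
        public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {
            long sum = 0;
            Long key = null;
            // compiled to a GOTO/IF jump cycle over Iterator.hasNext()
            for (Tuple2<Long, Long> value : values) {
                key = value.f0;
                sum += value.f1;
            }
            out.collect(new Tuple2<Long, Long>(key, sum));
        }
    }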
http://git-wip-us.apache.org/repos/asf/flink/blob/c854d526/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMFrame.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMFrame.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMFrame.java
new file mode 100644
index 0000000..497a15c
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMFrame.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.sca;
+
+import org.objectweb.asm.tree.AbstractInsnNode;
+import org.objectweb.asm.tree.analysis.AnalyzerException;
+import org.objectweb.asm.tree.analysis.Frame;
+import org.objectweb.asm.tree.analysis.Interpreter;
+import org.objectweb.asm.tree.analysis.Value;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+
+/**
+ * Modified version of ASM's Frame. It allows applying different merge
+ * priorities and passes frame information to the Interpreter.
+ */
+public class ModifiedASMFrame extends Frame {
+
+	public boolean mergePriority;
+
+	public ModifiedASMFrame(int nLocals, int nStack) {
+		super(nLocals, nStack);
+	}
+	public ModifiedASMFrame(Frame src) {
+		super(src);
+	}
+
+	@Override
+	public Frame init(Frame src) {
+		mergePriority = ((ModifiedASMFrame)src).mergePriority;
+		return super.init(src);
+	}
+
+	@Override
+	public void execute(AbstractInsnNode insn, Interpreter interpreter)
+			throws AnalyzerException {
+		NestedMethodAnalyzer nma = ((NestedMethodAnalyzer) interpreter);
+		nma.currentFrame = (ModifiedASMFrame) this;
+		super.execute(insn, interpreter);
+	}
+
+	@Override
+	public boolean merge(Frame frame, Interpreter interpreter) throws AnalyzerException {
+		if (((ModifiedASMFrame)frame).mergePriority) {
+			((NestedMethodAnalyzer)interpreter).rightMergePriority = true;
+		}
+		final boolean result = super.merge(frame, interpreter);
+		((NestedMethodAnalyzer)interpreter).rightMergePriority = false;
+		((ModifiedASMFrame)frame).mergePriority = false;
+		return result;
+	}
+
+	@Override
+	public String toString() {
+		// FOR DEBUGGING
+		try {
+			Class<?> frame = Frame.class;
+			Field valuesField = frame.getDeclaredField("values");
+			valuesField.setAccessible(true);
+			Value[] newValues = (Value[]) valuesField.get(this);
+			return Arrays.toString(newValues);
+		}
+		catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+}
\ No newline at end of file
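On the interpreter side, the priority flag set in merge() above amounts to a one-sided value merge. A sketch of the assumed behavior in NestedMethodAnalyzer, which is not part of this hunk; ASM's Interpreter.merge(v, w) normally combines both values into a common approximation, while the priority flag lets the incoming frame's value win:

    // Hypothetical sketch, assuming NestedMethodAnalyzer builds on ASM's BasicInterpreter.
    @Override
    public BasicValue merge(BasicValue v, BasicValue w) {
        if (rightMergePriority) {
            // prefer the value of the frame that is being merged in
            return w;
        }
        return super.merge(v, w);
    }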