Posted to commits@flink.apache.org by uc...@apache.org on 2014/11/13 11:22:45 UTC

[1/2] incubator-flink git commit: [FLINK-1221] Use the source line as the default operator name

Repository: incubator-flink
Updated Branches:
  refs/heads/master b253cb2de -> 818ebda0f


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java
new file mode 100644
index 0000000..921d9cc
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.operators;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOuputFormat;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
+import org.apache.flink.api.java.operators.translation.PlanFilterOperator;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Visitor;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Tests proper automated assignment of the transformation's name when it is not set by the user.
+ */
+public class NamesTest implements Serializable {
+	
+	@Test
+	public void testDefaultName() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		DataSet<String> strs = env.fromCollection(Arrays.asList( new String[] {"a", "b"}));
+		
+		
+		// WARNING: The test will fail if this line is moved down in the file (the line number is hard-coded)
+		strs.filter(new FilterFunction<String>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public boolean filter(String value) throws Exception {
+				return value.equals("a");
+			}
+		}).output(new DiscardingOuputFormat<String>());
+		JavaPlan plan = env.createProgramPlan();
+		testForName("Filter at org.apache.flink.api.java.operators.NamesTest.testDefaultName(NamesTest.java:54)", plan);
+	}
+	
+	@Test
+	public void testGivenName() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		DataSet<String> strs = env.fromCollection(Arrays.asList( new String[] {"a", "b"}));
+		strs.filter(new FilterFunction<String>() {
+			private static final long serialVersionUID = 1L;
+			@Override
+			public boolean filter(String value) throws Exception {
+				return value.equals("a");
+			}
+		}).name("GivenName").output(new DiscardingOuputFormat<String>());
+		JavaPlan plan = env.createProgramPlan();
+		testForName("GivenName", plan);
+	}
+	
+	@Test
+	public void testJoinWith() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		List<Tuple1<String>> strLi = new ArrayList<Tuple1<String>>();
+		strLi.add(new Tuple1<String>("a")); 
+		strLi.add(new Tuple1<String>("b"));
+		
+		DataSet<Tuple1<String>> strs = env.fromCollection(strLi);
+		DataSet<Tuple1<String>> strs1 = env.fromCollection(strLi);
+		strs.join(strs1).where(0).equalTo(0).with(new FlatJoinFunction<Tuple1<String>, Tuple1<String>, String>() {
+			@Override
+			public void join(Tuple1<String> first, Tuple1<String> second,
+					Collector<String> out) throws Exception {
+				//
+			}
+		})
+		.output(new DiscardingOuputFormat<String>());
+		JavaPlan plan = env.createProgramPlan();
+		plan.accept(new Visitor<Operator<?>>() {
+			@Override
+			public boolean preVisit(Operator<?> visitable) {
+				if(visitable instanceof JoinOperatorBase) {
+					Assert.assertEquals("Join at org.apache.flink.api.java.operators.NamesTest.testJoinWith(NamesTest.java:92)", visitable.getName());
+				}
+				return true;
+			}
+			@Override
+			public void postVisit(Operator<?> visitable) {}
+		});
+	}
+	
+	private static void testForName(final String expected, JavaPlan plan) {
+		plan.accept(new Visitor<Operator<?>>() {
+			@Override
+			public boolean preVisit(Operator<?> visitable) {
+				if(visitable instanceof PlanFilterOperator<?>) { 
+					// cast is actually not required. It's just a check for the right element
+					PlanFilterOperator<?> filterOp = (PlanFilterOperator<?>) visitable;
+					Assert.assertEquals(expected, filterOp.getName());
+				}
+				return true;
+			}
+			
+			@Override
+			public void postVisit(Operator<?> visitable) {
+				//
+			}
+		});
+	}
+	
+	/*public static void main(String[] args) throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		DataSource<String> strs = env.fromParallelCollection(null, String.class);
+		strs.output(new DiscardingOuputFormat<String>());
+		JavaPlan plan = env.createProgramPlan();
+		plan.accept(new Visitor<Operator<?>>() {
+			@Override
+			public boolean preVisit(Operator<?> visitable) {
+				System.err.println("vis = "+visitable);
+				return true;
+			}
+			@Override
+			public void postVisit(Operator<?> visitable) {}
+		});
+	} */
+}
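
The test above also documents the naming convention this commit introduces: the operator kind followed by the stack frame of the call site, e.g. "Filter at org.apache.flink.api.java.operators.NamesTest.testDefaultName(NamesTest.java:54)". Because the expected string contains a hard-coded line number, the test is sensitive to any edit that shifts the filter call, which is exactly what the WARNING comment guards against.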

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 4463eb9..ca8e469 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -243,7 +243,10 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
     if (mapper == null) {
       throw new NullPointerException("Map function must not be null.")
     }
-    wrap(new MapOperator[T, R](javaSet, implicitly[TypeInformation[R]], mapper))
+    wrap(new MapOperator[T, R](javaSet,
+      implicitly[TypeInformation[R]],
+      mapper,
+      getCallLocationName()))
   }
 
   /**
@@ -256,7 +259,10 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
     val mapper = new MapFunction[T, R] {
       def map(in: T): R = fun(in)
     }
-    wrap(new MapOperator[T, R](javaSet, implicitly[TypeInformation[R]], mapper))
+    wrap(new MapOperator[T, R](javaSet,
+      implicitly[TypeInformation[R]],
+      mapper,
+      getCallLocationName()))
   }
 
   /**
@@ -272,7 +278,10 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
     if (partitionMapper == null) {
       throw new NullPointerException("MapPartition function must not be null.")
     }
-    wrap(new MapPartitionOperator[T, R](javaSet, implicitly[TypeInformation[R]], partitionMapper))
+    wrap(new MapPartitionOperator[T, R](javaSet,
+      implicitly[TypeInformation[R]],
+      partitionMapper,
+      getCallLocationName()))
   }
 
   /**
@@ -293,7 +302,10 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
         fun(in.iterator().asScala, out)
       }
     }
-    wrap(new MapPartitionOperator[T, R](javaSet, implicitly[TypeInformation[R]], partitionMapper))
+    wrap(new MapPartitionOperator[T, R](javaSet,
+      implicitly[TypeInformation[R]],
+      partitionMapper,
+      getCallLocationName()))
   }
 
   /**
@@ -314,7 +326,10 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
         fun(in.iterator().asScala) foreach out.collect
       }
     }
-    wrap(new MapPartitionOperator[T, R](javaSet, implicitly[TypeInformation[R]], partitionMapper))
+    wrap(new MapPartitionOperator[T, R](javaSet,
+      implicitly[TypeInformation[R]],
+      partitionMapper,
+      getCallLocationName()))
   }
 
   /**
@@ -325,7 +340,10 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
     if (flatMapper == null) {
       throw new NullPointerException("FlatMap function must not be null.")
     }
-    wrap(new FlatMapOperator[T, R](javaSet, implicitly[TypeInformation[R]], flatMapper))
+    wrap(new FlatMapOperator[T, R](javaSet,
+      implicitly[TypeInformation[R]],
+      flatMapper,
+      getCallLocationName()))
   }
 
   /**
@@ -339,7 +357,10 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
     val flatMapper = new FlatMapFunction[T, R] {
       def flatMap(in: T, out: Collector[R]) { fun(in, out) }
     }
-    wrap(new FlatMapOperator[T, R](javaSet, implicitly[TypeInformation[R]], flatMapper))
+    wrap(new FlatMapOperator[T, R](javaSet,
+      implicitly[TypeInformation[R]],
+      flatMapper,
+      getCallLocationName()))
   }
 
   /**
@@ -353,7 +374,10 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
     val flatMapper = new FlatMapFunction[T, R] {
       def flatMap(in: T, out: Collector[R]) { fun(in) foreach out.collect }
     }
-    wrap(new FlatMapOperator[T, R](javaSet, implicitly[TypeInformation[R]], flatMapper))
+    wrap(new FlatMapOperator[T, R](javaSet,
+      implicitly[TypeInformation[R]],
+      flatMapper,
+      getCallLocationName()))
   }
 
   /**
@@ -363,7 +387,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
     if (filter == null) {
       throw new NullPointerException("Filter function must not be null.")
     }
-    wrap(new FilterOperator[T](javaSet, filter))
+    wrap(new FilterOperator[T](javaSet, filter, getCallLocationName()))
   }
 
   /**
@@ -376,7 +400,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
     val filter = new FilterFunction[T] {
       def filter(in: T) = fun(in)
     }
-    wrap(new FilterOperator[T](javaSet, filter))
+    wrap(new FilterOperator[T](javaSet, filter, getCallLocationName()))
   }
 
   // --------------------------------------------------------------------------------------------
@@ -457,7 +481,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
     if (reducer == null) {
       throw new NullPointerException("Reduce function must not be null.")
     }
-    wrap(new ReduceOperator[T](javaSet, reducer))
+    wrap(new ReduceOperator[T](javaSet, reducer, getCallLocationName()))
   }
 
   /**
@@ -471,7 +495,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
     val reducer = new ReduceFunction[T] {
       def reduce(v1: T, v2: T) = { fun(v1, v2) }
     }
-    wrap(new ReduceOperator[T](javaSet, reducer))
+    wrap(new ReduceOperator[T](javaSet, reducer, getCallLocationName()))
   }
 
   /**
@@ -483,7 +507,10 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
     if (reducer == null) {
       throw new NullPointerException("GroupReduce function must not be null.")
     }
-    wrap(new GroupReduceOperator[T, R](javaSet, implicitly[TypeInformation[R]], reducer))
+    wrap(new GroupReduceOperator[T, R](javaSet,
+      implicitly[TypeInformation[R]],
+      reducer,
+      getCallLocationName()))
   }
 
   /**
@@ -499,7 +526,10 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
     val reducer = new GroupReduceFunction[T, R] {
       def reduce(in: java.lang.Iterable[T], out: Collector[R]) { fun(in.iterator().asScala, out) }
     }
-    wrap(new GroupReduceOperator[T, R](javaSet, implicitly[TypeInformation[R]], reducer))
+    wrap(new GroupReduceOperator[T, R](javaSet,
+      implicitly[TypeInformation[R]],
+      reducer,
+      getCallLocationName()))
   }
 
   /**
@@ -514,7 +544,10 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
         out.collect(fun(in.iterator().asScala))
       }
     }
-    wrap(new GroupReduceOperator[T, R](javaSet, implicitly[TypeInformation[R]], reducer))
+    wrap(new GroupReduceOperator[T, R](javaSet,
+      implicitly[TypeInformation[R]],
+      reducer,
+      getCallLocationName()))
   }
 
   /**
@@ -543,7 +576,8 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
     wrap(new DistinctOperator[T](
       javaSet,
       new Keys.SelectorFunctionKeys[T, K](
-        keyExtractor, javaSet.getType, implicitly[TypeInformation[K]])))
+        keyExtractor, javaSet.getType, implicitly[TypeInformation[K]]),
+        getCallLocationName()))
   }
 
   /**
@@ -555,7 +589,8 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   def distinct(fields: Int*): DataSet[T] = {
     wrap(new DistinctOperator[T](
       javaSet,
-      new Keys.ExpressionKeys[T](fields.toArray, javaSet.getType, true)))
+      new Keys.ExpressionKeys[T](fields.toArray, javaSet.getType, true),
+      getCallLocationName()))
   }
 
   /**
@@ -565,7 +600,8 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   def distinct(firstField: String, otherFields: String*): DataSet[T] = {
     wrap(new DistinctOperator[T](
       javaSet,
-      new Keys.ExpressionKeys[T](firstField +: otherFields.toArray, javaSet.getType)))
+      new Keys.ExpressionKeys[T](firstField +: otherFields.toArray, javaSet.getType),
+      getCallLocationName()))
   }
 
   /**
@@ -575,7 +611,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
    * This only works if this DataSet contains Tuples.
    */
   def distinct: DataSet[T] = {
-    wrap(new DistinctOperator[T](javaSet, null))
+    wrap(new DistinctOperator[T](javaSet, null, getCallLocationName()))
   }
 
   // --------------------------------------------------------------------------------------------
@@ -915,7 +951,9 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
    * Creates a new DataSet containing the elements from both `this` DataSet and the `other`
    * DataSet.
    */
-  def union(other: DataSet[T]): DataSet[T] = wrap(new UnionOperator[T](javaSet, other.javaSet))
+  def union(other: DataSet[T]): DataSet[T] = wrap(new UnionOperator[T](javaSet,
+    other.javaSet,
+    getCallLocationName()))
 
   // --------------------------------------------------------------------------------------------
   //  Partitioning
@@ -931,7 +969,8 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
     val op = new PartitionOperator[T](
       javaSet,
       PartitionMethod.HASH,
-      new Keys.ExpressionKeys[T](fields.toArray, javaSet.getType, false))
+      new Keys.ExpressionKeys[T](fields.toArray, javaSet.getType, false),
+      getCallLocationName())
     wrap(op)
   }
 
@@ -945,7 +984,8 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
     val op = new PartitionOperator[T](
       javaSet,
       PartitionMethod.HASH,
-      new Keys.ExpressionKeys[T](firstField +: otherFields.toArray, javaSet.getType))
+      new Keys.ExpressionKeys[T](firstField +: otherFields.toArray, javaSet.getType),
+      getCallLocationName())
     wrap(op)
   }
 
@@ -965,7 +1005,8 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
       new Keys.SelectorFunctionKeys[T, K](
         keyExtractor,
         javaSet.getType,
-        implicitly[TypeInformation[K]]))
+        implicitly[TypeInformation[K]]),
+        getCallLocationName())
     wrap(op)
   }
 
@@ -981,7 +1022,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
    * @return The rebalanced DataSet.
    */
   def rebalance(): DataSet[T] = {
-    wrap(new PartitionOperator[T](javaSet, PartitionMethod.REBALANCE))
+    wrap(new PartitionOperator[T](javaSet, PartitionMethod.REBALANCE, getCallLocationName()))
   }
 
   // --------------------------------------------------------------------------------------------
@@ -1054,3 +1095,4 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
     output(new PrintingOutputFormat[T](true))
   }
 }
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index fd3e10d..ee92fd6 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -118,7 +118,8 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
     Validate.notNull(filePath, "The file path may not be null.")
     val format = new TextInputFormat(new Path(filePath))
     format.setCharsetName(charsetName)
-    val source = new DataSource[String](javaEnv, format, BasicTypeInfo.STRING_TYPE_INFO)
+    val source = new DataSource[String](javaEnv, format, BasicTypeInfo.STRING_TYPE_INFO,
+      getCallLocationName())
     wrap(source)
   }
 
@@ -139,7 +140,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
     val format = new TextValueInputFormat(new Path(filePath))
     format.setCharsetName(charsetName)
     val source = new DataSource[StringValue](
-      javaEnv, format, new ValueTypeInfo[StringValue](classOf[StringValue]))
+      javaEnv, format, new ValueTypeInfo[StringValue](classOf[StringValue]), getCallLocationName())
     wrap(source)
   }
 
@@ -186,7 +187,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
       inputFormat.setFieldTypes(classes)
     }
 
-    wrap(new DataSource[T](javaEnv, inputFormat, typeInfo))
+    wrap(new DataSource[T](javaEnv, inputFormat, typeInfo, getCallLocationName()))
   }
 
   /**
@@ -224,7 +225,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
       throw new IllegalArgumentException("InputFormat must not be null.")
     }
     Validate.notNull(producedType, "Produced type must not be null")
-    wrap(new DataSource[T](javaEnv, inputFormat, producedType))
+    wrap(new DataSource[T](javaEnv, inputFormat, producedType, getCallLocationName()))
   }
 
   /**
@@ -243,7 +244,8 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
     val dataSource = new DataSource[T](
       javaEnv,
       new CollectionInputFormat[T](data.asJavaCollection, typeInfo.createSerializer),
-      typeInfo)
+      typeInfo,
+      getCallLocationName())
     wrap(dataSource)
   }
 
@@ -262,7 +264,8 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
     val dataSource = new DataSource[T](
       javaEnv,
       new IteratorInputFormat[T](data.asJava),
-      typeInfo)
+      typeInfo,
+      getCallLocationName())
     wrap(dataSource)
   }
 
@@ -288,7 +291,10 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   def fromParallelCollection[T: ClassTag : TypeInformation](
       iterator: SplittableIterator[T]): DataSet[T] = {
     val typeInfo = implicitly[TypeInformation[T]]
-    wrap(new DataSource[T](javaEnv, new ParallelIteratorInputFormat[T](iterator), typeInfo))
+    wrap(new DataSource[T](javaEnv,
+      new ParallelIteratorInputFormat[T](iterator),
+      typeInfo,
+      getCallLocationName()))
   }
 
   /**
@@ -303,7 +309,8 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
     val source = new DataSource(
       javaEnv,
       new ParallelIteratorInputFormat[java.lang.Long](iterator),
-      BasicTypeInfo.LONG_TYPE_INFO)
+      BasicTypeInfo.LONG_TYPE_INFO,
+      getCallLocationName())
     wrap(source).asInstanceOf[DataSet[Long]]
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
index 4cc84dd..23edc74 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
@@ -199,7 +199,7 @@ class GroupedDataSet[T: ClassTag](
         fun(v1, v2)
       }
     }
-    wrap(new ReduceOperator[T](createUnsortedGrouping(), reducer))
+    wrap(new ReduceOperator[T](createUnsortedGrouping(), reducer, getCallLocationName()))
   }
 
   /**
@@ -208,7 +208,7 @@ class GroupedDataSet[T: ClassTag](
    */
   def reduce(reducer: ReduceFunction[T]): DataSet[T] = {
     Validate.notNull(reducer, "Reduce function must not be null.")
-    wrap(new ReduceOperator[T](createUnsortedGrouping(), reducer))
+    wrap(new ReduceOperator[T](createUnsortedGrouping(), reducer, getCallLocationName()))
   }
 
   /**
@@ -226,7 +226,7 @@ class GroupedDataSet[T: ClassTag](
     }
     wrap(
       new GroupReduceOperator[T, R](maybeCreateSortedGrouping(),
-        implicitly[TypeInformation[R]], reducer))
+        implicitly[TypeInformation[R]], reducer, getCallLocationName()))
   }
 
   /**
@@ -244,7 +244,7 @@ class GroupedDataSet[T: ClassTag](
     }
     wrap(
       new GroupReduceOperator[T, R](maybeCreateSortedGrouping(),
-        implicitly[TypeInformation[R]], reducer))
+        implicitly[TypeInformation[R]], reducer, getCallLocationName()))
   }
 
   /**
@@ -256,7 +256,7 @@ class GroupedDataSet[T: ClassTag](
     Validate.notNull(reducer, "GroupReduce function must not be null.")
     wrap(
       new GroupReduceOperator[T, R](maybeCreateSortedGrouping(),
-        implicitly[TypeInformation[R]], reducer))
+        implicitly[TypeInformation[R]], reducer, getCallLocationName()))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
index 4574a9d..1fd9f9f 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
@@ -83,7 +83,8 @@ class CoGroupDataSet[L, R](
       leftKeys,
       rightKeys,
       coGrouper,
-      implicitly[TypeInformation[O]])
+      implicitly[TypeInformation[O]],
+      getCallLocationName())
 
     wrap(coGroupOperator)
   }
@@ -107,7 +108,8 @@ class CoGroupDataSet[L, R](
       leftKeys,
       rightKeys,
       coGrouper,
-      implicitly[TypeInformation[O]])
+      implicitly[TypeInformation[O]],
+      getCallLocationName())
 
     wrap(coGroupOperator)
   }
@@ -128,7 +130,8 @@ class CoGroupDataSet[L, R](
       leftKeys,
       rightKeys,
       coGrouper,
-      implicitly[TypeInformation[O]])
+      implicitly[TypeInformation[O]],
+      getCallLocationName())
 
     wrap(coGroupOperator)
   }
@@ -190,7 +193,8 @@ class UnfinishedCoGroupOperation[L: ClassTag, R: ClassTag](
       }
     }
     val coGroupOperator = new CoGroupOperator[L, R, (Array[L], Array[R])](
-      leftInput.javaSet, rightInput.javaSet, leftKey, rightKey, coGrouper, returnType)
+      leftInput.javaSet, rightInput.javaSet, leftKey, rightKey, coGrouper, returnType,
+      getCallLocationName())
 
     new CoGroupDataSet(coGroupOperator, leftInput, rightInput, leftKey, rightKey)
   }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-scala/src/main/scala/org/apache/flink/api/scala/crossDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/crossDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/crossDataSet.scala
index f67f10c..3e37a78 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/crossDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/crossDataSet.scala
@@ -67,7 +67,8 @@ class CrossDataSet[L, R](
       leftInput.javaSet,
       rightInput.javaSet,
       crosser,
-      implicitly[TypeInformation[O]])
+      implicitly[TypeInformation[O]],
+      getCallLocationName())
     wrap(crossOperator)
   }
 
@@ -85,7 +86,8 @@ class CrossDataSet[L, R](
       leftInput.javaSet,
       rightInput.javaSet,
       crosser,
-      implicitly[TypeInformation[O]])
+      implicitly[TypeInformation[O]],
+      getCallLocationName())
     wrap(crossOperator)
   }
 }
@@ -121,7 +123,8 @@ private[flink] object CrossDataSet {
       leftInput.javaSet,
       rightInput.javaSet,
       crosser,
-      returnType)
+      returnType,
+      getCallLocationName())
 
     new CrossDataSet(crossOperator, leftInput, rightInput)
   }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
index 36e4d36..7062c63 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
@@ -84,7 +84,8 @@ class JoinDataSet[L, R](
       rightKeys,
       joiner,
       implicitly[TypeInformation[O]],
-      defaultJoin.getJoinHint)
+      defaultJoin.getJoinHint,
+      getCallLocationName())
 
     wrap(joinOperator)
   }
@@ -108,7 +109,8 @@ class JoinDataSet[L, R](
       rightKeys,
       joiner,
       implicitly[TypeInformation[O]],
-      defaultJoin.getJoinHint)
+      defaultJoin.getJoinHint,
+      getCallLocationName())
 
     wrap(joinOperator)
   }
@@ -131,7 +133,8 @@ class JoinDataSet[L, R](
       rightKeys,
       joiner,
       implicitly[TypeInformation[O]],
-      defaultJoin.getJoinHint)
+      defaultJoin.getJoinHint,
+      getCallLocationName())
 
     wrap(joinOperator)
   }
@@ -155,7 +158,8 @@ class JoinDataSet[L, R](
       rightKeys,
       generatedFunction, fun,
       implicitly[TypeInformation[O]],
-      defaultJoin.getJoinHint)
+      defaultJoin.getJoinHint,
+      getCallLocationName())
 
     wrap(joinOperator)
   }
@@ -202,7 +206,8 @@ class UnfinishedJoinOperation[L, R](
       }
     }
     val joinOperator = new EquiJoin[L, R, (L, R)](
-      leftSet.javaSet, rightSet.javaSet, leftKey, rightKey, joiner, returnType, joinHint)
+      leftSet.javaSet, rightSet.javaSet, leftKey, rightKey, joiner, returnType, joinHint,
+        getCallLocationName())
 
     new JoinDataSet(joinOperator, leftSet, rightSet, leftKey, rightKey)
   }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
index 312e0dd..b390daf 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
@@ -63,4 +63,11 @@ package object scala {
           "supported on Case Classes (for now).")
     }
   }
+  def getCallLocationName(depth: Int = 3) : String = {
+    val st = Thread.currentThread().getStackTrace();
+    if(st.length < depth) {
+      return "<unknown>"
+    }
+    st(depth).toString
+  }
 }
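
In the common case, the Scala helper's default depth of 3 resolves to the user's call site: frame 0 is Thread.getStackTrace, frame 1 is getCallLocationName itself, frame 2 is the API method that invoked it (DataSet.map, for example), and frame 3 is the user code that called that API method.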

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-tests/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
index 1aad4d1..b82f2ff 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
@@ -65,8 +65,10 @@ class ScalaAPICompletenessTest {
       "org.apache.flink.api.java.DataSet.minBy",
       "org.apache.flink.api.java.DataSet.maxBy",
       "org.apache.flink.api.java.operators.UnsortedGrouping.minBy",
-      "org.apache.flink.api.java.operators.UnsortedGrouping.maxBy"
-      
+      "org.apache.flink.api.java.operators.UnsortedGrouping.maxBy",
+
+      // This method is actually just an internal helper
+      "org.apache.flink.api.java.DataSet.getCallLocationName"
     )
     val excludedPatterns = Seq(
       // We don't have project on tuples in the Scala API


[2/2] incubator-flink git commit: [FLINK-1221] Use the source line as the default operator name

Posted by uc...@apache.org.
[FLINK-1221] Use the source line as the default operator name

This closes #197.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/818ebda0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/818ebda0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/818ebda0

Branch: refs/heads/master
Commit: 818ebda0f4d0070499446d2958532af5addc3a17
Parents: b253cb2
Author: Robert Metzger <rm...@apache.org>
Authored: Tue Nov 11 17:30:09 2014 +0100
Committer: uce <uc...@apache.org>
Committed: Thu Nov 13 11:21:27 2014 +0100

----------------------------------------------------------------------
 .../flink/api/common/operators/Operator.java    |   4 +-
 .../flink/api/common/operators/Union.java       |   9 +-
 .../java/org/apache/flink/api/java/DataSet.java |  43 +++---
 .../flink/api/java/ExecutionEnvironment.java    |  33 ++--
 .../java/org/apache/flink/api/java/Utils.java   |  35 +++++
 .../org/apache/flink/api/java/io/CsvReader.java |  54 +++----
 .../api/java/operators/AggregateOperator.java   |  14 +-
 .../api/java/operators/CoGroupOperator.java     |  11 +-
 .../flink/api/java/operators/CrossOperator.java |  16 +-
 .../flink/api/java/operators/DataSource.java    |  15 +-
 .../api/java/operators/DistinctOperator.java    |   8 +-
 .../api/java/operators/FilterOperator.java      |  10 +-
 .../api/java/operators/FlatMapOperator.java     |   7 +-
 .../api/java/operators/GroupReduceOperator.java |  10 +-
 .../flink/api/java/operators/JoinOperator.java  |  24 +--
 .../flink/api/java/operators/MapOperator.java   |   8 +-
 .../java/operators/MapPartitionOperator.java    |   7 +-
 .../api/java/operators/PartitionOperator.java   |  10 +-
 .../api/java/operators/ReduceOperator.java      |  10 +-
 .../api/java/operators/SortedGrouping.java      |   3 +-
 .../flink/api/java/operators/UnionOperator.java |   8 +-
 .../api/java/operators/UnsortedGrouping.java    |  22 ++-
 .../flink/api/java/tuple/TupleGenerator.java    |  12 +-
 .../flink/api/java/operators/NamesTest.java     | 149 +++++++++++++++++++
 .../org/apache/flink/api/scala/DataSet.scala    |  90 ++++++++---
 .../flink/api/scala/ExecutionEnvironment.scala  |  23 ++-
 .../apache/flink/api/scala/GroupedDataSet.scala |  10 +-
 .../apache/flink/api/scala/coGroupDataSet.scala |  12 +-
 .../apache/flink/api/scala/crossDataSet.scala   |   9 +-
 .../apache/flink/api/scala/joinDataSet.scala    |  15 +-
 .../org/apache/flink/api/scala/package.scala    |   7 +
 .../api/scala/ScalaAPICompletenessTest.scala    |   6 +-
 32 files changed, 512 insertions(+), 182 deletions(-)
----------------------------------------------------------------------
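
The mechanism behind this change is small: each operator constructor gains a call-location string, obtained by walking the current stack trace, and that string becomes the default operator name ("Filter at <call site>") whenever the user does not set one via name(). The following standalone sketch shows the technique in isolation; it is not code from this commit, the class and method names are illustrative only, and it uses a slightly more defensive bounds check:

    // Minimal, self-contained sketch of the stack-trace technique used for
    // default operator names. Illustrative names, not commit code.
    public class CallLocationDemo {

        // Returns the stack frame at the given depth as a string such as
        // "CallLocationDemo.main(CallLocationDemo.java:25)".
        static String callLocation(int depth) {
            StackTraceElement[] st = Thread.currentThread().getStackTrace();
            // Guard against short stack traces instead of risking an
            // ArrayIndexOutOfBoundsException.
            if (st.length <= depth) {
                return "<unknown>";
            }
            return st[depth].toString();
        }

        // Mimics an API method such as DataSet.filter(): depth 3 skips
        // getStackTrace() (frame 0), callLocation() (frame 1) and
        // defaultFilterName() itself (frame 2), so frame 3 is the caller.
        static String defaultFilterName() {
            return "Filter at " + callLocation(3);
        }

        public static void main(String[] args) {
            // Prints something like
            // "Filter at CallLocationDemo.main(CallLocationDemo.java:28)"
            System.out.println(defaultFilterName());
        }
    }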


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
index 765aa73..85b352a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
@@ -244,7 +244,7 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> {
 		}
 
 		// Otherwise construct union cascade
-		Union<T> lastUnion = new Union<T>(new BinaryOperatorInformation<T, T, T>(type, type, type));
+		Union<T> lastUnion = new Union<T>(new BinaryOperatorInformation<T, T, T>(type, type, type), "<unknown>");
 
 		int i;
 		if (input2[0] == null) {
@@ -263,7 +263,7 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> {
 			i = 2;
 		}
 		for (; i < input2.length; i++) {
-			Union<T> tmpUnion = new Union<T>(new BinaryOperatorInformation<T, T, T>(type, type, type));
+			Union<T> tmpUnion = new Union<T>(new BinaryOperatorInformation<T, T, T>(type, type, type), "<unknown>");
 			tmpUnion.setSecondInput(lastUnion);
 			if (input2[i] == null) {
 				throw new IllegalArgumentException("The input may not contain null elements.");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
index fb8626d..d7d0e20 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
@@ -30,19 +30,18 @@ import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
  */
 public class Union<T> extends DualInputOperator<T, T, T, AbstractRichFunction> {
 	
-	private final static String NAME = "Union";
 	
 	/** 
 	 * Creates a new Union operator.
 	 */
-	public Union(BinaryOperatorInformation<T, T, T> operatorInfo) {
+	public Union(BinaryOperatorInformation<T, T, T> operatorInfo, String unionLocationName) {
 		// we pass it an AbstractFunction, because currently all operators expect some form of UDF
-		super(new UserCodeClassWrapper<AbstractRichFunction>(AbstractRichFunction.class), operatorInfo, NAME);
+		super(new UserCodeClassWrapper<AbstractRichFunction>(AbstractRichFunction.class), operatorInfo, "Union at "+unionLocationName);
 	}
 	
-	public Union(Operator<T> input1, Operator<T> input2) {
+	public Union(Operator<T> input1, Operator<T> input2, String unionLocationName) {
 		this(new BinaryOperatorInformation<T, T, T>(input1.getOperatorInfo().getOutputType(),
-				input1.getOperatorInfo().getOutputType(), input1.getOperatorInfo().getOutputType()));
+				input1.getOperatorInfo().getOutputType(), input1.getOperatorInfo().getOutputType()), unionLocationName);
 		setFirstInput(input1);
 		setSecondInput(input2);
 	}
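
Since Union carries no user function to derive a location from, the location string is now passed in explicitly by the caller; where no sensible call site exists, as in the union cascades built internally by Operator.createUnionCascade above, the placeholder "<unknown>" is used.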

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 7b0752c..c78cc7a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -148,7 +148,7 @@ public abstract class DataSet<T> {
 
 		TypeInformation<R> resultType = TypeExtractor.getMapReturnTypes(mapper, this.getType());
 
-		return new MapOperator<T, R>(this, resultType, mapper);
+		return new MapOperator<T, R>(this, resultType, mapper, Utils.getCallLocationName());
 	}
 
 
@@ -176,7 +176,7 @@ public abstract class DataSet<T> {
 			throw new NullPointerException("MapPartition function must not be null.");
 		}
 		TypeInformation<R> resultType = TypeExtractor.getMapPartitionReturnTypes(mapPartition, this.getType());
-		return new MapPartitionOperator<T, R>(this, resultType, mapPartition);
+		return new MapPartitionOperator<T, R>(this, resultType, mapPartition, Utils.getCallLocationName());
 	}
 	
 	/**
@@ -197,7 +197,7 @@ public abstract class DataSet<T> {
 		}
 
 		TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, this.getType());
-		return new FlatMapOperator<T, R>(this, resultType, flatMapper);
+		return new FlatMapOperator<T, R>(this, resultType, flatMapper, Utils.getCallLocationName());
 	}
 	
 	/**
@@ -217,7 +217,7 @@ public abstract class DataSet<T> {
 		if (filter == null) {
 			throw new NullPointerException("Filter function must not be null.");
 		}
-		return new FilterOperator<T>(this, filter);
+		return new FilterOperator<T>(this, filter, Utils.getCallLocationName());
 	}
 
 	
@@ -267,7 +267,7 @@ public abstract class DataSet<T> {
 	 * @see DataSet
 	 */
 	public AggregateOperator<T> aggregate(Aggregations agg, int field) {
-		return new AggregateOperator<T>(this, agg, field);
+		return new AggregateOperator<T>(this, agg, field, Utils.getCallLocationName());
 	}
 
 	/**
@@ -320,7 +320,7 @@ public abstract class DataSet<T> {
 		if (reducer == null) {
 			throw new NullPointerException("Reduce function must not be null.");
 		}
-		return new ReduceOperator<T>(this, reducer);
+		return new ReduceOperator<T>(this, reducer, Utils.getCallLocationName());
 	}
 	
 	/**
@@ -341,7 +341,7 @@ public abstract class DataSet<T> {
 			throw new NullPointerException("GroupReduce function must not be null.");
 		}
 		TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, this.getType());
-		return new GroupReduceOperator<T, R>(this, resultType, reducer);
+		return new GroupReduceOperator<T, R>(this, resultType, reducer, Utils.getCallLocationName());
 	}
 
 /**
@@ -362,7 +362,7 @@ public abstract class DataSet<T> {
 		}
 			
 		return new ReduceOperator<T>(this, new SelectByMinFunction(
-				(TupleTypeInfo) this.type, fields));
+				(TupleTypeInfo) this.type, fields), Utils.getCallLocationName());
 	}
 	
 	/**
@@ -383,7 +383,7 @@ public abstract class DataSet<T> {
 		}
 			
 		return new ReduceOperator<T>(this, new SelectByMaxFunction(
-				(TupleTypeInfo) this.type, fields));
+				(TupleTypeInfo) this.type, fields), Utils.getCallLocationName());
 	}
 
 	/**
@@ -415,7 +415,7 @@ public abstract class DataSet<T> {
 	 */
 	public <K> DistinctOperator<T> distinct(KeySelector<T, K> keyExtractor) {
 		TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type);
-		return new DistinctOperator<T>(this, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, getType(), keyType));
+		return new DistinctOperator<T>(this, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, getType(), keyType), Utils.getCallLocationName());
 	}
 	
 	/**
@@ -430,7 +430,7 @@ public abstract class DataSet<T> {
 	 * @return A DistinctOperator that represents the distinct DataSet.
 	 */
 	public DistinctOperator<T> distinct(int... fields) {
-		return new DistinctOperator<T>(this, new Keys.ExpressionKeys<T>(fields, getType(), true));
+		return new DistinctOperator<T>(this, new Keys.ExpressionKeys<T>(fields, getType(), true), Utils.getCallLocationName());
 	}
 	
 	/**
@@ -444,7 +444,7 @@ public abstract class DataSet<T> {
 	 * @return A DistinctOperator that represents the distinct DataSet.
 	 */
 	public DistinctOperator<T> distinct(String... fields) {
-		return new DistinctOperator<T>(this, new Keys.ExpressionKeys<T>(fields, getType()));
+		return new DistinctOperator<T>(this, new Keys.ExpressionKeys<T>(fields, getType()), Utils.getCallLocationName());
 	}
 	
 	/**
@@ -455,7 +455,7 @@ public abstract class DataSet<T> {
 	 * @return A DistinctOperator that represents the distinct DataSet.
 	 */
 	public DistinctOperator<T> distinct() {
-		return new DistinctOperator<T>(this, null);
+		return new DistinctOperator<T>(this, null, Utils.getCallLocationName());
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -698,7 +698,7 @@ public abstract class DataSet<T> {
 	 * @see Tuple2
 	 */
 	public <R> CrossOperator.DefaultCross<T, R> cross(DataSet<R> other) {
-		return new CrossOperator.DefaultCross<T, R>(this, other);
+		return new CrossOperator.DefaultCross<T, R>(this, other, Utils.getCallLocationName());
 	}
 	
 	/**
@@ -728,7 +728,7 @@ public abstract class DataSet<T> {
 	 * @see Tuple2
 	 */
 	public <R> CrossOperator.DefaultCross<T, R> crossWithTiny(DataSet<R> other) {
-		return new CrossOperator.DefaultCross<T, R>(this, other);
+		return new CrossOperator.DefaultCross<T, R>(this, other, Utils.getCallLocationName());
 	}
 	
 	/**
@@ -758,7 +758,7 @@ public abstract class DataSet<T> {
 	 * @see Tuple2
 	 */
 	public <R> CrossOperator.DefaultCross<T, R> crossWithHuge(DataSet<R> other) {
-		return new CrossOperator.DefaultCross<T, R>(this, other);
+		return new CrossOperator.DefaultCross<T, R>(this, other, Utils.getCallLocationName());
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -879,7 +879,7 @@ public abstract class DataSet<T> {
 	 * @return The resulting DataSet.
 	 */
 	public UnionOperator<T> union(DataSet<T> other){
-		return new UnionOperator<T>(this, other);
+		return new UnionOperator<T>(this, other, Utils.getCallLocationName());
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -895,7 +895,7 @@ public abstract class DataSet<T> {
 	 * @return The partitioned DataSet.
 	 */
 	public PartitionOperator<T> partitionByHash(int... fields) {
-		return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.ExpressionKeys<T>(fields, getType(), false));
+		return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.ExpressionKeys<T>(fields, getType(), false), Utils.getCallLocationName());
 	}
 	
 	/**
@@ -907,7 +907,7 @@ public abstract class DataSet<T> {
 	 * @return The partitioned DataSet.
 	 */
 	public PartitionOperator<T> partitionByHash(String... fields) {
-		return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.ExpressionKeys<T>(fields, getType()));
+		return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.ExpressionKeys<T>(fields, getType()), Utils.getCallLocationName());
 	}
 	
 	/**
@@ -922,7 +922,7 @@ public abstract class DataSet<T> {
 	 */
 	public <K extends Comparable<K>> PartitionOperator<T> partitionByHash(KeySelector<T, K> keyExtractor) {
 		final TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type);
-		return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, this.getType(), keyType));
+		return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, this.getType(), keyType), Utils.getCallLocationName());
 	}
 	
 	/**
@@ -934,7 +934,7 @@ public abstract class DataSet<T> {
 	 * @return The rebalanced DataSet.
 	 */
 	public PartitionOperator<T> rebalance() {
-		return new PartitionOperator<T>(this, PartitionMethod.REBALANCE);
+		return new PartitionOperator<T>(this, PartitionMethod.REBALANCE, Utils.getCallLocationName());
 	}
 		
 	// --------------------------------------------------------------------------------------------
@@ -1174,4 +1174,5 @@ public abstract class DataSet<T> {
 			throw new IllegalArgumentException("The two inputs have different execution contexts.");
 		}
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 6b95ad8..541a89b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -207,7 +207,7 @@ public abstract class ExecutionEnvironment {
 	public DataSource<String> readTextFile(String filePath) {
 		Validate.notNull(filePath, "The file path may not be null.");
 		
-		return new DataSource<String>(this, new TextInputFormat(new Path(filePath)), BasicTypeInfo.STRING_TYPE_INFO );
+		return new DataSource<String>(this, new TextInputFormat(new Path(filePath)), BasicTypeInfo.STRING_TYPE_INFO, Utils.getCallLocationName());
 	}
 	
 	/**
@@ -223,7 +223,7 @@ public abstract class ExecutionEnvironment {
 
 		TextInputFormat format = new TextInputFormat(new Path(filePath));
 		format.setCharsetName(charsetName);
-		return new DataSource<String>(this, format, BasicTypeInfo.STRING_TYPE_INFO );
+		return new DataSource<String>(this, format, BasicTypeInfo.STRING_TYPE_INFO, Utils.getCallLocationName());
 	}
 	
 	// -------------------------- Text Input Format With String Value------------------------------
@@ -242,7 +242,7 @@ public abstract class ExecutionEnvironment {
 	public DataSource<StringValue> readTextFileWithValue(String filePath) {
 		Validate.notNull(filePath, "The file path may not be null.");
 		
-		return new DataSource<StringValue>(this, new TextValueInputFormat(new Path(filePath)), new ValueTypeInfo<StringValue>(StringValue.class) );
+		return new DataSource<StringValue>(this, new TextValueInputFormat(new Path(filePath)), new ValueTypeInfo<StringValue>(StringValue.class), Utils.getCallLocationName());
 	}
 	
 	/**
@@ -265,7 +265,7 @@ public abstract class ExecutionEnvironment {
 		TextValueInputFormat format = new TextValueInputFormat(new Path(filePath));
 		format.setCharsetName(charsetName);
 		format.setSkipInvalidLines(skipInvalidLines);
-		return new DataSource<StringValue>(this, format, new ValueTypeInfo<StringValue>(StringValue.class) );
+		return new DataSource<StringValue>(this, format, new ValueTypeInfo<StringValue>(StringValue.class), Utils.getCallLocationName());
 	}
 	
 	// ----------------------------------- CSV Input Format ---------------------------------------
@@ -357,7 +357,7 @@ public abstract class ExecutionEnvironment {
 			throw new IllegalArgumentException("Produced type information must not be null.");
 		}
 		
-		return new DataSource<X>(this, inputFormat, producedType);
+		return new DataSource<X>(this, inputFormat, producedType, Utils.getCallLocationName());
 	}
 	
 	// ----------------------------------- Collection ---------------------------------------
@@ -390,7 +390,9 @@ public abstract class ExecutionEnvironment {
 		
 		X firstValue = data.iterator().next();
 		
-		return fromCollection(data, TypeExtractor.getForObject(firstValue));
+		TypeInformation<X> type = TypeExtractor.getForObject(firstValue);
+		CollectionInputFormat.checkCollection(data, type.getTypeClass());
+		return new DataSource<X>(this, new CollectionInputFormat<X>(data, type.createSerializer()), type, Utils.getCallLocationName(4));
 	}
 	
 	/**
@@ -411,9 +413,13 @@ public abstract class ExecutionEnvironment {
 	 * @see #fromCollection(Collection)
 	 */
 	public <X> DataSource<X> fromCollection(Collection<X> data, TypeInformation<X> type) {
+		return fromCollection(data, type, Utils.getCallLocationName());
+	}
+	
+	private <X> DataSource<X> fromCollection(Collection<X> data, TypeInformation<X> type, String callLocationName) {
 		CollectionInputFormat.checkCollection(data, type.getTypeClass());
 		
-		return new DataSource<X>(this, new CollectionInputFormat<X>(data, type.createSerializer()), type);
+		return new DataSource<X>(this, new CollectionInputFormat<X>(data, type.createSerializer()), type, callLocationName);
 	}
 	
 	/**
@@ -462,7 +468,7 @@ public abstract class ExecutionEnvironment {
 			throw new IllegalArgumentException("The iterator must be serializable.");
 		}
 		
-		return new DataSource<X>(this, new IteratorInputFormat<X>(data), type);
+		return new DataSource<X>(this, new IteratorInputFormat<X>(data), type, Utils.getCallLocationName());
 	}
 	
 	
@@ -490,7 +496,7 @@ public abstract class ExecutionEnvironment {
 			throw new IllegalArgumentException("The number of elements must not be zero.");
 		}
 		
-		return fromCollection(Arrays.asList(data), TypeExtractor.getForObject(data[0]));
+		return fromCollection(Arrays.asList(data), TypeExtractor.getForObject(data[0]), Utils.getCallLocationName());
 	}
 	
 	
@@ -532,7 +538,12 @@ public abstract class ExecutionEnvironment {
 	 * @see #fromParallelCollection(SplittableIterator, Class)
 	 */
 	public <X> DataSource<X> fromParallelCollection(SplittableIterator<X> iterator, TypeInformation<X> type) {
-		return new DataSource<X>(this, new ParallelIteratorInputFormat<X>(iterator), type);
+		return fromParallelCollection(iterator, type, Utils.getCallLocationName(4));
+	}
+	
+	// private helper for passing different call location names
+	private <X> DataSource<X> fromParallelCollection(SplittableIterator<X> iterator, TypeInformation<X> type, String callLocationName) {
+		return new DataSource<X>(this, new ParallelIteratorInputFormat<X>(iterator), type, callLocationName);
 	}
 	
 	/**
@@ -544,7 +555,7 @@ public abstract class ExecutionEnvironment {
 	 * @return A DataSet, containing all number in the {@code [from, to]} interval.
 	 */
 	public DataSource<Long> generateSequence(long from, long to) {
-		return fromParallelCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO);
+		return fromParallelCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO, Utils.getCallLocationName(3));
 	}	
 	
 	// --------------------------------------------------------------------------------------------
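
The private fromCollection and fromParallelCollection overloads exist so that internal delegation does not distort the reported frame: when one public API method calls another (fromElements delegating to fromCollection, or generateSequence to fromParallelCollection), the extra wrapper layer would otherwise make the default depth point at Flink's own code rather than the user's, hence the explicit depths of 3 and 4 passed at the individual call sites.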

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
new file mode 100644
index 0000000..462cf2c
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java;
+
+
+public class Utils {
+
+	public static String getCallLocationName() {
+		return getCallLocationName(4);
+	}
+
+	public static String getCallLocationName(int depth) {
+		StackTraceElement[] st = Thread.currentThread().getStackTrace();
+		if(st.length < depth) { // we should not throw an out of bounds exception for this.
+			return "<unknown>";
+		}
+		return st[depth].toString();
+	}
+}
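
For the no-argument overload, the default depth of 4 resolves to the user's call site: frame 0 is Thread.getStackTrace, frame 1 is getCallLocationName(int), frame 2 is getCallLocationName(), frame 3 is the Flink API method that invoked it (DataSet.filter, for example), and frame 4 is the user code that called that API method, which is the frame worth reporting.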

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
index 9465ccc..5cd92e2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
@@ -22,8 +22,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 
 import org.apache.commons.lang3.Validate;
-
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -236,7 +236,7 @@ public class CsvReader {
 		}
 		
 		configureInputFormat(inputFormat, classes);
-		return new DataSource<T>(executionContext, inputFormat, typeInfo);
+		return new DataSource<T>(executionContext, inputFormat, typeInfo, Utils.getCallLocationName());
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -273,7 +273,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple1<T0>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0);
 		CsvInputFormat<Tuple1<T0>> inputFormat = new CsvInputFormat<Tuple1<T0>>(path);
 		configureInputFormat(inputFormat, type0);
-		return new DataSource<Tuple1<T0>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple1<T0>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -290,7 +290,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple2<T0, T1>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1);
 		CsvInputFormat<Tuple2<T0, T1>> inputFormat = new CsvInputFormat<Tuple2<T0, T1>>(path);
 		configureInputFormat(inputFormat, type0, type1);
-		return new DataSource<Tuple2<T0, T1>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple2<T0, T1>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -308,7 +308,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple3<T0, T1, T2>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2);
 		CsvInputFormat<Tuple3<T0, T1, T2>> inputFormat = new CsvInputFormat<Tuple3<T0, T1, T2>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2);
-		return new DataSource<Tuple3<T0, T1, T2>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple3<T0, T1, T2>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -327,7 +327,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple4<T0, T1, T2, T3>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3);
 		CsvInputFormat<Tuple4<T0, T1, T2, T3>> inputFormat = new CsvInputFormat<Tuple4<T0, T1, T2, T3>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3);
-		return new DataSource<Tuple4<T0, T1, T2, T3>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple4<T0, T1, T2, T3>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -347,7 +347,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4);
 		CsvInputFormat<Tuple5<T0, T1, T2, T3, T4>> inputFormat = new CsvInputFormat<Tuple5<T0, T1, T2, T3, T4>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4);
-		return new DataSource<Tuple5<T0, T1, T2, T3, T4>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple5<T0, T1, T2, T3, T4>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -368,7 +368,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5);
 		CsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>> inputFormat = new CsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5);
-		return new DataSource<Tuple6<T0, T1, T2, T3, T4, T5>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple6<T0, T1, T2, T3, T4, T5>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -390,7 +390,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6);
 		CsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>> inputFormat = new CsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6);
-		return new DataSource<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -413,7 +413,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7);
 		CsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> inputFormat = new CsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7);
-		return new DataSource<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -437,7 +437,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8);
 		CsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> inputFormat = new CsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8);
-		return new DataSource<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -462,7 +462,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9);
 		CsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> inputFormat = new CsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9);
-		return new DataSource<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -488,7 +488,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10);
 		CsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> inputFormat = new CsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10);
-		return new DataSource<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -515,7 +515,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11);
 		CsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> inputFormat = new CsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11);
-		return new DataSource<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -543,7 +543,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12);
 		CsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> inputFormat = new CsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12);
-		return new DataSource<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -572,7 +572,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13);
 		CsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> inputFormat = new CsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13);
-		return new DataSource<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -602,7 +602,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14);
 		CsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> inputFormat = new CsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14);
-		return new DataSource<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -633,7 +633,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15);
 		CsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> inputFormat = new CsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15);
-		return new DataSource<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -665,7 +665,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16);
 		CsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> inputFormat = new CsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16);
-		return new DataSource<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -698,7 +698,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17);
 		CsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> inputFormat = new CsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17);
-		return new DataSource<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -732,7 +732,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18);
 		CsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> inputFormat = new CsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18);
-		return new DataSource<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -767,7 +767,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19);
 		CsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> inputFormat = new CsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19);
-		return new DataSource<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -803,7 +803,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20);
 		CsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> inputFormat = new CsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20);
-		return new DataSource<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -840,7 +840,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21);
 		CsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> inputFormat = new CsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21);
-		return new DataSource<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -878,7 +878,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22);
 		CsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> inputFormat = new CsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22);
-		return new DataSource<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -917,7 +917,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23);
 		CsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> inputFormat = new CsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23);
-		return new DataSource<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -957,7 +957,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23, type24);
 		CsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> inputFormat = new CsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23, type24);
-		return new DataSource<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	// END_OF_TUPLE_DEPENDENT_CODE
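
All of the CsvReader methods above now hand a call-location string into the
DataSource constructor. The Utils.getCallLocationName() helper itself is added
elsewhere in this commit and is not part of this hunk; the following is only a
minimal sketch of how such a helper can be written against the JVM stack
trace. The class name and the frame indices are assumptions for illustration,
not the committed code.

	// Illustrative sketch only; the committed Utils may differ in details.
	final class CallLocationSketch {

		public static String getCallLocationName() {
			// Assumed frame layout: [0] Thread.getStackTrace, [1] getCallLocationName(int),
			// [2] this method, [3] the DataSet/CsvReader API method, [4] the user's call site.
			return getCallLocationName(4);
		}

		public static String getCallLocationName(int depth) {
			StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
			if (stackTrace.length <= depth) {
				return "<unknown>";
			}
			StackTraceElement elem = stackTrace[depth];
			return elem.getClassName() + "." + elem.getMethodName()
					+ "(" + elem.getFileName() + ":" + elem.getLineNumber() + ")";
		}
	}

The important property is that the location is captured eagerly, at the moment
the user invokes the API, and merely stored until the plan is translated.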

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
index 041dc75..e906232 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
@@ -54,15 +54,18 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate
 	
 	private final Grouping<IN> grouping;
 	
+	private final String aggregateLocationName;
+	
 	/**
 	 * <p>
 	 * Non grouped aggregation
 	 */
-	public AggregateOperator(DataSet<IN> input, Aggregations function, int field) {
+	public AggregateOperator(DataSet<IN> input, Aggregations function, int field, String aggregateLocationName) {
 		super(Validate.notNull(input), input.getType());
-		
 		Validate.notNull(function);
 		
+		this.aggregateLocationName = aggregateLocationName;
+		
 		if (!input.getType().isTupleType()) {
 			throw new InvalidProgramException("Aggregating on field positions is only possible on tuple data types.");
 		}
@@ -90,11 +93,12 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate
 	 * @param function
 	 * @param field
 	 */
-	public AggregateOperator(Grouping<IN> input, Aggregations function, int field) {
+	public AggregateOperator(Grouping<IN> input, Aggregations function, int field, String aggregateLocationName) {
 		super(Validate.notNull(input).getDataSet(), input.getDataSet().getType());
-		
 		Validate.notNull(function);
 		
+		this.aggregateLocationName = aggregateLocationName;
+		
 		if (!input.getDataSet().getType().isTupleType()) {
 			throw new InvalidProgramException("Aggregating on field positions is only possible on tuple data types.");
 		}
@@ -157,7 +161,6 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate
 			throw new IllegalStateException();
 		}
 		
-		
 		// construct the aggregation function
 		AggregationFunction<Object>[] aggFunctions = new AggregationFunction[this.aggregationFunctions.size()];
 		int[] fields = new int[this.fields.size()];
@@ -169,6 +172,7 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate
 			
 			genName.append(aggFunctions[i].toString()).append('(').append(fields[i]).append(')').append(',');
 		}
 		genName.setLength(genName.length()-1);
+		genName.append(" at ").append(aggregateLocationName);
 		
 		

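With the trailing comma stripped before the location is appended, a grouped
aggregation now produces a generated name of roughly the following shape (the
class and line number are program-specific and only illustrative):

	// Hypothetical user program:
	tuples.groupBy(1).aggregate(Aggregations.SUM, 0);
	// Generated default name:
	//   "SUM(0) at com.acme.WordCount.main(WordCount.java:31)"
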
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index 56f90f4..1034e86 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
@@ -59,16 +60,20 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 
 	private final Keys<I1> keys1;
 	private final Keys<I2> keys2;
+	
+	private final String defaultName;
 
 
 	public CoGroupOperator(DataSet<I1> input1, DataSet<I2> input2,
 							Keys<I1> keys1, Keys<I2> keys2,
 							CoGroupFunction<I1, I2, OUT> function,
-							TypeInformation<OUT> returnType)
+							TypeInformation<OUT> returnType,
+							String defaultName)
 	{
 		super(input1, input2, returnType);
 
 		this.function = function;
+		this.defaultName = defaultName;
 
 		if (keys1 == null || keys2 == null) {
 			throw new NullPointerException();
@@ -109,7 +114,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 	@Override
 	protected org.apache.flink.api.common.operators.base.CoGroupOperatorBase<?, ?, OUT, ?> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
 		
-		String name = getName() != null ? getName() : function.getClass().getName();
+		String name = getName() != null ? getName() : "CoGroup at "+defaultName;
 		try {
 			keys1.areCompatible(keys2);
 		} catch (IncompatibleKeysException e) {
@@ -519,7 +524,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 						throw new NullPointerException("CoGroup function must not be null.");
 					}
 					TypeInformation<R> returnType = TypeExtractor.getCoGroupReturnTypes(function, input1.getType(), input2.getType());
-					return new CoGroupOperator<I1, I2, R>(input1, input2, keys1, keys2, function, returnType);
+					return new CoGroupOperator<I1, I2, R>(input1, input2, keys1, keys2, function, returnType, Utils.getCallLocationName());
 				}
 			}
 		}
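
The pattern introduced here for CoGroup recurs throughout the commit: the
location is captured in the fluent API method (with() in this case), threaded
through the constructor, and only consulted in translateToDataFlow() when no
explicit name was set, so an explicit .name(...) still wins because getName()
is checked first. A hypothetical user program and the resulting default:

	left.coGroup(right).where(0).equalTo(0)
			.with(new MyCoGroupFunction());   // location captured here
	// Default operator name at translation time (illustrative):
	//   "CoGroup at com.acme.Job.run(Job.java:58)"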

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
index d0b5054..9aa1287 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.base.CrossOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -48,14 +49,17 @@ import org.apache.flink.api.java.tuple.*;
 public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT, CrossOperator<I1, I2, OUT>> {
 
 	private final CrossFunction<I1, I2, OUT> function;
+	private final String defaultName;
 
 	public CrossOperator(DataSet<I1> input1, DataSet<I2> input2,
 							CrossFunction<I1, I2, OUT> function,
-							TypeInformation<OUT> returnType)
+							TypeInformation<OUT> returnType,
+							String defaultName)
 	{
 		super(input1, input2, returnType);
 
 		this.function = function;
+		this.defaultName = defaultName;
 
 		if (!(function instanceof ProjectCrossFunction)) {
 			extractSemanticAnnotationsFromUdf(function.getClass());
@@ -72,7 +76,7 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 	@Override
 	protected org.apache.flink.api.common.operators.base.CrossOperatorBase<I1, I2, OUT, CrossFunction<I1,I2,OUT>> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
 		
-		String name = getName() != null ? getName() : function.getClass().getName();
+		String name = getName() != null ? getName() : "Cross at "+defaultName;
 		// create operator
 		CrossOperatorBase<I1, I2, OUT, CrossFunction<I1, I2, OUT>> po =
 				new CrossOperatorBase<I1, I2, OUT, CrossFunction<I1, I2, OUT>>(function, new BinaryOperatorInformation<I1, I2, OUT>(getInput1Type(), getInput2Type(), getResultType()), name);
@@ -106,9 +110,9 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 		private final DataSet<I1> input1;
 		private final DataSet<I2> input2;
 
-		public DefaultCross(DataSet<I1> input1, DataSet<I2> input2) {
+		public DefaultCross(DataSet<I1> input1, DataSet<I2> input2, String defaultName) {
 			super(input1, input2, (CrossFunction<I1, I2, Tuple2<I1, I2>>) new DefaultCrossFunction<I1, I2>(),
-					new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()));
+					new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), defaultName);
 
 			if (input1 == null || input2 == null) {
 				throw new NullPointerException();
@@ -133,7 +137,7 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 				throw new NullPointerException("Cross function must not be null.");
 			}
 			TypeInformation<R> returnType = TypeExtractor.getCrossReturnTypes(function, input1.getType(), input2.getType());
-			return new CrossOperator<I1, I2, R>(input1, input2, function, returnType);
+			return new CrossOperator<I1, I2, R>(input1, input2, function, returnType, Utils.getCallLocationName());
 		}
 
 
@@ -207,7 +211,7 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 
 		protected ProjectCross(DataSet<I1> input1, DataSet<I2> input2, int[] fields, boolean[] isFromFirst, TupleTypeInfo<OUT> returnType) {
 			super(input1, input2,
-				new ProjectCrossFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer().createInstance()), returnType);
+				new ProjectCrossFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer().createInstance()), returnType, "<unknown>");
 		}
 
 		@Override
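
Note the asymmetry within Cross: a user-defined cross captures its call site
via Utils.getCallLocationName(), while ProjectCross simply passes the literal
placeholder "<unknown>" rather than resolving a deeper stack frame (compare
ProjectJoin below, which does pass an explicit depth). Roughly, for a
hypothetical program:

	a.cross(b).with(fn);                              // "Cross at com.acme.Job.run(Job.java:12)"
	a.cross(b).projectFirst(0).types(String.class);   // falls back to "Cross at <unknown>"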

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
index 764803f..2352269 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
@@ -34,8 +34,11 @@ import org.apache.flink.configuration.Configuration;
  * @param <OUT> The type of the elements produced by this data source.
  */
 public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {
-	
+
 	private final InputFormat<OUT, ?> inputFormat;
+
+	private final String dataSourceLocationName;
+
 	private Configuration parameters;
 
 	// --------------------------------------------------------------------------------------------
@@ -47,9 +50,11 @@ public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {
 	 * @param inputFormat The input format that the data source executes.
 	 * @param type The type of the elements produced by this input format.
 	 */
-	public DataSource(ExecutionEnvironment context, InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> type) {
+	public DataSource(ExecutionEnvironment context, InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> type, String dataSourceLocationName) {
 		super(context, type);
 		
+		this.dataSourceLocationName = dataSourceLocationName;
+		
 		if (inputFormat == null) {
 			throw new IllegalArgumentException("The input format may not be null.");
 		}
@@ -89,9 +94,9 @@ public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {
 	// --------------------------------------------------------------------------------------------
 	
 	protected GenericDataSourceBase<OUT, ?> translateToDataFlow() {
-		String name = this.name != null ? this.name : this.inputFormat.toString();
-		if (name.length() > 100) {
-			name = name.substring(0, 100);
+		String name = this.name != null ? this.name : "at "+dataSourceLocationName+" ("+inputFormat.getClass().getName()+")";
+		if (name.length() > 150) {
+			name = name.substring(0, 150);
 		}
 		
 		@SuppressWarnings({ "unchecked", "rawtypes" })
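
For sources, the default name now leads with the call location and appends the
input format class, and the truncation limit grows from 100 to 150 characters
to make room for the longer string. Illustratively:

	// Assuming a source created at com.acme.Job.run(Job.java:17):
	//   "at com.acme.Job.run(Job.java:17) (org.apache.flink.api.java.io.CsvInputFormat)"
	// Names longer than 150 characters are cut off at 150.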

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index 18fd756..126949c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -47,9 +47,13 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
 	
 	private final Keys<T> keys;
 	
-	public DistinctOperator(DataSet<T> input, Keys<T> keys) {
+	private final String distinctLocationName;
+	
+	public DistinctOperator(DataSet<T> input, Keys<T> keys, String distinctLocationName) {
 		super(input, input.getType());
 		
+		this.distinctLocationName = distinctLocationName;
+		
 		// if keys is null distinction is done on all tuple fields
 		if (keys == null) {
 			if (input.getType().isTupleType()) {
@@ -80,7 +84,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
 		
 		final RichGroupReduceFunction<T, T> function = new DistinctFunction<T>();
 
-		String name = function.getClass().getName();
+		String name = "Distinct at "+distinctLocationName;
 		
 		if (keys instanceof Keys.ExpressionKeys) {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
index ab8a6c5..1d93b0a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
@@ -33,18 +33,22 @@ import org.apache.flink.api.java.DataSet;
 public class FilterOperator<T> extends SingleInputUdfOperator<T, T, FilterOperator<T>> {
 	
 	protected final FilterFunction<T> function;
+	
+	protected final String defaultName;
 
-	public FilterOperator(DataSet<T> input, FilterFunction<T> function) {
+	public FilterOperator(DataSet<T> input, FilterFunction<T> function, String defaultName) {
 		super(input, input.getType());
 		
 		this.function = function;
+		this.defaultName = defaultName;
 		extractSemanticAnnotationsFromUdf(function.getClass());
 	}
 	
 	@Override
 	protected org.apache.flink.api.common.operators.base.FilterOperatorBase<T, FlatMapFunction<T,T>> translateToDataFlow(Operator<T> input) {
-
-		String name = getName() != null ? getName() : function.getClass().getName();
+		
+		String name = getName() != null ? getName() : "Filter at "+defaultName;
+		
 		// create operator
 		PlanFilterOperator<T> po = new PlanFilterOperator<T>(function, name, getInputType());
 		// set input
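
The same small change is applied uniformly to the single-input UDF operators
in this commit (Filter, FlatMap, Map, MapPartition, Reduce, GroupReduce,
Distinct, Partition): store the captured location in a field and fall back to
"<Operation> at <location>" instead of the UDF class name. For a hypothetical
program:

	DataSet<String> filtered = words.filter(new StartsWithA());
	// Old default name: the function class, e.g. "com.acme.StartsWithA"
	// New default name:  "Filter at com.acme.Job.run(Job.java:23)"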

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
index b7e336f..9fa7cf1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
@@ -36,17 +36,20 @@ public class FlatMapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, Fl
 	
 	protected final FlatMapFunction<IN, OUT> function;
 	
-	public FlatMapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, FlatMapFunction<IN, OUT> function) {
+	protected final String defaultName;
+	
+	public FlatMapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, FlatMapFunction<IN, OUT> function, String defaultName) {
 		super(input, resultType);
 		
 		this.function = function;
+		this.defaultName = defaultName;
 		extractSemanticAnnotationsFromUdf(function.getClass());
 	}
 	
 	@Override
 	protected org.apache.flink.api.common.operators.base.FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN,OUT>> translateToDataFlow(Operator<IN> input) {
 		
-		String name = getName() != null ? getName() : function.getClass().getName();
+		String name = getName() != null ? getName() : "FlatMap at "+defaultName;
 		// create operator
 		FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> po = new FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
 		// set input

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index 1cd85c5..a040b14 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -47,6 +47,8 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 	private final GroupReduceFunction<IN, OUT> function;
 
 	private final Grouping<IN> grouper;
+	
+	private final String defaultName;
 
 	private boolean combinable;
 
@@ -56,11 +58,12 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 	 * @param input The input data set to the groupReduce function.
 	 * @param function The user-defined GroupReduce function.
 	 */
-	public GroupReduceOperator(DataSet<IN> input, TypeInformation<OUT> resultType, GroupReduceFunction<IN, OUT> function) {
+	public GroupReduceOperator(DataSet<IN> input, TypeInformation<OUT> resultType, GroupReduceFunction<IN, OUT> function, String defaultName) {
 		super(input, resultType);
 		
 		this.function = function;
 		this.grouper = null;
+		this.defaultName = defaultName;
 
 		checkCombinability();
 	}
@@ -71,11 +74,12 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 	 * @param input The grouped input to be processed group-wise by the groupReduce function.
 	 * @param function The user-defined GroupReduce function.
 	 */
-	public GroupReduceOperator(Grouping<IN> input, TypeInformation<OUT> resultType, GroupReduceFunction<IN, OUT> function) {
+	public GroupReduceOperator(Grouping<IN> input, TypeInformation<OUT> resultType, GroupReduceFunction<IN, OUT> function, String defaultName) {
 		super(input != null ? input.getDataSet() : null, resultType);
 		
 		this.function = function;
 		this.grouper = input;
+		this.defaultName = defaultName;
 
 		checkCombinability();
 
@@ -110,7 +114,7 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 	@Override
 	protected org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<?, OUT, ?> translateToDataFlow(Operator<IN> input) {
 
-		String name = getName() != null ? getName() : function.getClass().getName();
+		String name = getName() != null ? getName() : "GroupReduce at "+defaultName;
 		
 		// distinguish between grouped reduce and non-grouped reduce
 		if (grouper == null) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index 9be6656..93e0371 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -35,6 +35,7 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder;
@@ -137,9 +138,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		@SuppressWarnings("unused")
 		private boolean preserve2;
 		
+		private final String joinLocationName;
+		
 		public EquiJoin(DataSet<I1> input1, DataSet<I2> input2,
 				Keys<I1> keys1, Keys<I2> keys2, FlatJoinFunction<I1, I2, OUT> function,
-				TypeInformation<OUT> returnType, JoinHint hint)
+				TypeInformation<OUT> returnType, JoinHint hint, String joinLocationName)
 		{
 			super(input1, input2, keys1, keys2, returnType, hint);
 			
@@ -148,6 +151,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			}
 			
 			this.function = function;
+			this.joinLocationName = joinLocationName;
 
 			if (!(function instanceof ProjectFlatJoinFunction)) {
 				extractSemanticAnnotationsFromUdf(function.getClass());
@@ -158,9 +162,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 
 		public EquiJoin(DataSet<I1> input1, DataSet<I2> input2,
 				Keys<I1> keys1, Keys<I2> keys2, FlatJoinFunction<I1, I2, OUT> generatedFunction, JoinFunction<I1, I2, OUT> function,
-				TypeInformation<OUT> returnType, JoinHint hint)
+				TypeInformation<OUT> returnType, JoinHint hint, String joinLocationName)
 		{
 			super(input1, input2, keys1, keys2, returnType, hint);
+			
+			this.joinLocationName = joinLocationName;
 
 			if (function == null) {
 				throw new NullPointerException();
@@ -204,7 +210,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 				Operator<I1> input1,
 				Operator<I2> input2) {
 
-			String name = getName() != null ? getName() : function.getClass().getName();
+			String name = getName() != null ? getName() : "Join at "+joinLocationName;
 			try {
 				keys1.areCompatible(super.keys2);
 			} catch(IncompatibleKeysException ike) {
@@ -452,11 +458,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 	public static final class DefaultJoin<I1, I2> extends EquiJoin<I1, I2, Tuple2<I1, I2>> {
 
 		protected DefaultJoin(DataSet<I1> input1, DataSet<I2> input2, 
-				Keys<I1> keys1, Keys<I2> keys2, JoinHint hint)
+				Keys<I1> keys1, Keys<I2> keys2, JoinHint hint, String joinLocationName)
 		{
 			super(input1, input2, keys1, keys2, 
 				(RichFlatJoinFunction<I1, I2, Tuple2<I1, I2>>) new DefaultFlatJoinFunction<I1, I2>(),
-				new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), hint);
+				new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), hint, joinLocationName);
 		}
 		
 		/**
@@ -475,7 +481,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 				throw new NullPointerException("Join function must not be null.");
 			}
 			TypeInformation<R> returnType = TypeExtractor.getFlatJoinReturnTypes(function, getInput1Type(), getInput2Type());
-			return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), function, returnType, getJoinHint());
+			return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), function, returnType, getJoinHint(), Utils.getCallLocationName());
 		}
 
 		public <R> EquiJoin<I1, I2, R> with (JoinFunction<I1, I2, R> function) {
@@ -484,7 +490,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			}
 			FlatJoinFunction<I1, I2, R> generatedFunction = new WrappingFlatJoinFunction<I1, I2, R>(function);
 			TypeInformation<R> returnType = TypeExtractor.getJoinReturnTypes(function, getInput1Type(), getInput2Type());
-			return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), generatedFunction, function, returnType, getJoinHint());
+			return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), generatedFunction, function, returnType, getJoinHint(), Utils.getCallLocationName());
 		}
 
 		public static class WrappingFlatJoinFunction<IN1, IN2, OUT> extends WrappingFunction<JoinFunction<IN1,IN2,OUT>> implements FlatJoinFunction<IN1, IN2, OUT> {
@@ -601,7 +607,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		protected ProjectJoin(DataSet<I1> input1, DataSet<I2> input2, Keys<I1> keys1, Keys<I2> keys2, JoinHint hint, int[] fields, boolean[] isFromFirst, TupleTypeInfo<OUT> returnType) {
 			super(input1, input2, keys1, keys2, 
 					new ProjectFlatJoinFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer().createInstance()),
-					returnType, hint);
+					returnType, hint, Utils.getCallLocationName(4)); // We need to use the 4th element in the stack because the call comes through .types().
 		}
 
 		@Override
@@ -850,7 +856,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 					throw new InvalidProgramException("The pair of join keys are not compatible with each other.",e);
 				}
 
-				return new DefaultJoin<I1, I2>(input1, input2, keys1, keys2, joinHint);
+				return new DefaultJoin<I1, I2>(input1, input2, keys1, keys2, joinHint, Utils.getCallLocationName(4));
 			}
 		}
 	}
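
The explicit depth of 4 is needed wherever the helper is not called directly
from the public API method but is reached through one extra internal hop.
Under the frame layout assumed in the sketch earlier in this patch, the
ProjectJoin case looks like this:

	// Assumed stack when Utils.getCallLocationName(4) runs:
	//   [0] Thread.getStackTrace
	//   [1] Utils.getCallLocationName(int)
	//   [2] JoinOperator.ProjectJoin.<init>    (internal)
	//   [3] the projection's types(...) method (internal)
	//   [4] the user's call site               <- the frame we want

The DefaultJoin construction above sits behind a similar internal helper
reached from equalTo(...), hence the same depth there.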

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
index f1ece2c..b4433dc 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
@@ -37,11 +37,13 @@ import org.apache.flink.api.java.DataSet;
 public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOperator<IN, OUT>> {
 	
 	protected final MapFunction<IN, OUT> function;
+	
+	protected final String defaultName;
 
-	public MapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, MapFunction<IN, OUT> function) {
-
+	public MapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, MapFunction<IN, OUT> function, String defaultName) {
 		super(input, resultType);
 		
+		this.defaultName = defaultName;
 		this.function = function;
 		extractSemanticAnnotationsFromUdf(function.getClass());
 	}
@@ -49,7 +51,7 @@ public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOpe
 	@Override
 	protected org.apache.flink.api.common.operators.base.MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
 		
-		String name = getName() != null ? getName() : function.getClass().getName();
+		String name = getName() != null ? getName() : "Map at "+defaultName;
 		// create operator
 		MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> po = new MapOperatorBase<IN, OUT, MapFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
 		// set input

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
index 839298b..067f7af 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
@@ -38,17 +38,20 @@ public class MapPartitionOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 	
 	protected final MapPartitionFunction<IN, OUT> function;
 	
-	public MapPartitionOperator(DataSet<IN> input, TypeInformation<OUT> resultType, MapPartitionFunction<IN, OUT> function) {
+	protected final String defaultName;
+	
+	public MapPartitionOperator(DataSet<IN> input, TypeInformation<OUT> resultType, MapPartitionFunction<IN, OUT> function, String defaultName) {
 		super(input, resultType);
 		
 		this.function = function;
+		this.defaultName = defaultName;
 		extractSemanticAnnotationsFromUdf(function.getClass());
 	}
 	
 	@Override
 	protected MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
 		
-		String name = getName() != null ? getName() : function.getClass().getName();
+		String name = getName() != null ? getName() : "MapPartition at "+defaultName;
 		// create operator
 		MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>> po = new MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
 		// set input

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
index c4548fb..77d5681 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
@@ -41,9 +41,11 @@ public class PartitionOperator<T> extends SingleInputUdfOperator<T, T, Partition
 	
 	private final Keys<T> pKeys;
 	private final PartitionMethod pMethod;
+	private final String partitionLocationName;
 	
-	public PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T> pKeys) {
+	public PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T> pKeys, String partitionLocationName) {
 		super(input, input.getType());
+		this.partitionLocationName = partitionLocationName;
 
 		if(pMethod == PartitionMethod.HASH && pKeys == null) {
 			throw new IllegalArgumentException("Hash Partitioning requires keys");
@@ -59,8 +61,8 @@ public class PartitionOperator<T> extends SingleInputUdfOperator<T, T, Partition
 		this.pKeys = pKeys;
 	}
 	
-	public PartitionOperator(DataSet<T> input, PartitionMethod pMethod) {
-		this(input, pMethod, null);
+	public PartitionOperator(DataSet<T> input, PartitionMethod pMethod, String partitionLocationName) {
+		this(input, pMethod, null, partitionLocationName);
 	}
 	
 	/*
@@ -68,7 +70,7 @@ public class PartitionOperator<T> extends SingleInputUdfOperator<T, T, Partition
 	 */
 	protected org.apache.flink.api.common.operators.SingleInputOperator<?, T, ?> translateToDataFlow(Operator<T> input) {
 	
-		String name = "Partition";
+		String name = "Partition at "+partitionLocationName;
 		
 		// distinguish between partition types
 		if (pMethod == PartitionMethod.REBALANCE) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
index 8cb64ba..7089cf6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
@@ -46,6 +46,8 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 	
 	private final Grouping<IN> grouper;
 	
+	private final String defaultName;
+	
 	/**
 	 * 
 	 * This is the case for a reduce-all case (in contrast to the reduce-per-group case).
@@ -53,21 +55,23 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 	 * @param input
 	 * @param function
 	 */
-	public ReduceOperator(DataSet<IN> input, ReduceFunction<IN> function) {
+	public ReduceOperator(DataSet<IN> input, ReduceFunction<IN> function, String defaultName) {
 		super(input, input.getType());
 		
 		this.function = function;
 		this.grouper = null;
+		this.defaultName = defaultName;
 		
 		extractSemanticAnnotationsFromUdf(function.getClass());
 	}
 	
 	
-	public ReduceOperator(Grouping<IN> input, ReduceFunction<IN> function) {
+	public ReduceOperator(Grouping<IN> input, ReduceFunction<IN> function, String defaultName) {
 		super(input.getDataSet(), input.getDataSet().getType());
 		
 		this.function = function;
 		this.grouper = input;
+		this.defaultName = defaultName;
 		
 		extractSemanticAnnotationsFromUdf(function.getClass());
 	}
@@ -75,7 +79,7 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 	@Override
 	protected org.apache.flink.api.common.operators.SingleInputOperator<?, IN, ?> translateToDataFlow(Operator<IN> input) {
 		
-		String name = getName() != null ? getName() : function.getClass().getName();
+		String name = getName() != null ? getName() : "Reduce at "+defaultName;
 		
 		// distinguish between grouped reduce and non-grouped reduce
 		if (grouper == null) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index c9700ce..36d14ee 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.operators;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.FirstReducer;
 
 import java.util.Arrays;
@@ -110,7 +111,7 @@ public class SortedGrouping<T> extends Grouping<T> {
 		}
 		TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer,
 				this.getDataSet().getType());
-		return new GroupReduceOperator<T, R>(this, resultType, reducer);
+		return new GroupReduceOperator<T, R>(this, resultType, reducer, Utils.getCallLocationName());
 	}
 
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
index efc1ebc..c6f72f2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
@@ -21,7 +21,6 @@ package org.apache.flink.api.java.operators;
 
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Union;
-
 import org.apache.flink.api.java.DataSet;
 
 /**
@@ -31,14 +30,17 @@ import org.apache.flink.api.java.DataSet;
  */
 public class UnionOperator<T> extends TwoInputOperator<T, T, T, UnionOperator<T>> {
 
+	private final String unionLocationName;
 	/**
 	 * Create an operator that produces the union of the two given data sets.
 	 * 
 	 * @param input1 The first data set to be unioned.
 	 * @param input2 The second data set to be unioned.
 	 */
-	public UnionOperator(DataSet<T> input1, DataSet<T> input2) {
+	public UnionOperator(DataSet<T> input1, DataSet<T> input2, String unionLocationName) {
 		super(input1, input2, input1.getType());
+		
+		this.unionLocationName = unionLocationName;
 	}
 	
 	/**
@@ -50,6 +52,6 @@ public class UnionOperator<T> extends TwoInputOperator<T, T, T, UnionOperator<T>
 	 */
 	@Override
 	protected Union<T> translateToDataFlow(Operator<T> input1, Operator<T> input2) {
-		return new Union<T>(input1, input2);
+		return new Union<T>(input1, input2, unionLocationName);
 	}
 }
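
Union has no user function whose class name could serve as a fallback, so the
location string is handed down into the common-API Union operator; the
matching constructor change lives in flink-core and is not shown in this file.
Presumably the result is rendered along the lines of:

	DataSet<String> all = first.union(second);
	// Default name (illustrative): "Union at com.acme.Job.run(Job.java:40)"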

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
index 910846d..b504e37 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.functions.FirstReducer;
 import org.apache.flink.api.java.functions.SelectByMaxFunction;
@@ -58,7 +59,12 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 	 * @see DataSet
 	 */
 	public AggregateOperator<T> aggregate(Aggregations agg, int field) {
-		return new AggregateOperator<T>(this, agg, field);
+		return aggregate(agg, field, Utils.getCallLocationName());
+	}
+	
+	// private helper that allows setting a different call location name
+	private AggregateOperator<T> aggregate(Aggregations agg, int field, String callLocationName) {
+		return new AggregateOperator<T>(this, agg, field, callLocationName);
 	}
 
 	/**
@@ -69,7 +75,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 	 * @see org.apache.flink.api.java.operators.AggregateOperator
 	 */
 	public AggregateOperator<T> sum (int field) {
-		return this.aggregate (Aggregations.SUM, field);
+		return this.aggregate (Aggregations.SUM, field, Utils.getCallLocationName());
 	}
 
 	/**
@@ -80,7 +86,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 	 * @see org.apache.flink.api.java.operators.AggregateOperator
 	 */
 	public AggregateOperator<T> max (int field) {
-		return this.aggregate (Aggregations.MAX, field);
+		return this.aggregate (Aggregations.MAX, field, Utils.getCallLocationName());
 	}
 
 	/**
@@ -91,7 +97,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 	 * @see org.apache.flink.api.java.operators.AggregateOperator
 	 */
 	public AggregateOperator<T> min (int field) {
-		return this.aggregate (Aggregations.MIN, field);
+		return this.aggregate (Aggregations.MIN, field, Utils.getCallLocationName());
 	}
 	
 	/**
@@ -111,7 +117,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 		if (reducer == null) {
 			throw new NullPointerException("Reduce function must not be null.");
 		}
-		return new ReduceOperator<T>(this, reducer);
+		return new ReduceOperator<T>(this, reducer, Utils.getCallLocationName());
 	}
 	
 	/**
@@ -133,7 +139,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 		}
 		TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, this.getDataSet().getType());
 
-		return new GroupReduceOperator<T, R>(this, resultType, reducer);
+		return new GroupReduceOperator<T, R>(this, resultType, reducer, Utils.getCallLocationName());
 	}
 	
 	/**
@@ -167,7 +173,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 		}
 			
 		return new ReduceOperator<T>(this, new SelectByMinFunction(
-				(TupleTypeInfo) this.dataSet.getType(), fields));
+				(TupleTypeInfo) this.dataSet.getType(), fields), Utils.getCallLocationName());
 	}
 	
 	/**
@@ -188,7 +194,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 		}
 			
 		return new ReduceOperator<T>(this, new SelectByMaxFunction(
-				(TupleTypeInfo) this.dataSet.getType(), fields));
+				(TupleTypeInfo) this.dataSet.getType(), fields), Utils.getCallLocationName());
 	}
 	// --------------------------------------------------------------------------------------------
 	//  Group Operations

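Note the private aggregate(agg, field, callLocationName) helper above: sum(), min() and max() capture the location themselves rather than delegating through the public aggregate(agg, field), so the reported frame is the user's call and not an intermediate API frame (presumably the helper would otherwise pick up the delegating method). For illustration, with class name and line numbers invented:

	import org.apache.flink.api.java.DataSet;
	import org.apache.flink.api.java.ExecutionEnvironment;
	import org.apache.flink.api.java.tuple.Tuple2;

	public class WordCount {
		public static void main(String[] args) throws Exception {
			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
			DataSet<Tuple2<String, Integer>> words = env.fromElements(
					new Tuple2<String, Integer>("a", 1),
					new Tuple2<String, Integer>("a", 2));
			// The aggregate should be named after this line, roughly
			// "Aggregate at WordCount.main(WordCount.java:13)" (prefix assumed),
			// rather than after the internal UnsortedGrouping.aggregate() frame.
			words.groupBy(0).sum(1).print();
		}
	}
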
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
index d439e79..de620f9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
@@ -78,7 +78,13 @@ class TupleGenerator {
 	private static final int LAST = 25;
 
 	public static void main(String[] args) throws Exception {
-		File root = new File(ROOT_DIRECTORY);
+		System.err.println("Current directory: " + System.getProperty("user.dir"));
+		String rootDir = ROOT_DIRECTORY;
+		if (args.length > 0) {
+			rootDir = args[0] + "/" + ROOT_DIRECTORY;
+		}
+		System.err.println("Using root directory: " + rootDir);
+		File root = new File(rootDir);
 
 		createTupleClasses(root);
 
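Since the base directory can now be passed as the first argument, the generator no longer has to be launched from a specific working directory. A hypothetical invocation (the path is an example only, assuming ROOT_DIRECTORY is resolved relative to the flink-java module):

	java org.apache.flink.api.java.tuple.TupleGenerator /path/to/incubator-flink/flink-java
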
@@ -478,7 +484,7 @@ class TupleGenerator {
 			// return
 			sb.append("\t\treturn new DataSource<Tuple" + numFields + "<");
 			appendTupleTypeGenerics(sb, numFields);
-			sb.append(">>(executionContext, inputFormat, types);\n");
+			sb.append(">>(executionContext, inputFormat, types, DataSet.getCallLocationName());\n");
 
 			// end of method
 			sb.append("\t}\n");
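For numFields == 2, the generated return statement should now read roughly as follows (the T0/T1 generic names are assumed to be what appendTupleTypeGenerics emits), so the generated tuple CSV sources also record a call-site name:

	return new DataSource<Tuple2<T0, T1>>(executionContext, inputFormat, types, DataSet.getCallLocationName());
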
@@ -834,7 +840,7 @@ class TupleGenerator {
 	}
 
 	private static String HEADER =
-		"/**\n"
+		"/*\n"
 		+ " * Licensed to the Apache Software Foundation (ASF) under one\n"
 		+ " * or more contributor license agreements.  See the NOTICE file\n"
 		+ " * distributed with this work for additional information\n"