You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2014/11/13 11:22:45 UTC
[1/2] incubator-flink git commit: [FLINK-1221] Use the source line as
the default operator name
Repository: incubator-flink
Updated Branches:
refs/heads/master b253cb2de -> 818ebda0f
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java
new file mode 100644
index 0000000..921d9cc
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.operators;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOuputFormat;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
+import org.apache.flink.api.java.operators.translation.PlanFilterOperator;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Visitor;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Test proper automated assignment of the transformation's name, if not set by the user.
+ */
+public class NamesTest implements Serializable {
+
+ @Test
+ public void testDefaultName() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<String> strs = env.fromCollection(Arrays.asList( new String[] {"a", "b"}));
+
+
+ // WARNING: The test will fail if this line is being moved down in the file (the line-number is hard-coded)
+ strs.filter(new FilterFunction<String>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean filter(String value) throws Exception {
+ return value.equals("a");
+ }
+ }).output(new DiscardingOuputFormat<String>());
+ JavaPlan plan = env.createProgramPlan();
+ testForName("Filter at org.apache.flink.api.java.operators.NamesTest.testDefaultName(NamesTest.java:54)", plan);
+ }
+
+ @Test
+ public void testGivenName() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<String> strs = env.fromCollection(Arrays.asList( new String[] {"a", "b"}));
+ strs.filter(new FilterFunction<String>() {
+ private static final long serialVersionUID = 1L;
+ @Override
+ public boolean filter(String value) throws Exception {
+ return value.equals("a");
+ }
+ }).name("GivenName").output(new DiscardingOuputFormat<String>());
+ JavaPlan plan = env.createProgramPlan();
+ testForName("GivenName", plan);
+ }
+
+ @Test
+ public void testJoinWith() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ List<Tuple1<String>> strLi = new ArrayList<Tuple1<String>>();
+ strLi.add(new Tuple1<String>("a"));
+ strLi.add(new Tuple1<String>("b"));
+
+ DataSet<Tuple1<String>> strs = env.fromCollection(strLi);
+ DataSet<Tuple1<String>> strs1 = env.fromCollection(strLi);
+ strs.join(strs1).where(0).equalTo(0).with(new FlatJoinFunction<Tuple1<String>, Tuple1<String>, String>() {
+ @Override
+ public void join(Tuple1<String> first, Tuple1<String> second,
+ Collector<String> out) throws Exception {
+ //
+ }
+ })
+ .output(new DiscardingOuputFormat<String>());
+ JavaPlan plan = env.createProgramPlan();
+ plan.accept(new Visitor<Operator<?>>() {
+ @Override
+ public boolean preVisit(Operator<?> visitable) {
+ if(visitable instanceof JoinOperatorBase) {
+ Assert.assertEquals("Join at org.apache.flink.api.java.operators.NamesTest.testJoinWith(NamesTest.java:92)", visitable.getName());
+ }
+ return true;
+ }
+ @Override
+ public void postVisit(Operator<?> visitable) {}
+ });
+ }
+
+ private static void testForName(final String expected, JavaPlan plan) {
+ plan.accept(new Visitor<Operator<?>>() {
+ @Override
+ public boolean preVisit(Operator<?> visitable) {
+ if(visitable instanceof PlanFilterOperator<?>) {
+ // cast is actually not required. Its just a check for the right element
+ PlanFilterOperator<?> filterOp = (PlanFilterOperator<?>) visitable;
+ Assert.assertEquals(expected, filterOp.getName());
+ }
+ return true;
+ }
+
+ @Override
+ public void postVisit(Operator<?> visitable) {
+ //
+ }
+ });
+ }
+
+ /*public static void main(String[] args) throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSource<String> strs = env.fromParallelCollection(null, String.class);
+ strs.output(new DiscardingOuputFormat<String>());
+ JavaPlan plan = env.createProgramPlan();
+ plan.accept(new Visitor<Operator<?>>() {
+ @Override
+ public boolean preVisit(Operator<?> visitable) {
+ System.err.println("vis = "+visitable);
+ return true;
+ }
+ @Override
+ public void postVisit(Operator<?> visitable) {}
+ });
+ } */
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 4463eb9..ca8e469 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -243,7 +243,10 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
if (mapper == null) {
throw new NullPointerException("Map function must not be null.")
}
- wrap(new MapOperator[T, R](javaSet, implicitly[TypeInformation[R]], mapper))
+ wrap(new MapOperator[T, R](javaSet,
+ implicitly[TypeInformation[R]],
+ mapper,
+ getCallLocationName()))
}
/**
@@ -256,7 +259,10 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
val mapper = new MapFunction[T, R] {
def map(in: T): R = fun(in)
}
- wrap(new MapOperator[T, R](javaSet, implicitly[TypeInformation[R]], mapper))
+ wrap(new MapOperator[T, R](javaSet,
+ implicitly[TypeInformation[R]],
+ mapper,
+ getCallLocationName()))
}
/**
@@ -272,7 +278,10 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
if (partitionMapper == null) {
throw new NullPointerException("MapPartition function must not be null.")
}
- wrap(new MapPartitionOperator[T, R](javaSet, implicitly[TypeInformation[R]], partitionMapper))
+ wrap(new MapPartitionOperator[T, R](javaSet,
+ implicitly[TypeInformation[R]],
+ partitionMapper,
+ getCallLocationName()))
}
/**
@@ -293,7 +302,10 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
fun(in.iterator().asScala, out)
}
}
- wrap(new MapPartitionOperator[T, R](javaSet, implicitly[TypeInformation[R]], partitionMapper))
+ wrap(new MapPartitionOperator[T, R](javaSet,
+ implicitly[TypeInformation[R]],
+ partitionMapper,
+ getCallLocationName()))
}
/**
@@ -314,7 +326,10 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
fun(in.iterator().asScala) foreach out.collect
}
}
- wrap(new MapPartitionOperator[T, R](javaSet, implicitly[TypeInformation[R]], partitionMapper))
+ wrap(new MapPartitionOperator[T, R](javaSet,
+ implicitly[TypeInformation[R]],
+ partitionMapper,
+ getCallLocationName()))
}
/**
@@ -325,7 +340,10 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
if (flatMapper == null) {
throw new NullPointerException("FlatMap function must not be null.")
}
- wrap(new FlatMapOperator[T, R](javaSet, implicitly[TypeInformation[R]], flatMapper))
+ wrap(new FlatMapOperator[T, R](javaSet,
+ implicitly[TypeInformation[R]],
+ flatMapper,
+ getCallLocationName()))
}
/**
@@ -339,7 +357,10 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
val flatMapper = new FlatMapFunction[T, R] {
def flatMap(in: T, out: Collector[R]) { fun(in, out) }
}
- wrap(new FlatMapOperator[T, R](javaSet, implicitly[TypeInformation[R]], flatMapper))
+ wrap(new FlatMapOperator[T, R](javaSet,
+ implicitly[TypeInformation[R]],
+ flatMapper,
+ getCallLocationName()))
}
/**
@@ -353,7 +374,10 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
val flatMapper = new FlatMapFunction[T, R] {
def flatMap(in: T, out: Collector[R]) { fun(in) foreach out.collect }
}
- wrap(new FlatMapOperator[T, R](javaSet, implicitly[TypeInformation[R]], flatMapper))
+ wrap(new FlatMapOperator[T, R](javaSet,
+ implicitly[TypeInformation[R]],
+ flatMapper,
+ getCallLocationName()))
}
/**
@@ -363,7 +387,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
if (filter == null) {
throw new NullPointerException("Filter function must not be null.")
}
- wrap(new FilterOperator[T](javaSet, filter))
+ wrap(new FilterOperator[T](javaSet, filter, getCallLocationName()))
}
/**
@@ -376,7 +400,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
val filter = new FilterFunction[T] {
def filter(in: T) = fun(in)
}
- wrap(new FilterOperator[T](javaSet, filter))
+ wrap(new FilterOperator[T](javaSet, filter, getCallLocationName()))
}
// --------------------------------------------------------------------------------------------
@@ -457,7 +481,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
if (reducer == null) {
throw new NullPointerException("Reduce function must not be null.")
}
- wrap(new ReduceOperator[T](javaSet, reducer))
+ wrap(new ReduceOperator[T](javaSet, reducer, getCallLocationName()))
}
/**
@@ -471,7 +495,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
val reducer = new ReduceFunction[T] {
def reduce(v1: T, v2: T) = { fun(v1, v2) }
}
- wrap(new ReduceOperator[T](javaSet, reducer))
+ wrap(new ReduceOperator[T](javaSet, reducer, getCallLocationName()))
}
/**
@@ -483,7 +507,10 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
if (reducer == null) {
throw new NullPointerException("GroupReduce function must not be null.")
}
- wrap(new GroupReduceOperator[T, R](javaSet, implicitly[TypeInformation[R]], reducer))
+ wrap(new GroupReduceOperator[T, R](javaSet,
+ implicitly[TypeInformation[R]],
+ reducer,
+ getCallLocationName()))
}
/**
@@ -499,7 +526,10 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
val reducer = new GroupReduceFunction[T, R] {
def reduce(in: java.lang.Iterable[T], out: Collector[R]) { fun(in.iterator().asScala, out) }
}
- wrap(new GroupReduceOperator[T, R](javaSet, implicitly[TypeInformation[R]], reducer))
+ wrap(new GroupReduceOperator[T, R](javaSet,
+ implicitly[TypeInformation[R]],
+ reducer,
+ getCallLocationName()))
}
/**
@@ -514,7 +544,10 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
out.collect(fun(in.iterator().asScala))
}
}
- wrap(new GroupReduceOperator[T, R](javaSet, implicitly[TypeInformation[R]], reducer))
+ wrap(new GroupReduceOperator[T, R](javaSet,
+ implicitly[TypeInformation[R]],
+ reducer,
+ getCallLocationName()))
}
/**
@@ -543,7 +576,8 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
wrap(new DistinctOperator[T](
javaSet,
new Keys.SelectorFunctionKeys[T, K](
- keyExtractor, javaSet.getType, implicitly[TypeInformation[K]])))
+ keyExtractor, javaSet.getType, implicitly[TypeInformation[K]]),
+ getCallLocationName()))
}
/**
@@ -555,7 +589,8 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
def distinct(fields: Int*): DataSet[T] = {
wrap(new DistinctOperator[T](
javaSet,
- new Keys.ExpressionKeys[T](fields.toArray, javaSet.getType, true)))
+ new Keys.ExpressionKeys[T](fields.toArray, javaSet.getType, true),
+ getCallLocationName()))
}
/**
@@ -565,7 +600,8 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
def distinct(firstField: String, otherFields: String*): DataSet[T] = {
wrap(new DistinctOperator[T](
javaSet,
- new Keys.ExpressionKeys[T](firstField +: otherFields.toArray, javaSet.getType)))
+ new Keys.ExpressionKeys[T](firstField +: otherFields.toArray, javaSet.getType),
+ getCallLocationName()))
}
/**
@@ -575,7 +611,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
* This only works if this DataSet contains Tuples.
*/
def distinct: DataSet[T] = {
- wrap(new DistinctOperator[T](javaSet, null))
+ wrap(new DistinctOperator[T](javaSet, null, getCallLocationName()))
}
// --------------------------------------------------------------------------------------------
@@ -915,7 +951,9 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
* Creates a new DataSet containing the elements from both `this` DataSet and the `other`
* DataSet.
*/
- def union(other: DataSet[T]): DataSet[T] = wrap(new UnionOperator[T](javaSet, other.javaSet))
+ def union(other: DataSet[T]): DataSet[T] = wrap(new UnionOperator[T](javaSet,
+ other.javaSet,
+ getCallLocationName()))
// --------------------------------------------------------------------------------------------
// Partitioning
@@ -931,7 +969,8 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
val op = new PartitionOperator[T](
javaSet,
PartitionMethod.HASH,
- new Keys.ExpressionKeys[T](fields.toArray, javaSet.getType, false))
+ new Keys.ExpressionKeys[T](fields.toArray, javaSet.getType, false),
+ getCallLocationName())
wrap(op)
}
@@ -945,7 +984,8 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
val op = new PartitionOperator[T](
javaSet,
PartitionMethod.HASH,
- new Keys.ExpressionKeys[T](firstField +: otherFields.toArray, javaSet.getType))
+ new Keys.ExpressionKeys[T](firstField +: otherFields.toArray, javaSet.getType),
+ getCallLocationName())
wrap(op)
}
@@ -965,7 +1005,8 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
new Keys.SelectorFunctionKeys[T, K](
keyExtractor,
javaSet.getType,
- implicitly[TypeInformation[K]]))
+ implicitly[TypeInformation[K]]),
+ getCallLocationName())
wrap(op)
}
@@ -981,7 +1022,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
* @return The rebalanced DataSet.
*/
def rebalance(): DataSet[T] = {
- wrap(new PartitionOperator[T](javaSet, PartitionMethod.REBALANCE))
+ wrap(new PartitionOperator[T](javaSet, PartitionMethod.REBALANCE, getCallLocationName()))
}
// --------------------------------------------------------------------------------------------
@@ -1054,3 +1095,4 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
output(new PrintingOutputFormat[T](true))
}
}
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index fd3e10d..ee92fd6 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -118,7 +118,8 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
Validate.notNull(filePath, "The file path may not be null.")
val format = new TextInputFormat(new Path(filePath))
format.setCharsetName(charsetName)
- val source = new DataSource[String](javaEnv, format, BasicTypeInfo.STRING_TYPE_INFO)
+ val source = new DataSource[String](javaEnv, format, BasicTypeInfo.STRING_TYPE_INFO,
+ getCallLocationName())
wrap(source)
}
@@ -139,7 +140,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
val format = new TextValueInputFormat(new Path(filePath))
format.setCharsetName(charsetName)
val source = new DataSource[StringValue](
- javaEnv, format, new ValueTypeInfo[StringValue](classOf[StringValue]))
+ javaEnv, format, new ValueTypeInfo[StringValue](classOf[StringValue]), getCallLocationName())
wrap(source)
}
@@ -186,7 +187,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
inputFormat.setFieldTypes(classes)
}
- wrap(new DataSource[T](javaEnv, inputFormat, typeInfo))
+ wrap(new DataSource[T](javaEnv, inputFormat, typeInfo, getCallLocationName()))
}
/**
@@ -224,7 +225,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
throw new IllegalArgumentException("InputFormat must not be null.")
}
Validate.notNull(producedType, "Produced type must not be null")
- wrap(new DataSource[T](javaEnv, inputFormat, producedType))
+ wrap(new DataSource[T](javaEnv, inputFormat, producedType, getCallLocationName()))
}
/**
@@ -243,7 +244,8 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
val dataSource = new DataSource[T](
javaEnv,
new CollectionInputFormat[T](data.asJavaCollection, typeInfo.createSerializer),
- typeInfo)
+ typeInfo,
+ getCallLocationName())
wrap(dataSource)
}
@@ -262,7 +264,8 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
val dataSource = new DataSource[T](
javaEnv,
new IteratorInputFormat[T](data.asJava),
- typeInfo)
+ typeInfo,
+ getCallLocationName())
wrap(dataSource)
}
@@ -288,7 +291,10 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
def fromParallelCollection[T: ClassTag : TypeInformation](
iterator: SplittableIterator[T]): DataSet[T] = {
val typeInfo = implicitly[TypeInformation[T]]
- wrap(new DataSource[T](javaEnv, new ParallelIteratorInputFormat[T](iterator), typeInfo))
+ wrap(new DataSource[T](javaEnv,
+ new ParallelIteratorInputFormat[T](iterator),
+ typeInfo,
+ getCallLocationName()))
}
/**
@@ -303,7 +309,8 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
val source = new DataSource(
javaEnv,
new ParallelIteratorInputFormat[java.lang.Long](iterator),
- BasicTypeInfo.LONG_TYPE_INFO)
+ BasicTypeInfo.LONG_TYPE_INFO,
+ getCallLocationName())
wrap(source).asInstanceOf[DataSet[Long]]
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
index 4cc84dd..23edc74 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
@@ -199,7 +199,7 @@ class GroupedDataSet[T: ClassTag](
fun(v1, v2)
}
}
- wrap(new ReduceOperator[T](createUnsortedGrouping(), reducer))
+ wrap(new ReduceOperator[T](createUnsortedGrouping(), reducer, getCallLocationName()))
}
/**
@@ -208,7 +208,7 @@ class GroupedDataSet[T: ClassTag](
*/
def reduce(reducer: ReduceFunction[T]): DataSet[T] = {
Validate.notNull(reducer, "Reduce function must not be null.")
- wrap(new ReduceOperator[T](createUnsortedGrouping(), reducer))
+ wrap(new ReduceOperator[T](createUnsortedGrouping(), reducer, getCallLocationName()))
}
/**
@@ -226,7 +226,7 @@ class GroupedDataSet[T: ClassTag](
}
wrap(
new GroupReduceOperator[T, R](maybeCreateSortedGrouping(),
- implicitly[TypeInformation[R]], reducer))
+ implicitly[TypeInformation[R]], reducer, getCallLocationName()))
}
/**
@@ -244,7 +244,7 @@ class GroupedDataSet[T: ClassTag](
}
wrap(
new GroupReduceOperator[T, R](maybeCreateSortedGrouping(),
- implicitly[TypeInformation[R]], reducer))
+ implicitly[TypeInformation[R]], reducer, getCallLocationName()))
}
/**
@@ -256,7 +256,7 @@ class GroupedDataSet[T: ClassTag](
Validate.notNull(reducer, "GroupReduce function must not be null.")
wrap(
new GroupReduceOperator[T, R](maybeCreateSortedGrouping(),
- implicitly[TypeInformation[R]], reducer))
+ implicitly[TypeInformation[R]], reducer, getCallLocationName()))
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
index 4574a9d..1fd9f9f 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
@@ -83,7 +83,8 @@ class CoGroupDataSet[L, R](
leftKeys,
rightKeys,
coGrouper,
- implicitly[TypeInformation[O]])
+ implicitly[TypeInformation[O]],
+ getCallLocationName())
wrap(coGroupOperator)
}
@@ -107,7 +108,8 @@ class CoGroupDataSet[L, R](
leftKeys,
rightKeys,
coGrouper,
- implicitly[TypeInformation[O]])
+ implicitly[TypeInformation[O]],
+ getCallLocationName())
wrap(coGroupOperator)
}
@@ -128,7 +130,8 @@ class CoGroupDataSet[L, R](
leftKeys,
rightKeys,
coGrouper,
- implicitly[TypeInformation[O]])
+ implicitly[TypeInformation[O]],
+ getCallLocationName())
wrap(coGroupOperator)
}
@@ -190,7 +193,8 @@ class UnfinishedCoGroupOperation[L: ClassTag, R: ClassTag](
}
}
val coGroupOperator = new CoGroupOperator[L, R, (Array[L], Array[R])](
- leftInput.javaSet, rightInput.javaSet, leftKey, rightKey, coGrouper, returnType)
+ leftInput.javaSet, rightInput.javaSet, leftKey, rightKey, coGrouper, returnType,
+ getCallLocationName())
new CoGroupDataSet(coGroupOperator, leftInput, rightInput, leftKey, rightKey)
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-scala/src/main/scala/org/apache/flink/api/scala/crossDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/crossDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/crossDataSet.scala
index f67f10c..3e37a78 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/crossDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/crossDataSet.scala
@@ -67,7 +67,8 @@ class CrossDataSet[L, R](
leftInput.javaSet,
rightInput.javaSet,
crosser,
- implicitly[TypeInformation[O]])
+ implicitly[TypeInformation[O]],
+ getCallLocationName())
wrap(crossOperator)
}
@@ -85,7 +86,8 @@ class CrossDataSet[L, R](
leftInput.javaSet,
rightInput.javaSet,
crosser,
- implicitly[TypeInformation[O]])
+ implicitly[TypeInformation[O]],
+ getCallLocationName())
wrap(crossOperator)
}
}
@@ -121,7 +123,8 @@ private[flink] object CrossDataSet {
leftInput.javaSet,
rightInput.javaSet,
crosser,
- returnType)
+ returnType,
+ getCallLocationName())
new CrossDataSet(crossOperator, leftInput, rightInput)
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
index 36e4d36..7062c63 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
@@ -84,7 +84,8 @@ class JoinDataSet[L, R](
rightKeys,
joiner,
implicitly[TypeInformation[O]],
- defaultJoin.getJoinHint)
+ defaultJoin.getJoinHint,
+ getCallLocationName())
wrap(joinOperator)
}
@@ -108,7 +109,8 @@ class JoinDataSet[L, R](
rightKeys,
joiner,
implicitly[TypeInformation[O]],
- defaultJoin.getJoinHint)
+ defaultJoin.getJoinHint,
+ getCallLocationName())
wrap(joinOperator)
}
@@ -131,7 +133,8 @@ class JoinDataSet[L, R](
rightKeys,
joiner,
implicitly[TypeInformation[O]],
- defaultJoin.getJoinHint)
+ defaultJoin.getJoinHint,
+ getCallLocationName())
wrap(joinOperator)
}
@@ -155,7 +158,8 @@ class JoinDataSet[L, R](
rightKeys,
generatedFunction, fun,
implicitly[TypeInformation[O]],
- defaultJoin.getJoinHint)
+ defaultJoin.getJoinHint,
+ getCallLocationName())
wrap(joinOperator)
}
@@ -202,7 +206,8 @@ class UnfinishedJoinOperation[L, R](
}
}
val joinOperator = new EquiJoin[L, R, (L, R)](
- leftSet.javaSet, rightSet.javaSet, leftKey, rightKey, joiner, returnType, joinHint)
+ leftSet.javaSet, rightSet.javaSet, leftKey, rightKey, joiner, returnType, joinHint,
+ getCallLocationName())
new JoinDataSet(joinOperator, leftSet, rightSet, leftKey, rightKey)
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
index 312e0dd..b390daf 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
@@ -63,4 +63,11 @@ package object scala {
"supported on Case Classes (for now).")
}
}
+  // Returns the stack frame at index `depth` of the current thread's stack trace as a string,
+  // or "<unknown>" when the stack trace has no frame at that index.
+  def getCallLocationName(depth: Int = 3) : String = {
+    val st = Thread.currentThread().getStackTrace()
+    // valid indices are 0 until st.length, so depth == st.length must fall back as well
+    if (depth < 0 || depth >= st.length) "<unknown>" else st(depth).toString
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-tests/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
index 1aad4d1..b82f2ff 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
@@ -65,8 +65,10 @@ class ScalaAPICompletenessTest {
"org.apache.flink.api.java.DataSet.minBy",
"org.apache.flink.api.java.DataSet.maxBy",
"org.apache.flink.api.java.operators.UnsortedGrouping.minBy",
- "org.apache.flink.api.java.operators.UnsortedGrouping.maxBy"
-
+ "org.apache.flink.api.java.operators.UnsortedGrouping.maxBy",
+
+ // This method is actually just an internal helper
+ "org.apache.flink.api.java.DataSet.getCallLocationName"
)
val excludedPatterns = Seq(
// We don't have project on tuples in the Scala API
[2/2] incubator-flink git commit: [FLINK-1221] Use the source line as
the default operator name
Posted by uc...@apache.org.
[FLINK-1221] Use the source line as the default operator name
This closes #197.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/818ebda0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/818ebda0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/818ebda0
Branch: refs/heads/master
Commit: 818ebda0f4d0070499446d2958532af5addc3a17
Parents: b253cb2
Author: Robert Metzger <rm...@apache.org>
Authored: Tue Nov 11 17:30:09 2014 +0100
Committer: uce <uc...@apache.org>
Committed: Thu Nov 13 11:21:27 2014 +0100
----------------------------------------------------------------------
.../flink/api/common/operators/Operator.java | 4 +-
.../flink/api/common/operators/Union.java | 9 +-
.../java/org/apache/flink/api/java/DataSet.java | 43 +++---
.../flink/api/java/ExecutionEnvironment.java | 33 ++--
.../java/org/apache/flink/api/java/Utils.java | 35 +++++
.../org/apache/flink/api/java/io/CsvReader.java | 54 +++----
.../api/java/operators/AggregateOperator.java | 14 +-
.../api/java/operators/CoGroupOperator.java | 11 +-
.../flink/api/java/operators/CrossOperator.java | 16 +-
.../flink/api/java/operators/DataSource.java | 15 +-
.../api/java/operators/DistinctOperator.java | 8 +-
.../api/java/operators/FilterOperator.java | 10 +-
.../api/java/operators/FlatMapOperator.java | 7 +-
.../api/java/operators/GroupReduceOperator.java | 10 +-
.../flink/api/java/operators/JoinOperator.java | 24 +--
.../flink/api/java/operators/MapOperator.java | 8 +-
.../java/operators/MapPartitionOperator.java | 7 +-
.../api/java/operators/PartitionOperator.java | 10 +-
.../api/java/operators/ReduceOperator.java | 10 +-
.../api/java/operators/SortedGrouping.java | 3 +-
.../flink/api/java/operators/UnionOperator.java | 8 +-
.../api/java/operators/UnsortedGrouping.java | 22 ++-
.../flink/api/java/tuple/TupleGenerator.java | 12 +-
.../flink/api/java/operators/NamesTest.java | 149 +++++++++++++++++++
.../org/apache/flink/api/scala/DataSet.scala | 90 ++++++++---
.../flink/api/scala/ExecutionEnvironment.scala | 23 ++-
.../apache/flink/api/scala/GroupedDataSet.scala | 10 +-
.../apache/flink/api/scala/coGroupDataSet.scala | 12 +-
.../apache/flink/api/scala/crossDataSet.scala | 9 +-
.../apache/flink/api/scala/joinDataSet.scala | 15 +-
.../org/apache/flink/api/scala/package.scala | 7 +
.../api/scala/ScalaAPICompletenessTest.scala | 6 +-
32 files changed, 512 insertions(+), 182 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
index 765aa73..85b352a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
@@ -244,7 +244,7 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> {
}
// Otherwise construct union cascade
- Union<T> lastUnion = new Union<T>(new BinaryOperatorInformation<T, T, T>(type, type, type));
+ Union<T> lastUnion = new Union<T>(new BinaryOperatorInformation<T, T, T>(type, type, type), "<unknown>");
int i;
if (input2[0] == null) {
@@ -263,7 +263,7 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> {
i = 2;
}
for (; i < input2.length; i++) {
- Union<T> tmpUnion = new Union<T>(new BinaryOperatorInformation<T, T, T>(type, type, type));
+ Union<T> tmpUnion = new Union<T>(new BinaryOperatorInformation<T, T, T>(type, type, type), "<unknown>");
tmpUnion.setSecondInput(lastUnion);
if (input2[i] == null) {
throw new IllegalArgumentException("The input may not contain null elements.");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
index fb8626d..d7d0e20 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
@@ -30,19 +30,18 @@ import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
*/
public class Union<T> extends DualInputOperator<T, T, T, AbstractRichFunction> {
- private final static String NAME = "Union";
/**
* Creates a new Union operator.
*/
- public Union(BinaryOperatorInformation<T, T, T> operatorInfo) {
+ public Union(BinaryOperatorInformation<T, T, T> operatorInfo, String unionLocationName) {
// we pass it an AbstractFunction, because currently all operators expect some form of UDF
- super(new UserCodeClassWrapper<AbstractRichFunction>(AbstractRichFunction.class), operatorInfo, NAME);
+ super(new UserCodeClassWrapper<AbstractRichFunction>(AbstractRichFunction.class), operatorInfo, "Union at "+unionLocationName);
}
- public Union(Operator<T> input1, Operator<T> input2) {
+ public Union(Operator<T> input1, Operator<T> input2, String unionLocationName) {
this(new BinaryOperatorInformation<T, T, T>(input1.getOperatorInfo().getOutputType(),
- input1.getOperatorInfo().getOutputType(), input1.getOperatorInfo().getOutputType()));
+ input1.getOperatorInfo().getOutputType(), input1.getOperatorInfo().getOutputType()), unionLocationName);
setFirstInput(input1);
setSecondInput(input2);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 7b0752c..c78cc7a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -148,7 +148,7 @@ public abstract class DataSet<T> {
TypeInformation<R> resultType = TypeExtractor.getMapReturnTypes(mapper, this.getType());
- return new MapOperator<T, R>(this, resultType, mapper);
+ return new MapOperator<T, R>(this, resultType, mapper, Utils.getCallLocationName());
}
@@ -176,7 +176,7 @@ public abstract class DataSet<T> {
throw new NullPointerException("MapPartition function must not be null.");
}
TypeInformation<R> resultType = TypeExtractor.getMapPartitionReturnTypes(mapPartition, this.getType());
- return new MapPartitionOperator<T, R>(this, resultType, mapPartition);
+ return new MapPartitionOperator<T, R>(this, resultType, mapPartition, Utils.getCallLocationName());
}
/**
@@ -197,7 +197,7 @@ public abstract class DataSet<T> {
}
TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, this.getType());
- return new FlatMapOperator<T, R>(this, resultType, flatMapper);
+ return new FlatMapOperator<T, R>(this, resultType, flatMapper, Utils.getCallLocationName());
}
/**
@@ -217,7 +217,7 @@ public abstract class DataSet<T> {
if (filter == null) {
throw new NullPointerException("Filter function must not be null.");
}
- return new FilterOperator<T>(this, filter);
+ return new FilterOperator<T>(this, filter, Utils.getCallLocationName());
}
@@ -267,7 +267,7 @@ public abstract class DataSet<T> {
* @see DataSet
*/
public AggregateOperator<T> aggregate(Aggregations agg, int field) {
- return new AggregateOperator<T>(this, agg, field);
+ return new AggregateOperator<T>(this, agg, field, Utils.getCallLocationName());
}
/**
@@ -320,7 +320,7 @@ public abstract class DataSet<T> {
if (reducer == null) {
throw new NullPointerException("Reduce function must not be null.");
}
- return new ReduceOperator<T>(this, reducer);
+ return new ReduceOperator<T>(this, reducer, Utils.getCallLocationName());
}
/**
@@ -341,7 +341,7 @@ public abstract class DataSet<T> {
throw new NullPointerException("GroupReduce function must not be null.");
}
TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, this.getType());
- return new GroupReduceOperator<T, R>(this, resultType, reducer);
+ return new GroupReduceOperator<T, R>(this, resultType, reducer, Utils.getCallLocationName());
}
/**
@@ -362,7 +362,7 @@ public abstract class DataSet<T> {
}
return new ReduceOperator<T>(this, new SelectByMinFunction(
- (TupleTypeInfo) this.type, fields));
+ (TupleTypeInfo) this.type, fields), Utils.getCallLocationName());
}
/**
@@ -383,7 +383,7 @@ public abstract class DataSet<T> {
}
return new ReduceOperator<T>(this, new SelectByMaxFunction(
- (TupleTypeInfo) this.type, fields));
+ (TupleTypeInfo) this.type, fields), Utils.getCallLocationName());
}
/**
@@ -415,7 +415,7 @@ public abstract class DataSet<T> {
*/
public <K> DistinctOperator<T> distinct(KeySelector<T, K> keyExtractor) {
TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type);
- return new DistinctOperator<T>(this, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, getType(), keyType));
+ return new DistinctOperator<T>(this, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, getType(), keyType), Utils.getCallLocationName());
}
/**
@@ -430,7 +430,7 @@ public abstract class DataSet<T> {
* @return A DistinctOperator that represents the distinct DataSet.
*/
public DistinctOperator<T> distinct(int... fields) {
- return new DistinctOperator<T>(this, new Keys.ExpressionKeys<T>(fields, getType(), true));
+ return new DistinctOperator<T>(this, new Keys.ExpressionKeys<T>(fields, getType(), true), Utils.getCallLocationName());
}
/**
@@ -444,7 +444,7 @@ public abstract class DataSet<T> {
* @return A DistinctOperator that represents the distinct DataSet.
*/
public DistinctOperator<T> distinct(String... fields) {
- return new DistinctOperator<T>(this, new Keys.ExpressionKeys<T>(fields, getType()));
+ return new DistinctOperator<T>(this, new Keys.ExpressionKeys<T>(fields, getType()), Utils.getCallLocationName());
}
/**
@@ -455,7 +455,7 @@ public abstract class DataSet<T> {
* @return A DistinctOperator that represents the distinct DataSet.
*/
public DistinctOperator<T> distinct() {
- return new DistinctOperator<T>(this, null);
+ return new DistinctOperator<T>(this, null, Utils.getCallLocationName());
}
// --------------------------------------------------------------------------------------------
@@ -698,7 +698,7 @@ public abstract class DataSet<T> {
* @see Tuple2
*/
public <R> CrossOperator.DefaultCross<T, R> cross(DataSet<R> other) {
- return new CrossOperator.DefaultCross<T, R>(this, other);
+ return new CrossOperator.DefaultCross<T, R>(this, other, Utils.getCallLocationName());
}
/**
@@ -728,7 +728,7 @@ public abstract class DataSet<T> {
* @see Tuple2
*/
public <R> CrossOperator.DefaultCross<T, R> crossWithTiny(DataSet<R> other) {
- return new CrossOperator.DefaultCross<T, R>(this, other);
+ return new CrossOperator.DefaultCross<T, R>(this, other, Utils.getCallLocationName());
}
/**
@@ -758,7 +758,7 @@ public abstract class DataSet<T> {
* @see Tuple2
*/
public <R> CrossOperator.DefaultCross<T, R> crossWithHuge(DataSet<R> other) {
- return new CrossOperator.DefaultCross<T, R>(this, other);
+ return new CrossOperator.DefaultCross<T, R>(this, other, Utils.getCallLocationName());
}
// --------------------------------------------------------------------------------------------
@@ -879,7 +879,7 @@ public abstract class DataSet<T> {
* @return The resulting DataSet.
*/
public UnionOperator<T> union(DataSet<T> other){
- return new UnionOperator<T>(this, other);
+ return new UnionOperator<T>(this, other, Utils.getCallLocationName());
}
// --------------------------------------------------------------------------------------------
@@ -895,7 +895,7 @@ public abstract class DataSet<T> {
* @return The partitioned DataSet.
*/
public PartitionOperator<T> partitionByHash(int... fields) {
- return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.ExpressionKeys<T>(fields, getType(), false));
+ return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.ExpressionKeys<T>(fields, getType(), false), Utils.getCallLocationName());
}
/**
@@ -907,7 +907,7 @@ public abstract class DataSet<T> {
* @return The partitioned DataSet.
*/
public PartitionOperator<T> partitionByHash(String... fields) {
- return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.ExpressionKeys<T>(fields, getType()));
+ return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.ExpressionKeys<T>(fields, getType()), Utils.getCallLocationName());
}
/**
@@ -922,7 +922,7 @@ public abstract class DataSet<T> {
*/
public <K extends Comparable<K>> PartitionOperator<T> partitionByHash(KeySelector<T, K> keyExtractor) {
final TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type);
- return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, this.getType(), keyType));
+ return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, this.getType(), keyType), Utils.getCallLocationName());
}
/**
@@ -934,7 +934,7 @@ public abstract class DataSet<T> {
* @return The rebalanced DataSet.
*/
public PartitionOperator<T> rebalance() {
- return new PartitionOperator<T>(this, PartitionMethod.REBALANCE);
+ return new PartitionOperator<T>(this, PartitionMethod.REBALANCE, Utils.getCallLocationName());
}
// --------------------------------------------------------------------------------------------
@@ -1174,4 +1174,5 @@ public abstract class DataSet<T> {
throw new IllegalArgumentException("The two inputs have different execution contexts.");
}
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 6b95ad8..541a89b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -207,7 +207,7 @@ public abstract class ExecutionEnvironment {
public DataSource<String> readTextFile(String filePath) {
Validate.notNull(filePath, "The file path may not be null.");
- return new DataSource<String>(this, new TextInputFormat(new Path(filePath)), BasicTypeInfo.STRING_TYPE_INFO );
+ return new DataSource<String>(this, new TextInputFormat(new Path(filePath)), BasicTypeInfo.STRING_TYPE_INFO, Utils.getCallLocationName());
}
/**
@@ -223,7 +223,7 @@ public abstract class ExecutionEnvironment {
TextInputFormat format = new TextInputFormat(new Path(filePath));
format.setCharsetName(charsetName);
- return new DataSource<String>(this, format, BasicTypeInfo.STRING_TYPE_INFO );
+ return new DataSource<String>(this, format, BasicTypeInfo.STRING_TYPE_INFO, Utils.getCallLocationName());
}
// -------------------------- Text Input Format With String Value------------------------------
@@ -242,7 +242,7 @@ public abstract class ExecutionEnvironment {
public DataSource<StringValue> readTextFileWithValue(String filePath) {
Validate.notNull(filePath, "The file path may not be null.");
- return new DataSource<StringValue>(this, new TextValueInputFormat(new Path(filePath)), new ValueTypeInfo<StringValue>(StringValue.class) );
+ return new DataSource<StringValue>(this, new TextValueInputFormat(new Path(filePath)), new ValueTypeInfo<StringValue>(StringValue.class), Utils.getCallLocationName());
}
/**
@@ -265,7 +265,7 @@ public abstract class ExecutionEnvironment {
TextValueInputFormat format = new TextValueInputFormat(new Path(filePath));
format.setCharsetName(charsetName);
format.setSkipInvalidLines(skipInvalidLines);
- return new DataSource<StringValue>(this, format, new ValueTypeInfo<StringValue>(StringValue.class) );
+ return new DataSource<StringValue>(this, format, new ValueTypeInfo<StringValue>(StringValue.class), Utils.getCallLocationName());
}
// ----------------------------------- CSV Input Format ---------------------------------------
@@ -357,7 +357,7 @@ public abstract class ExecutionEnvironment {
throw new IllegalArgumentException("Produced type information must not be null.");
}
- return new DataSource<X>(this, inputFormat, producedType);
+ return new DataSource<X>(this, inputFormat, producedType, Utils.getCallLocationName());
}
// ----------------------------------- Collection ---------------------------------------
@@ -390,7 +390,9 @@ public abstract class ExecutionEnvironment {
X firstValue = data.iterator().next();
- return fromCollection(data, TypeExtractor.getForObject(firstValue));
+ TypeInformation<X> type = TypeExtractor.getForObject(firstValue);
+ CollectionInputFormat.checkCollection(data, type.getTypeClass());
+ return new DataSource<X>(this, new CollectionInputFormat<X>(data, type.createSerializer()), type, Utils.getCallLocationName(4));
}
/**
@@ -411,9 +413,13 @@ public abstract class ExecutionEnvironment {
* @see #fromCollection(Collection)
*/
public <X> DataSource<X> fromCollection(Collection<X> data, TypeInformation<X> type) {
+ return fromCollection(data, type, Utils.getCallLocationName());
+ }
+
+ private <X> DataSource<X> fromCollection(Collection<X> data, TypeInformation<X> type, String callLocationName) {
CollectionInputFormat.checkCollection(data, type.getTypeClass());
- return new DataSource<X>(this, new CollectionInputFormat<X>(data, type.createSerializer()), type);
+ return new DataSource<X>(this, new CollectionInputFormat<X>(data, type.createSerializer()), type, callLocationName);
}
/**
@@ -462,7 +468,7 @@ public abstract class ExecutionEnvironment {
throw new IllegalArgumentException("The iterator must be serializable.");
}
- return new DataSource<X>(this, new IteratorInputFormat<X>(data), type);
+ return new DataSource<X>(this, new IteratorInputFormat<X>(data), type, Utils.getCallLocationName());
}
@@ -490,7 +496,7 @@ public abstract class ExecutionEnvironment {
throw new IllegalArgumentException("The number of elements must not be zero.");
}
- return fromCollection(Arrays.asList(data), TypeExtractor.getForObject(data[0]));
+ return fromCollection(Arrays.asList(data), TypeExtractor.getForObject(data[0]), Utils.getCallLocationName());
}
@@ -532,7 +538,12 @@ public abstract class ExecutionEnvironment {
* @see #fromParallelCollection(SplittableIterator, Class)
*/
public <X> DataSource<X> fromParallelCollection(SplittableIterator<X> iterator, TypeInformation<X> type) {
- return new DataSource<X>(this, new ParallelIteratorInputFormat<X>(iterator), type);
+ return fromParallelCollection(iterator, type, Utils.getCallLocationName(4));
+ }
+
+ // private helper for passing different call location names
+ private <X> DataSource<X> fromParallelCollection(SplittableIterator<X> iterator, TypeInformation<X> type, String callLocationName) {
+ return new DataSource<X>(this, new ParallelIteratorInputFormat<X>(iterator), type, callLocationName);
}
/**
@@ -544,7 +555,7 @@ public abstract class ExecutionEnvironment {
* @return A DataSet, containing all number in the {@code [from, to]} interval.
*/
public DataSource<Long> generateSequence(long from, long to) {
- return fromParallelCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO);
+ return fromParallelCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO, Utils.getCallLocationName(3));
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
new file mode 100644
index 0000000..462cf2c
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java;
+
+
+public class Utils {
+
+ /**
+ * Returns the call location at the default stack depth of 4.
+ *
+ * NOTE(review): with the usual frame layout (getStackTrace, the two methods of this class,
+ * the calling API method) index 4 is presumably the user code that invoked the API method.
+ *
+ * @return the string form of that stack frame, or {@code "<unknown>"} if the stack is too shallow
+ */
+ public static String getCallLocationName() {
+ return getCallLocationName(4);
+ }
+
+ /**
+ * Returns the string form of the element at index {@code depth} of the current thread's
+ * stack trace, as reported by {@link Thread#getStackTrace()}.
+ *
+ * @param depth index into the stack trace array
+ * @return {@code st[depth].toString()}, or {@code "<unknown>"} if {@code depth} is negative
+ * or not smaller than the number of available frames
+ */
+ public static String getCallLocationName(int depth) {
+ StackTraceElement[] st = Thread.currentThread().getStackTrace();
+ if(depth < 0 || st.length <= depth) { // never fail with an out-of-bounds exception just for a name; depth == st.length is already out of range
+ return "<unknown>";
+ }
+ return st[depth].toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
index 9465ccc..5cd92e2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
@@ -22,8 +22,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import org.apache.commons.lang3.Validate;
-
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -236,7 +236,7 @@ public class CsvReader {
}
configureInputFormat(inputFormat, classes);
- return new DataSource<T>(executionContext, inputFormat, typeInfo);
+ return new DataSource<T>(executionContext, inputFormat, typeInfo, Utils.getCallLocationName());
}
// --------------------------------------------------------------------------------------------
@@ -273,7 +273,7 @@ public class CsvReader {
TupleTypeInfo<Tuple1<T0>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0);
CsvInputFormat<Tuple1<T0>> inputFormat = new CsvInputFormat<Tuple1<T0>>(path);
configureInputFormat(inputFormat, type0);
- return new DataSource<Tuple1<T0>>(executionContext, inputFormat, types);
+ return new DataSource<Tuple1<T0>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
/**
@@ -290,7 +290,7 @@ public class CsvReader {
TupleTypeInfo<Tuple2<T0, T1>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1);
CsvInputFormat<Tuple2<T0, T1>> inputFormat = new CsvInputFormat<Tuple2<T0, T1>>(path);
configureInputFormat(inputFormat, type0, type1);
- return new DataSource<Tuple2<T0, T1>>(executionContext, inputFormat, types);
+ return new DataSource<Tuple2<T0, T1>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
/**
@@ -308,7 +308,7 @@ public class CsvReader {
TupleTypeInfo<Tuple3<T0, T1, T2>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2);
CsvInputFormat<Tuple3<T0, T1, T2>> inputFormat = new CsvInputFormat<Tuple3<T0, T1, T2>>(path);
configureInputFormat(inputFormat, type0, type1, type2);
- return new DataSource<Tuple3<T0, T1, T2>>(executionContext, inputFormat, types);
+ return new DataSource<Tuple3<T0, T1, T2>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
/**
@@ -327,7 +327,7 @@ public class CsvReader {
TupleTypeInfo<Tuple4<T0, T1, T2, T3>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3);
CsvInputFormat<Tuple4<T0, T1, T2, T3>> inputFormat = new CsvInputFormat<Tuple4<T0, T1, T2, T3>>(path);
configureInputFormat(inputFormat, type0, type1, type2, type3);
- return new DataSource<Tuple4<T0, T1, T2, T3>>(executionContext, inputFormat, types);
+ return new DataSource<Tuple4<T0, T1, T2, T3>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
/**
@@ -347,7 +347,7 @@ public class CsvReader {
TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4);
CsvInputFormat<Tuple5<T0, T1, T2, T3, T4>> inputFormat = new CsvInputFormat<Tuple5<T0, T1, T2, T3, T4>>(path);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4);
- return new DataSource<Tuple5<T0, T1, T2, T3, T4>>(executionContext, inputFormat, types);
+ return new DataSource<Tuple5<T0, T1, T2, T3, T4>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
/**
@@ -368,7 +368,7 @@ public class CsvReader {
TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5);
CsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>> inputFormat = new CsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>>(path);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5);
- return new DataSource<Tuple6<T0, T1, T2, T3, T4, T5>>(executionContext, inputFormat, types);
+ return new DataSource<Tuple6<T0, T1, T2, T3, T4, T5>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
/**
@@ -390,7 +390,7 @@ public class CsvReader {
TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6);
CsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>> inputFormat = new CsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(path);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6);
- return new DataSource<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(executionContext, inputFormat, types);
+ return new DataSource<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
/**
@@ -413,7 +413,7 @@ public class CsvReader {
TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7);
CsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> inputFormat = new CsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(path);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7);
- return new DataSource<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(executionContext, inputFormat, types);
+ return new DataSource<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
/**
@@ -437,7 +437,7 @@ public class CsvReader {
TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8);
CsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> inputFormat = new CsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(path);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8);
- return new DataSource<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(executionContext, inputFormat, types);
+ return new DataSource<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
/**
@@ -462,7 +462,7 @@ public class CsvReader {
TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9);
CsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> inputFormat = new CsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(path);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9);
- return new DataSource<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(executionContext, inputFormat, types);
+ return new DataSource<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
/**
@@ -488,7 +488,7 @@ public class CsvReader {
TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10);
CsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> inputFormat = new CsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(path);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10);
- return new DataSource<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(executionContext, inputFormat, types);
+ return new DataSource<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
/**
@@ -515,7 +515,7 @@ public class CsvReader {
TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11);
CsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> inputFormat = new CsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(path);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11);
- return new DataSource<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(executionContext, inputFormat, types);
+ return new DataSource<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
/**
@@ -543,7 +543,7 @@ public class CsvReader {
TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12);
CsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> inputFormat = new CsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(path);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12);
- return new DataSource<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(executionContext, inputFormat, types);
+ return new DataSource<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
/**
@@ -572,7 +572,7 @@ public class CsvReader {
TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13);
CsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> inputFormat = new CsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(path);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13);
- return new DataSource<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(executionContext, inputFormat, types);
+ return new DataSource<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
/**
@@ -602,7 +602,7 @@ public class CsvReader {
TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14);
CsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> inputFormat = new CsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(path);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14);
- return new DataSource<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(executionContext, inputFormat, types);
+ return new DataSource<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
/**
@@ -633,7 +633,7 @@ public class CsvReader {
TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15);
CsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> inputFormat = new CsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(path);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15);
- return new DataSource<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(executionContext, inputFormat, types);
+ return new DataSource<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
/**
@@ -665,7 +665,7 @@ public class CsvReader {
TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16);
CsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> inputFormat = new CsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(path);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16);
- return new DataSource<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(executionContext, inputFormat, types);
+ return new DataSource<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
/**
@@ -698,7 +698,7 @@ public class CsvReader {
TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17);
CsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> inputFormat = new CsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(path);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17);
- return new DataSource<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(executionContext, inputFormat, types);
+ return new DataSource<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
/**
@@ -732,7 +732,7 @@ public class CsvReader {
TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18);
CsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> inputFormat = new CsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(path);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18);
- return new DataSource<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(executionContext, inputFormat, types);
+ return new DataSource<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
/**
@@ -767,7 +767,7 @@ public class CsvReader {
TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19);
CsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> inputFormat = new CsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(path);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19);
- return new DataSource<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(executionContext, inputFormat, types);
+ return new DataSource<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
/**
@@ -803,7 +803,7 @@ public class CsvReader {
TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20);
CsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> inputFormat = new CsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(path);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20);
- return new DataSource<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(executionContext, inputFormat, types);
+ return new DataSource<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
/**
@@ -840,7 +840,7 @@ public class CsvReader {
TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21);
CsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> inputFormat = new CsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(path);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21);
- return new DataSource<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(executionContext, inputFormat, types);
+ return new DataSource<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
/**
@@ -878,7 +878,7 @@ public class CsvReader {
TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22);
CsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> inputFormat = new CsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(path);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22);
- return new DataSource<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(executionContext, inputFormat, types);
+ return new DataSource<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
/**
@@ -917,7 +917,7 @@ public class CsvReader {
TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23);
CsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> inputFormat = new CsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(path);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23);
- return new DataSource<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(executionContext, inputFormat, types);
+ return new DataSource<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
/**
@@ -957,7 +957,7 @@ public class CsvReader {
TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23, type24);
CsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> inputFormat = new CsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(path);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23, type24);
- return new DataSource<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(executionContext, inputFormat, types);
+ return new DataSource<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
// END_OF_TUPLE_DEPENDENT_CODE
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
index 041dc75..e906232 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
@@ -54,15 +54,18 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate
private final Grouping<IN> grouping;
+ private final String aggregateLocationName;
+
/**
* <p>
* Non grouped aggregation
*/
- public AggregateOperator(DataSet<IN> input, Aggregations function, int field) {
+ public AggregateOperator(DataSet<IN> input, Aggregations function, int field, String aggregateLocationName) {
super(Validate.notNull(input), input.getType());
-
Validate.notNull(function);
+ this.aggregateLocationName = aggregateLocationName;
+
if (!input.getType().isTupleType()) {
throw new InvalidProgramException("Aggregating on field positions is only possible on tuple data types.");
}
@@ -90,11 +93,12 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate
* @param function
* @param field
*/
- public AggregateOperator(Grouping<IN> input, Aggregations function, int field) {
+ public AggregateOperator(Grouping<IN> input, Aggregations function, int field, String aggregateLocationName) {
super(Validate.notNull(input).getDataSet(), input.getDataSet().getType());
-
Validate.notNull(function);
+ this.aggregateLocationName = aggregateLocationName;
+
if (!input.getDataSet().getType().isTupleType()) {
throw new InvalidProgramException("Aggregating on field positions is only possible on tuple data types.");
}
@@ -157,7 +161,6 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate
throw new IllegalStateException();
}
-
// construct the aggregation function
AggregationFunction<Object>[] aggFunctions = new AggregationFunction[this.aggregationFunctions.size()];
int[] fields = new int[this.fields.size()];
@@ -169,6 +172,7 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate
genName.append(aggFunctions[i].toString()).append('(').append(fields[i]).append(')').append(',');
}
genName.setLength(genName.length()-1);
+ genName.append(" at ").append(aggregateLocationName); // appended after trimming the trailing ',' so the location name is not cut short
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index 56f90f4..1034e86 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
@@ -59,16 +60,20 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
private final Keys<I1> keys1;
private final Keys<I2> keys2;
+
+ private final String defaultName;
public CoGroupOperator(DataSet<I1> input1, DataSet<I2> input2,
Keys<I1> keys1, Keys<I2> keys2,
CoGroupFunction<I1, I2, OUT> function,
- TypeInformation<OUT> returnType)
+ TypeInformation<OUT> returnType,
+ String defaultName)
{
super(input1, input2, returnType);
this.function = function;
+ this.defaultName = defaultName;
if (keys1 == null || keys2 == null) {
throw new NullPointerException();
@@ -109,7 +114,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
@Override
protected org.apache.flink.api.common.operators.base.CoGroupOperatorBase<?, ?, OUT, ?> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
- String name = getName() != null ? getName() : function.getClass().getName();
+ String name = getName() != null ? getName() : "CoGroup at "+defaultName;
try {
keys1.areCompatible(keys2);
} catch (IncompatibleKeysException e) {
@@ -519,7 +524,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
throw new NullPointerException("CoGroup function must not be null.");
}
TypeInformation<R> returnType = TypeExtractor.getCoGroupReturnTypes(function, input1.getType(), input2.getType());
- return new CoGroupOperator<I1, I2, R>(input1, input2, keys1, keys2, function, returnType);
+ return new CoGroupOperator<I1, I2, R>(input1, input2, keys1, keys2, function, returnType, Utils.getCallLocationName());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
index d0b5054..9aa1287 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.base.CrossOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.SemanticPropUtil;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -48,14 +49,17 @@ import org.apache.flink.api.java.tuple.*;
public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT, CrossOperator<I1, I2, OUT>> {
private final CrossFunction<I1, I2, OUT> function;
+ private final String defaultName;
public CrossOperator(DataSet<I1> input1, DataSet<I2> input2,
CrossFunction<I1, I2, OUT> function,
- TypeInformation<OUT> returnType)
+ TypeInformation<OUT> returnType,
+ String defaultName)
{
super(input1, input2, returnType);
this.function = function;
+ this.defaultName = defaultName;
if (!(function instanceof ProjectCrossFunction)) {
extractSemanticAnnotationsFromUdf(function.getClass());
@@ -72,7 +76,7 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
@Override
protected org.apache.flink.api.common.operators.base.CrossOperatorBase<I1, I2, OUT, CrossFunction<I1,I2,OUT>> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
- String name = getName() != null ? getName() : function.getClass().getName();
+ String name = getName() != null ? getName() : "Cross at "+defaultName;
// create operator
CrossOperatorBase<I1, I2, OUT, CrossFunction<I1, I2, OUT>> po =
new CrossOperatorBase<I1, I2, OUT, CrossFunction<I1, I2, OUT>>(function, new BinaryOperatorInformation<I1, I2, OUT>(getInput1Type(), getInput2Type(), getResultType()), name);
@@ -106,9 +110,9 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
private final DataSet<I1> input1;
private final DataSet<I2> input2;
- public DefaultCross(DataSet<I1> input1, DataSet<I2> input2) {
+ public DefaultCross(DataSet<I1> input1, DataSet<I2> input2, String defaultName) {
super(input1, input2, (CrossFunction<I1, I2, Tuple2<I1, I2>>) new DefaultCrossFunction<I1, I2>(),
- new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()));
+ new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), defaultName);
if (input1 == null || input2 == null) {
throw new NullPointerException();
@@ -133,7 +137,7 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
throw new NullPointerException("Cross function must not be null.");
}
TypeInformation<R> returnType = TypeExtractor.getCrossReturnTypes(function, input1.getType(), input2.getType());
- return new CrossOperator<I1, I2, R>(input1, input2, function, returnType);
+ return new CrossOperator<I1, I2, R>(input1, input2, function, returnType, Utils.getCallLocationName());
}
@@ -207,7 +211,7 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
protected ProjectCross(DataSet<I1> input1, DataSet<I2> input2, int[] fields, boolean[] isFromFirst, TupleTypeInfo<OUT> returnType) {
super(input1, input2,
- new ProjectCrossFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer().createInstance()), returnType);
+ new ProjectCrossFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer().createInstance()), returnType, "<unknown>");
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
index 764803f..2352269 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
@@ -34,8 +34,11 @@ import org.apache.flink.configuration.Configuration;
* @param <OUT> The type of the elements produced by this data source.
*/
public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {
-
+
private final InputFormat<OUT, ?> inputFormat;
+
+ private final String dataSourceLocationName;
+
private Configuration parameters;
// --------------------------------------------------------------------------------------------
@@ -47,9 +50,11 @@ public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {
* @param inputFormat The input format that the data source executes.
* @param type The type of the elements produced by this input format.
*/
- public DataSource(ExecutionEnvironment context, InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> type) {
+ public DataSource(ExecutionEnvironment context, InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> type, String dataSourceLocationName) {
super(context, type);
+ this.dataSourceLocationName = dataSourceLocationName;
+
if (inputFormat == null) {
throw new IllegalArgumentException("The input format may not be null.");
}
@@ -89,9 +94,9 @@ public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {
// --------------------------------------------------------------------------------------------
protected GenericDataSourceBase<OUT, ?> translateToDataFlow() {
- String name = this.name != null ? this.name : this.inputFormat.toString();
- if (name.length() > 100) {
- name = name.substring(0, 100);
+ String name = this.name != null ? this.name : "at "+dataSourceLocationName+" ("+inputFormat.getClass().getName()+")";
+ if (name.length() > 150) {
+ name = name.substring(0, 150);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index 18fd756..126949c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -47,9 +47,13 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
private final Keys<T> keys;
- public DistinctOperator(DataSet<T> input, Keys<T> keys) {
+ private final String distinctLocationName;
+
+ public DistinctOperator(DataSet<T> input, Keys<T> keys, String distinctLocationName) {
super(input, input.getType());
+ this.distinctLocationName = distinctLocationName;
+
// if keys is null distinction is done on all tuple fields
if (keys == null) {
if (input.getType().isTupleType()) {
@@ -80,7 +84,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
final RichGroupReduceFunction<T, T> function = new DistinctFunction<T>();
- String name = function.getClass().getName();
+ String name = "Distinct at "+distinctLocationName;
if (keys instanceof Keys.ExpressionKeys) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
index ab8a6c5..1d93b0a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
@@ -33,18 +33,22 @@ import org.apache.flink.api.java.DataSet;
public class FilterOperator<T> extends SingleInputUdfOperator<T, T, FilterOperator<T>> {
protected final FilterFunction<T> function;
+
+ protected final String defaultName;
- public FilterOperator(DataSet<T> input, FilterFunction<T> function) {
+ public FilterOperator(DataSet<T> input, FilterFunction<T> function, String defaultName) {
super(input, input.getType());
this.function = function;
+ this.defaultName = defaultName;
extractSemanticAnnotationsFromUdf(function.getClass());
}
@Override
protected org.apache.flink.api.common.operators.base.FilterOperatorBase<T, FlatMapFunction<T,T>> translateToDataFlow(Operator<T> input) {
-
- String name = getName() != null ? getName() : function.getClass().getName();
+
+ String name = getName() != null ? getName() : "Filter at "+defaultName;
+
// create operator
PlanFilterOperator<T> po = new PlanFilterOperator<T>(function, name, getInputType());
// set input
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
index b7e336f..9fa7cf1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
@@ -36,17 +36,20 @@ public class FlatMapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, Fl
protected final FlatMapFunction<IN, OUT> function;
- public FlatMapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, FlatMapFunction<IN, OUT> function) {
+ protected final String defaultName;
+
+ public FlatMapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, FlatMapFunction<IN, OUT> function, String defaultName) {
super(input, resultType);
this.function = function;
+ this.defaultName = defaultName;
extractSemanticAnnotationsFromUdf(function.getClass());
}
@Override
protected org.apache.flink.api.common.operators.base.FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN,OUT>> translateToDataFlow(Operator<IN> input) {
- String name = getName() != null ? getName() : function.getClass().getName();
+ String name = getName() != null ? getName() : "FlatMap at "+defaultName;
// create operator
FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> po = new FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
// set input
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index 1cd85c5..a040b14 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -47,6 +47,8 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
private final GroupReduceFunction<IN, OUT> function;
private final Grouping<IN> grouper;
+
+ private final String defaultName;
private boolean combinable;
@@ -56,11 +58,12 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
* @param input The input data set to the groupReduce function.
* @param function The user-defined GroupReduce function.
*/
- public GroupReduceOperator(DataSet<IN> input, TypeInformation<OUT> resultType, GroupReduceFunction<IN, OUT> function) {
+ public GroupReduceOperator(DataSet<IN> input, TypeInformation<OUT> resultType, GroupReduceFunction<IN, OUT> function, String defaultName) {
super(input, resultType);
this.function = function;
this.grouper = null;
+ this.defaultName = defaultName;
checkCombinability();
}
@@ -71,11 +74,12 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
* @param input The grouped input to be processed group-wise by the groupReduce function.
* @param function The user-defined GroupReduce function.
*/
- public GroupReduceOperator(Grouping<IN> input, TypeInformation<OUT> resultType, GroupReduceFunction<IN, OUT> function) {
+ public GroupReduceOperator(Grouping<IN> input, TypeInformation<OUT> resultType, GroupReduceFunction<IN, OUT> function, String defaultName) {
super(input != null ? input.getDataSet() : null, resultType);
this.function = function;
this.grouper = input;
+ this.defaultName = defaultName;
checkCombinability();
@@ -110,7 +114,7 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
@Override
protected org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<?, OUT, ?> translateToDataFlow(Operator<IN> input) {
- String name = getName() != null ? getName() : function.getClass().getName();
+ String name = getName() != null ? getName() : "GroupReduce at "+defaultName;
// distinguish between grouped reduce and non-grouped reduce
if (grouper == null) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index 9be6656..93e0371 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -35,6 +35,7 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.functions.SemanticPropUtil;
import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder;
@@ -137,9 +138,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
@SuppressWarnings("unused")
private boolean preserve2;
+ private final String joinLocationName;
+
public EquiJoin(DataSet<I1> input1, DataSet<I2> input2,
Keys<I1> keys1, Keys<I2> keys2, FlatJoinFunction<I1, I2, OUT> function,
- TypeInformation<OUT> returnType, JoinHint hint)
+ TypeInformation<OUT> returnType, JoinHint hint, String joinLocationName)
{
super(input1, input2, keys1, keys2, returnType, hint);
@@ -148,6 +151,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
}
this.function = function;
+ this.joinLocationName = joinLocationName;
if (!(function instanceof ProjectFlatJoinFunction)) {
extractSemanticAnnotationsFromUdf(function.getClass());
@@ -158,9 +162,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
public EquiJoin(DataSet<I1> input1, DataSet<I2> input2,
Keys<I1> keys1, Keys<I2> keys2, FlatJoinFunction<I1, I2, OUT> generatedFunction, JoinFunction<I1, I2, OUT> function,
- TypeInformation<OUT> returnType, JoinHint hint)
+ TypeInformation<OUT> returnType, JoinHint hint, String joinLocationName)
{
super(input1, input2, keys1, keys2, returnType, hint);
+
+ this.joinLocationName = joinLocationName;
if (function == null) {
throw new NullPointerException();
@@ -204,7 +210,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
Operator<I1> input1,
Operator<I2> input2) {
- String name = getName() != null ? getName() : function.getClass().getName();
+ String name = getName() != null ? getName() : "Join at "+joinLocationName;
try {
keys1.areCompatible(super.keys2);
} catch(IncompatibleKeysException ike) {
@@ -452,11 +458,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
public static final class DefaultJoin<I1, I2> extends EquiJoin<I1, I2, Tuple2<I1, I2>> {
protected DefaultJoin(DataSet<I1> input1, DataSet<I2> input2,
- Keys<I1> keys1, Keys<I2> keys2, JoinHint hint)
+ Keys<I1> keys1, Keys<I2> keys2, JoinHint hint, String joinLocationName)
{
super(input1, input2, keys1, keys2,
(RichFlatJoinFunction<I1, I2, Tuple2<I1, I2>>) new DefaultFlatJoinFunction<I1, I2>(),
- new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), hint);
+ new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), hint, joinLocationName);
}
/**
@@ -475,7 +481,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
throw new NullPointerException("Join function must not be null.");
}
TypeInformation<R> returnType = TypeExtractor.getFlatJoinReturnTypes(function, getInput1Type(), getInput2Type());
- return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), function, returnType, getJoinHint());
+ return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), function, returnType, getJoinHint(), Utils.getCallLocationName());
}
public <R> EquiJoin<I1, I2, R> with (JoinFunction<I1, I2, R> function) {
@@ -484,7 +490,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
}
FlatJoinFunction<I1, I2, R> generatedFunction = new WrappingFlatJoinFunction<I1, I2, R>(function);
TypeInformation<R> returnType = TypeExtractor.getJoinReturnTypes(function, getInput1Type(), getInput2Type());
- return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), generatedFunction, function, returnType, getJoinHint());
+ return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), generatedFunction, function, returnType, getJoinHint(), Utils.getCallLocationName());
}
public static class WrappingFlatJoinFunction<IN1, IN2, OUT> extends WrappingFunction<JoinFunction<IN1,IN2,OUT>> implements FlatJoinFunction<IN1, IN2, OUT> {
@@ -601,7 +607,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
protected ProjectJoin(DataSet<I1> input1, DataSet<I2> input2, Keys<I1> keys1, Keys<I2> keys2, JoinHint hint, int[] fields, boolean[] isFromFirst, TupleTypeInfo<OUT> returnType) {
super(input1, input2, keys1, keys2,
new ProjectFlatJoinFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer().createInstance()),
- returnType, hint);
+ returnType, hint, Utils.getCallLocationName(4)); // We need to use the 4th element in the stack because the call comes through .types().
}
@Override
@@ -850,7 +856,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
throw new InvalidProgramException("The pair of join keys are not compatible with each other.",e);
}
- return new DefaultJoin<I1, I2>(input1, input2, keys1, keys2, joinHint);
+ return new DefaultJoin<I1, I2>(input1, input2, keys1, keys2, joinHint, Utils.getCallLocationName(4));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
index f1ece2c..b4433dc 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
@@ -37,11 +37,13 @@ import org.apache.flink.api.java.DataSet;
public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOperator<IN, OUT>> {
protected final MapFunction<IN, OUT> function;
+
+ protected final String defaultName;
- public MapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, MapFunction<IN, OUT> function) {
-
+ public MapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, MapFunction<IN, OUT> function, String defaultName) {
super(input, resultType);
+ this.defaultName = defaultName;
this.function = function;
extractSemanticAnnotationsFromUdf(function.getClass());
}
@@ -49,7 +51,7 @@ public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOpe
@Override
protected org.apache.flink.api.common.operators.base.MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
- String name = getName() != null ? getName() : function.getClass().getName();
+ String name = getName() != null ? getName() : "Map at "+defaultName;
// create operator
MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> po = new MapOperatorBase<IN, OUT, MapFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
// set input
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
index 839298b..067f7af 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
@@ -38,17 +38,20 @@ public class MapPartitionOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
protected final MapPartitionFunction<IN, OUT> function;
- public MapPartitionOperator(DataSet<IN> input, TypeInformation<OUT> resultType, MapPartitionFunction<IN, OUT> function) {
+ protected final String defaultName;
+
+ public MapPartitionOperator(DataSet<IN> input, TypeInformation<OUT> resultType, MapPartitionFunction<IN, OUT> function, String defaultName) {
super(input, resultType);
this.function = function;
+ this.defaultName = defaultName;
extractSemanticAnnotationsFromUdf(function.getClass());
}
@Override
protected MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
- String name = getName() != null ? getName() : function.getClass().getName();
+ String name = getName() != null ? getName() : "MapPartition at "+defaultName;
// create operator
MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>> po = new MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
// set input
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
index c4548fb..77d5681 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
@@ -41,9 +41,11 @@ public class PartitionOperator<T> extends SingleInputUdfOperator<T, T, Partition
private final Keys<T> pKeys;
private final PartitionMethod pMethod;
+ private final String partitionLocationName;
- public PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T> pKeys) {
+ public PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T> pKeys, String partitionLocationName) {
super(input, input.getType());
+ this.partitionLocationName = partitionLocationName;
if(pMethod == PartitionMethod.HASH && pKeys == null) {
throw new IllegalArgumentException("Hash Partitioning requires keys");
@@ -59,8 +61,8 @@ public class PartitionOperator<T> extends SingleInputUdfOperator<T, T, Partition
this.pKeys = pKeys;
}
- public PartitionOperator(DataSet<T> input, PartitionMethod pMethod) {
- this(input, pMethod, null);
+ public PartitionOperator(DataSet<T> input, PartitionMethod pMethod, String partitionLocationName) {
+ this(input, pMethod, null, partitionLocationName);
}
/*
@@ -68,7 +70,7 @@ public class PartitionOperator<T> extends SingleInputUdfOperator<T, T, Partition
*/
protected org.apache.flink.api.common.operators.SingleInputOperator<?, T, ?> translateToDataFlow(Operator<T> input) {
- String name = "Partition";
+ String name = "Partition at "+partitionLocationName;
// distinguish between partition types
if (pMethod == PartitionMethod.REBALANCE) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
index 8cb64ba..7089cf6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
@@ -46,6 +46,8 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
private final Grouping<IN> grouper;
+ private final String defaultName;
+
/**
*
* This is the case for a reduce-all case (in contrast to the reduce-per-group case).
@@ -53,21 +55,23 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
* @param input
* @param function
*/
- public ReduceOperator(DataSet<IN> input, ReduceFunction<IN> function) {
+ public ReduceOperator(DataSet<IN> input, ReduceFunction<IN> function, String defaultName) {
super(input, input.getType());
this.function = function;
this.grouper = null;
+ this.defaultName = defaultName;
extractSemanticAnnotationsFromUdf(function.getClass());
}
- public ReduceOperator(Grouping<IN> input, ReduceFunction<IN> function) {
+ public ReduceOperator(Grouping<IN> input, ReduceFunction<IN> function, String defaultName) {
super(input.getDataSet(), input.getDataSet().getType());
this.function = function;
this.grouper = input;
+ this.defaultName = defaultName;
extractSemanticAnnotationsFromUdf(function.getClass());
}
@@ -75,7 +79,7 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
@Override
protected org.apache.flink.api.common.operators.SingleInputOperator<?, IN, ?> translateToDataFlow(Operator<IN> input) {
- String name = getName() != null ? getName() : function.getClass().getName();
+ String name = getName() != null ? getName() : "Reduce at "+defaultName;
// distinguish between grouped reduce and non-grouped reduce
if (grouper == null) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index c9700ce..36d14ee 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.operators;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.FirstReducer;
import java.util.Arrays;
@@ -110,7 +111,7 @@ public class SortedGrouping<T> extends Grouping<T> {
}
TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer,
this.getDataSet().getType());
- return new GroupReduceOperator<T, R>(this, resultType, reducer);
+ return new GroupReduceOperator<T, R>(this, resultType, reducer, Utils.getCallLocationName() );
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
index efc1ebc..c6f72f2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
@@ -21,7 +21,6 @@ package org.apache.flink.api.java.operators;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.Union;
-
import org.apache.flink.api.java.DataSet;
/**
@@ -31,14 +30,17 @@ import org.apache.flink.api.java.DataSet;
*/
public class UnionOperator<T> extends TwoInputOperator<T, T, T, UnionOperator<T>> {
+ private final String unionLocationName;
/**
* Create an operator that produces the union of the two given data sets.
*
* @param input1 The first data set to be unioned.
* @param input2 The second data set to be unioned.
*/
- public UnionOperator(DataSet<T> input1, DataSet<T> input2) {
+ public UnionOperator(DataSet<T> input1, DataSet<T> input2, String unionLocationName) {
super(input1, input2, input1.getType());
+
+ this.unionLocationName = unionLocationName;
}
/**
@@ -50,6 +52,6 @@ public class UnionOperator<T> extends TwoInputOperator<T, T, T, UnionOperator<T>
*/
@Override
protected Union<T> translateToDataFlow(Operator<T> input1, Operator<T> input2) {
- return new Union<T>(input1, input2);
+ return new Union<T>(input1, input2, unionLocationName);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
index 910846d..b504e37 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.functions.FirstReducer;
import org.apache.flink.api.java.functions.SelectByMaxFunction;
@@ -58,7 +59,12 @@ public class UnsortedGrouping<T> extends Grouping<T> {
* @see DataSet
*/
public AggregateOperator<T> aggregate(Aggregations agg, int field) {
- return new AggregateOperator<T>(this, agg, field);
+ return aggregate(agg, field, Utils.getCallLocationName());
+ }
+
+ // private helper that allows setting a different call location name
+ private AggregateOperator<T> aggregate(Aggregations agg, int field, String callLocationName) {
+ return new AggregateOperator<T>(this, agg, field, callLocationName);
}
/**
@@ -69,7 +75,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
* @see org.apache.flink.api.java.operators.AggregateOperator
*/
public AggregateOperator<T> sum (int field) {
- return this.aggregate (Aggregations.SUM, field);
+ return this.aggregate (Aggregations.SUM, field, Utils.getCallLocationName());
}
/**
@@ -80,7 +86,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
* @see org.apache.flink.api.java.operators.AggregateOperator
*/
public AggregateOperator<T> max (int field) {
- return this.aggregate (Aggregations.MAX, field);
+ return this.aggregate (Aggregations.MAX, field, Utils.getCallLocationName());
}
/**
@@ -91,7 +97,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
* @see org.apache.flink.api.java.operators.AggregateOperator
*/
public AggregateOperator<T> min (int field) {
- return this.aggregate (Aggregations.MIN, field);
+ return this.aggregate (Aggregations.MIN, field, Utils.getCallLocationName());
}
/**
@@ -111,7 +117,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
if (reducer == null) {
throw new NullPointerException("Reduce function must not be null.");
}
- return new ReduceOperator<T>(this, reducer);
+ return new ReduceOperator<T>(this, reducer, Utils.getCallLocationName());
}
/**
@@ -133,7 +139,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
}
TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, this.getDataSet().getType());
- return new GroupReduceOperator<T, R>(this, resultType, reducer);
+ return new GroupReduceOperator<T, R>(this, resultType, reducer, Utils.getCallLocationName());
}
/**
@@ -167,7 +173,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
}
return new ReduceOperator<T>(this, new SelectByMinFunction(
- (TupleTypeInfo) this.dataSet.getType(), fields));
+ (TupleTypeInfo) this.dataSet.getType(), fields), Utils.getCallLocationName());
}
/**
@@ -188,7 +194,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
}
return new ReduceOperator<T>(this, new SelectByMaxFunction(
- (TupleTypeInfo) this.dataSet.getType(), fields));
+ (TupleTypeInfo) this.dataSet.getType(), fields), Utils.getCallLocationName());
}
// --------------------------------------------------------------------------------------------
// Group Operations
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
index d439e79..de620f9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
@@ -78,7 +78,13 @@ class TupleGenerator {
private static final int LAST = 25;
public static void main(String[] args) throws Exception {
- File root = new File(ROOT_DIRECTORY);
+ System.err.println("Current directory "+System.getProperty("user.dir"));
+ String rootDir = ROOT_DIRECTORY;
+ if(args.length > 0) {
+ rootDir = args[0] + "/" + ROOT_DIRECTORY;
+ }
+ System.err.println("Using root directory: "+rootDir);
+ File root = new File(rootDir);
createTupleClasses(root);
@@ -478,7 +484,7 @@ class TupleGenerator {
// return
sb.append("\t\treturn new DataSource<Tuple" + numFields + "<");
appendTupleTypeGenerics(sb, numFields);
- sb.append(">>(executionContext, inputFormat, types);\n");
+ sb.append(">>(executionContext, inputFormat, types, DataSet.getCallLocationName());\n");
// end of method
sb.append("\t}\n");
@@ -834,7 +840,7 @@ class TupleGenerator {
}
private static String HEADER =
- "/**\n"
+ "/*\n"
+ " * Licensed to the Apache Software Foundation (ASF) under one\n"
+ " * or more contributor license agreements. See the NOTICE file\n"
+ " * distributed with this work for additional information\n"