You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/23 10:07:53 UTC
[1/4] flink git commit: [FLINK-2206] [webui] Fix incorrect counts of finished, canceled, and failed jobs in new web dashboard
Repository: flink
Updated Branches:
refs/heads/master 201f55b85 -> cd7ed8e38
[FLINK-2206] [webui] Fix incorrect counts of finished, canceled, and failed jobs in new web dashboard
This closes #1287
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4db27060
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4db27060
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4db27060
Branch: refs/heads/master
Commit: 4db27060b3ebaf13e3e03ac4864c3b43ac776bb6
Parents: 201f55b
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Oct 22 14:41:11 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Oct 22 20:51:14 2015 +0200
----------------------------------------------------------------------
.../flink/runtime/jobmanager/MemoryArchivist.scala | 16 +---------------
1 file changed, 1 insertion(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4db27060/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index bef52e0..2d55b26 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -188,21 +188,7 @@ class MemoryArchivist(private val max_entries: Int)
// --------------------------------------------------------------------------
private def createJobsOverview() : JobsOverview = {
- var runningOrPending = 0
- var finished = 0
- var canceled = 0
- var failed = 0
-
- graphs.values.foreach {
- _.getState() match {
- case JobStatus.FINISHED => finished += 1
- case JobStatus.CANCELED => canceled += 1
- case JobStatus.FAILED => failed += 1
- case _ => runningOrPending += 1
- }
- }
-
- new JobsOverview(runningOrPending, finished, canceled, failed)
+ new JobsOverview(0, finishedCnt, canceledCnt, failedCnt)
}
private def createJobsWithIDsOverview() : JobsWithIDsOverview = {
[2/4] flink git commit: [FLINK-2668] [DataSet] [api-breaking] Chained Projections are no longer appended
Posted by fh...@apache.org.
[FLINK-2668] [DataSet] [api-breaking] Chained Projections are no longer appended
This closes #1279
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/32b0dfd1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/32b0dfd1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/32b0dfd1
Branch: refs/heads/master
Commit: 32b0dfd102c4264a26ad9b5f6bb5779f8a8d33da
Parents: 4db2706
Author: zentol <s....@web.de>
Authored: Wed Oct 21 15:27:05 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Oct 22 20:51:25 2015 +0200
----------------------------------------------------------------------
.../api/java/operators/ProjectOperator.java | 104 +++++--------------
.../flink/api/java/tuple/TupleGenerator.java | 2 +-
.../SemanticPropertiesProjectionTest.java | 4 +-
.../api/python/flink/test/test_main.py | 2 +-
4 files changed, 29 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/32b0dfd1/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
index 55e182f..de79383 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
@@ -47,21 +47,11 @@ public class ProjectOperator<IN, OUT extends Tuple>
extends SingleInputOperator<IN, OUT, ProjectOperator<IN, OUT>> {
protected final int[] fields;
-
- private Projection<IN> proj;
public ProjectOperator(DataSet<IN> input, int[] fields, TupleTypeInfo<OUT> returnType) {
super(input, returnType);
this.fields = fields;
- proj = null;
- }
-
- public ProjectOperator(DataSet<IN> input, int[] fields, TupleTypeInfo<OUT> returnType, Projection<IN> proj) {
- super(input, returnType);
-
- this.fields = fields;
- this.proj = proj;
}
@Override
@@ -77,28 +67,6 @@ public class ProjectOperator<IN, OUT extends Tuple>
return ppo;
}
-
- /**
- * Continues a Project transformation on a {@link Tuple} {@link DataSet}.<br/>
- * <b>Note: Only Tuple DataSets can be projected using field indexes.</b></br>
- * The transformation projects each Tuple of the DataSet onto a (sub)set of fields.</br>
- * Additional fields can be added to the projection by calling this method repeatedly.
- *
- * <b>Note: With the current implementation, the Project transformation looses type information.</b>
- *
- * @param fieldIndexes The field indexes which are added to the Project transformation.
- * The order of fields in the output tuple corresponds to the order of field indexes.
- * @return A ProjectOperator that represents the projected DataSet.
- *
- * @see Tuple
- * @see DataSet
- * @see ProjectOperator
- */
- public <R extends Tuple> ProjectOperator<?, R> project(int... fieldIndexes) {
- proj.acceptAdditionalIndexes(fieldIndexes);
-
- return proj.projectTupleX();
- }
/**
* Deprecated method only kept for compatibility.
*/
@@ -146,28 +114,6 @@ public class ProjectOperator<IN, OUT extends Tuple>
this.fieldIndexes = fieldIndexes;
}
- private void acceptAdditionalIndexes(int... additionalIndexes) {
-
- if(additionalIndexes.length == 0) {
- throw new IllegalArgumentException("project() needs to select at least one (1) field.");
- } else if(additionalIndexes.length > Tuple.MAX_ARITY - 1) {
- throw new IllegalArgumentException(
- "project() may select only up to (" + (Tuple.MAX_ARITY - 1) + ") fields.");
- }
-
- int offset = this.fieldIndexes.length;
-
- this.fieldIndexes = Arrays.copyOf(this.fieldIndexes, this.fieldIndexes.length + additionalIndexes.length);
-
- int maxFieldIndex = ds.getType().getArity();
- for (int i = 0; i < additionalIndexes.length; i++) {
- Preconditions.checkElementIndex(additionalIndexes[i], maxFieldIndex);
-
- this.fieldIndexes[offset + i] = additionalIndexes[i];
- }
- }
-
-
// --------------------------------------------------------------------------------------------
// The following lines are generated.
@@ -231,7 +177,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
TupleTypeInfo<Tuple1<T0>> tType = new TupleTypeInfo<Tuple1<T0>>(fTypes);
- return new ProjectOperator<T, Tuple1<T0>>(this.ds, this.fieldIndexes, tType, this);
+ return new ProjectOperator<T, Tuple1<T0>>(this.ds, this.fieldIndexes, tType);
}
/**
@@ -246,7 +192,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
TupleTypeInfo<Tuple2<T0, T1>> tType = new TupleTypeInfo<Tuple2<T0, T1>>(fTypes);
- return new ProjectOperator<T, Tuple2<T0, T1>>(this.ds, this.fieldIndexes, tType, this);
+ return new ProjectOperator<T, Tuple2<T0, T1>>(this.ds, this.fieldIndexes, tType);
}
/**
@@ -261,7 +207,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
TupleTypeInfo<Tuple3<T0, T1, T2>> tType = new TupleTypeInfo<Tuple3<T0, T1, T2>>(fTypes);
- return new ProjectOperator<T, Tuple3<T0, T1, T2>>(this.ds, this.fieldIndexes, tType, this);
+ return new ProjectOperator<T, Tuple3<T0, T1, T2>>(this.ds, this.fieldIndexes, tType);
}
/**
@@ -276,7 +222,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
TupleTypeInfo<Tuple4<T0, T1, T2, T3>> tType = new TupleTypeInfo<Tuple4<T0, T1, T2, T3>>(fTypes);
- return new ProjectOperator<T, Tuple4<T0, T1, T2, T3>>(this.ds, this.fieldIndexes, tType, this);
+ return new ProjectOperator<T, Tuple4<T0, T1, T2, T3>>(this.ds, this.fieldIndexes, tType);
}
/**
@@ -291,7 +237,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>> tType = new TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>>(fTypes);
- return new ProjectOperator<T, Tuple5<T0, T1, T2, T3, T4>>(this.ds, this.fieldIndexes, tType, this);
+ return new ProjectOperator<T, Tuple5<T0, T1, T2, T3, T4>>(this.ds, this.fieldIndexes, tType);
}
/**
@@ -306,7 +252,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>> tType = new TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>>(fTypes);
- return new ProjectOperator<T, Tuple6<T0, T1, T2, T3, T4, T5>>(this.ds, this.fieldIndexes, tType, this);
+ return new ProjectOperator<T, Tuple6<T0, T1, T2, T3, T4, T5>>(this.ds, this.fieldIndexes, tType);
}
/**
@@ -321,7 +267,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>> tType = new TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fTypes);
- return new ProjectOperator<T, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(this.ds, this.fieldIndexes, tType, this);
+ return new ProjectOperator<T, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(this.ds, this.fieldIndexes, tType);
}
/**
@@ -336,7 +282,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> tType = new TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(fTypes);
- return new ProjectOperator<T, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(this.ds, this.fieldIndexes, tType, this);
+ return new ProjectOperator<T, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(this.ds, this.fieldIndexes, tType);
}
/**
@@ -351,7 +297,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> tType = new TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(fTypes);
- return new ProjectOperator<T, Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(this.ds, this.fieldIndexes, tType, this);
+ return new ProjectOperator<T, Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(this.ds, this.fieldIndexes, tType);
}
/**
@@ -366,7 +312,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> tType = new TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(fTypes);
- return new ProjectOperator<T, Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(this.ds, this.fieldIndexes, tType, this);
+ return new ProjectOperator<T, Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(this.ds, this.fieldIndexes, tType);
}
/**
@@ -381,7 +327,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> tType = new TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(fTypes);
- return new ProjectOperator<T, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(this.ds, this.fieldIndexes, tType, this);
+ return new ProjectOperator<T, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(this.ds, this.fieldIndexes, tType);
}
/**
@@ -396,7 +342,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> tType = new TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(fTypes);
- return new ProjectOperator<T, Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(this.ds, this.fieldIndexes, tType, this);
+ return new ProjectOperator<T, Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(this.ds, this.fieldIndexes, tType);
}
/**
@@ -411,7 +357,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> tType = new TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(fTypes);
- return new ProjectOperator<T, Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(this.ds, this.fieldIndexes, tType, this);
+ return new ProjectOperator<T, Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(this.ds, this.fieldIndexes, tType);
}
/**
@@ -426,7 +372,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> tType = new TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(fTypes);
- return new ProjectOperator<T, Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(this.ds, this.fieldIndexes, tType, this);
+ return new ProjectOperator<T, Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(this.ds, this.fieldIndexes, tType);
}
/**
@@ -441,7 +387,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> tType = new TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(fTypes);
- return new ProjectOperator<T, Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(this.ds, this.fieldIndexes, tType, this);
+ return new ProjectOperator<T, Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(this.ds, this.fieldIndexes, tType);
}
/**
@@ -456,7 +402,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> tType = new TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(fTypes);
- return new ProjectOperator<T, Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(this.ds, this.fieldIndexes, tType, this);
+ return new ProjectOperator<T, Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(this.ds, this.fieldIndexes, tType);
}
/**
@@ -471,7 +417,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> tType = new TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(fTypes);
- return new ProjectOperator<T, Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(this.ds, this.fieldIndexes, tType, this);
+ return new ProjectOperator<T, Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(this.ds, this.fieldIndexes, tType);
}
/**
@@ -486,7 +432,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> tType = new TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(fTypes);
- return new ProjectOperator<T, Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(this.ds, this.fieldIndexes, tType, this);
+ return new ProjectOperator<T, Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(this.ds, this.fieldIndexes, tType);
}
/**
@@ -501,7 +447,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> tType = new TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(fTypes);
- return new ProjectOperator<T, Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(this.ds, this.fieldIndexes, tType, this);
+ return new ProjectOperator<T, Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(this.ds, this.fieldIndexes, tType);
}
/**
@@ -516,7 +462,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> tType = new TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(fTypes);
- return new ProjectOperator<T, Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(this.ds, this.fieldIndexes, tType, this);
+ return new ProjectOperator<T, Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(this.ds, this.fieldIndexes, tType);
}
/**
@@ -531,7 +477,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> tType = new TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(fTypes);
- return new ProjectOperator<T, Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(this.ds, this.fieldIndexes, tType, this);
+ return new ProjectOperator<T, Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(this.ds, this.fieldIndexes, tType);
}
/**
@@ -546,7 +492,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> tType = new TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(fTypes);
- return new ProjectOperator<T, Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(this.ds, this.fieldIndexes, tType, this);
+ return new ProjectOperator<T, Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(this.ds, this.fieldIndexes, tType);
}
/**
@@ -561,7 +507,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> tType = new TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(fTypes);
- return new ProjectOperator<T, Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(this.ds, this.fieldIndexes, tType, this);
+ return new ProjectOperator<T, Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(this.ds, this.fieldIndexes, tType);
}
/**
@@ -576,7 +522,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> tType = new TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(fTypes);
- return new ProjectOperator<T, Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(this.ds, this.fieldIndexes, tType, this);
+ return new ProjectOperator<T, Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(this.ds, this.fieldIndexes, tType);
}
/**
@@ -591,7 +537,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> tType = new TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(fTypes);
- return new ProjectOperator<T, Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(this.ds, this.fieldIndexes, tType, this);
+ return new ProjectOperator<T, Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(this.ds, this.fieldIndexes, tType);
}
// END_OF_TUPLE_DEPENDENT_CODE
http://git-wip-us.apache.org/repos/asf/flink/blob/32b0dfd1/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
index f306fe0..66ee25f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
@@ -304,7 +304,7 @@ class TupleGenerator {
// create and return new project operator
sb.append("\t\t\treturn new ProjectOperator<T, Tuple"+numFields+"<");
appendTupleTypeGenerics(sb, numFields);
- sb.append(">>(this.ds, this.fieldIndexes, tType, this);\n");
+ sb.append(">>(this.ds, this.fieldIndexes, tType);\n");
// method end
sb.append("\t\t}\n");
http://git-wip-us.apache.org/repos/asf/flink/blob/32b0dfd1/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
index 916086b..0368ca3 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
@@ -71,7 +71,7 @@ public class SemanticPropertiesProjectionTest {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
- tupleDs.project(1, 3, 2).project(0, 3).output(new DiscardingOutputFormat<Tuple>());
+ tupleDs.project(1, 3, 2, 0, 3).output(new DiscardingOutputFormat<Tuple>());
Plan plan = env.createProgramPlan();
@@ -97,7 +97,7 @@ public class SemanticPropertiesProjectionTest {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple4<Integer, Tuple3<String, Integer, Long>, Tuple2<Long, Long>, String>> tupleDs = env.fromCollection(emptyNestedTupleData, nestedTupleTypeInfo);
- tupleDs.project(2, 3, 1).project(2).output(new DiscardingOutputFormat<Tuple>());
+ tupleDs.project(2, 3, 1, 2).output(new DiscardingOutputFormat<Tuple>());
Plan plan = env.createProgramPlan();
http://git-wip-us.apache.org/repos/asf/flink/blob/32b0dfd1/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_main.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_main.py b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_main.py
index 29113f3..2116d1f 100644
--- a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_main.py
+++ b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_main.py
@@ -232,7 +232,7 @@ if __name__ == "__main__":
.map_partition(Verify([((1, 0.5, "hello", True), ("hello",)), ((2, 0.4, "world", False), ("world",))], "Default Join"), STRING).output()
d2 \
- .project(0, 1).project(2) \
+ .project(0, 1, 2) \
.map_partition(Verify([(1, 0.5, "hello"), (2, 0.4, "world")], "Project"), STRING).output()
d2 \
[4/4] flink git commit: [FLINK-2874] Fix recognition of Scala default setters
Posted by fh...@apache.org.
[FLINK-2874] Fix recognition of Scala default setters
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cd7ed8e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cd7ed8e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cd7ed8e3
Branch: refs/heads/master
Commit: cd7ed8e38c0d0e1313f5759e7bf34e0904972c6e
Parents: 17e7b42
Author: Fabian Hueske <fh...@apache.org>
Authored: Fri Oct 23 00:06:55 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Oct 23 00:06:55 2015 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/api/java/typeutils/TypeExtractor.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cd7ed8e3/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 0281da6..782d58d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -1311,7 +1311,9 @@ public class TypeExtractor {
fieldType = materializeTypeVariable(typeHierarchy, (TypeVariable<?>)fieldType);
}
for(Method m : clazz.getMethods()) {
- final String methodNameLow = m.getName().toLowerCase().replaceAll("_", "");
+ final String methodNameLow = m.getName().endsWith("_$eq") ?
+ m.getName().toLowerCase().replaceAll("_", "").replaceFirst("\\$eq$", "_\\$eq") :
+ m.getName().toLowerCase().replaceAll("_", "");
// check for getter
if( // The name should be "get<FieldName>" or "<fieldName>" (for scala) or "is<fieldName>" for boolean fields.
[3/4] flink git commit: [FLINK-2874] Fix Avro getter/setter recognition
Posted by fh...@apache.org.
[FLINK-2874] Fix Avro getter/setter recognition
This closes #1252
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/17e7b423
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/17e7b423
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/17e7b423
Branch: refs/heads/master
Commit: 17e7b4238a7e0bd8ecca8b9c210a2ff2f5a9513c
Parents: 32b0dfd
Author: Ulf Karlsson <uk...@spotify.com>
Authored: Sun Oct 11 00:04:57 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Oct 23 00:03:32 2015 +0200
----------------------------------------------------------------------
.../flink/api/java/typeutils/TypeExtractor.java | 17 +++++---
.../api/java/typeutils/PojoTypeInfoTest.java | 44 +++++++++++++++++++-
2 files changed, 53 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/17e7b423/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 1dec90b..0281da6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -30,6 +30,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.commons.lang3.ClassUtils;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
@@ -1299,22 +1300,26 @@ public class TypeExtractor {
return true;
} else {
boolean hasGetter = false, hasSetter = false;
- final String fieldNameLow = f.getName().toLowerCase();
-
+ final String fieldNameLow = f.getName().toLowerCase().replaceAll("_", "");
+
Type fieldType = f.getGenericType();
+ Class<?> fieldTypeWrapper = ClassUtils.primitiveToWrapper(f.getType());
+
TypeVariable<?> fieldTypeGeneric = null;
if(fieldType instanceof TypeVariable) {
fieldTypeGeneric = (TypeVariable<?>) fieldType;
fieldType = materializeTypeVariable(typeHierarchy, (TypeVariable<?>)fieldType);
}
for(Method m : clazz.getMethods()) {
+ final String methodNameLow = m.getName().toLowerCase().replaceAll("_", "");
+
// check for getter
if( // The name should be "get<FieldName>" or "<fieldName>" (for scala) or "is<fieldName>" for boolean fields.
- (m.getName().toLowerCase().equals("get"+fieldNameLow) || m.getName().toLowerCase().equals("is"+fieldNameLow) || m.getName().toLowerCase().equals(fieldNameLow)) &&
+ (methodNameLow.equals("get"+fieldNameLow) || methodNameLow.equals("is"+fieldNameLow) || methodNameLow.equals(fieldNameLow)) &&
// no arguments for the getter
m.getParameterTypes().length == 0 &&
// return type is same as field type (or the generic variant of it)
- (m.getGenericReturnType().equals( fieldType ) || (fieldTypeGeneric != null && m.getGenericReturnType().equals(fieldTypeGeneric)) )
+ (m.getGenericReturnType().equals( fieldType ) || (fieldTypeWrapper != null && m.getReturnType().equals( fieldTypeWrapper )) || (fieldTypeGeneric != null && m.getGenericReturnType().equals(fieldTypeGeneric)) )
) {
if(hasGetter) {
throw new IllegalStateException("Detected more than one getter");
@@ -1322,9 +1327,9 @@ public class TypeExtractor {
hasGetter = true;
}
// check for setters (<FieldName>_$eq for scala)
- if((m.getName().toLowerCase().equals("set"+fieldNameLow) || m.getName().toLowerCase().equals(fieldNameLow+"_$eq")) &&
+ if((methodNameLow.equals("set"+fieldNameLow) || methodNameLow.equals(fieldNameLow+"_$eq")) &&
m.getParameterTypes().length == 1 && // one parameter of the field's type
- ( m.getGenericParameterTypes()[0].equals( fieldType ) || (fieldTypeGeneric != null && m.getGenericParameterTypes()[0].equals(fieldTypeGeneric) ) )&&
+ (m.getGenericParameterTypes()[0].equals( fieldType ) || (fieldTypeWrapper != null && m.getParameterTypes()[0].equals( fieldTypeWrapper )) || (fieldTypeGeneric != null && m.getGenericParameterTypes()[0].equals(fieldTypeGeneric) ) )&&
// return type is void.
m.getReturnType().equals(Void.TYPE)
) {
http://git-wip-us.apache.org/repos/asf/flink/blob/17e7b423/flink-java/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java
index 2fe1357..dbe5115 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java
@@ -74,11 +74,25 @@ public class PojoTypeInfoTest {
assertEquals(pojoTypeInfo, deserializedPojoTypeInfo);
}
-
+
+ @Test
+ public void testPrimitivePojo() {
+ TypeInformation<PrimitivePojo> info1 = TypeExtractor.getForClass(PrimitivePojo.class);
+
+ assertTrue(info1 instanceof PojoTypeInfo);
+ }
+
+ @Test
+ public void testUnderscorePojo() {
+ TypeInformation<UnderscorePojo> info1 = TypeExtractor.getForClass(UnderscorePojo.class);
+
+ assertTrue(info1 instanceof PojoTypeInfo);
+ }
+
public static final class TestPojo {
public int someInt;
-
+
private String aString;
public Double[] doubleArray;
@@ -110,4 +124,30 @@ public class PojoTypeInfoTest {
return aString;
}
}
+
+ public static final class PrimitivePojo {
+
+ private int someInt;
+
+ public void setSomeInt(Integer someInt) {
+ this.someInt = someInt;
+ }
+
+ public Integer getSomeInt() {
+ return this.someInt;
+ }
+ }
+
+ public static final class UnderscorePojo {
+
+ private int some_int;
+
+ public void setSomeInt(int some_int) {
+ this.some_int = some_int;
+ }
+
+ public Integer getSomeInt() {
+ return this.some_int;
+ }
+ }
}