You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/23 10:07:53 UTC

[1/4] flink git commit: [FLINK-2206] [webui] Fix incorrect counts of finished, canceled, and failed jobs in new web dashboard

Repository: flink
Updated Branches:
  refs/heads/master 201f55b85 -> cd7ed8e38


[FLINK-2206] [webui] Fix incorrect counts of finished, canceled, and failed jobs in new web dashboard

This closes #1287


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4db27060
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4db27060
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4db27060

Branch: refs/heads/master
Commit: 4db27060b3ebaf13e3e03ac4864c3b43ac776bb6
Parents: 201f55b
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Oct 22 14:41:11 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Oct 22 20:51:14 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/MemoryArchivist.scala  | 16 +---------------
 1 file changed, 1 insertion(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4db27060/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index bef52e0..2d55b26 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -188,21 +188,7 @@ class MemoryArchivist(private val max_entries: Int)
   // --------------------------------------------------------------------------
   
   private def createJobsOverview() : JobsOverview = {
-    var runningOrPending = 0
-    var finished = 0
-    var canceled = 0
-    var failed = 0
-    
-    graphs.values.foreach {
-      _.getState() match {
-        case JobStatus.FINISHED => finished += 1
-        case JobStatus.CANCELED => canceled += 1
-        case JobStatus.FAILED => failed += 1
-        case _ => runningOrPending += 1
-      }
-    }
-    
-    new JobsOverview(runningOrPending, finished, canceled, failed)
+    new JobsOverview(0, finishedCnt, canceledCnt, failedCnt)
   }
 
   private def createJobsWithIDsOverview() : JobsWithIDsOverview = {


[2/4] flink git commit: [FLINK-2668] [DataSet] [api-breaking] Chained Projections are no longer appended

Posted by fh...@apache.org.
[FLINK-2668] [DataSet] [api-breaking] Chained Projections are no longer appended

This closes #1279


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/32b0dfd1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/32b0dfd1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/32b0dfd1

Branch: refs/heads/master
Commit: 32b0dfd102c4264a26ad9b5f6bb5779f8a8d33da
Parents: 4db2706
Author: zentol <s....@web.de>
Authored: Wed Oct 21 15:27:05 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Oct 22 20:51:25 2015 +0200

----------------------------------------------------------------------
 .../api/java/operators/ProjectOperator.java     | 104 +++++--------------
 .../flink/api/java/tuple/TupleGenerator.java    |   2 +-
 .../SemanticPropertiesProjectionTest.java       |   4 +-
 .../api/python/flink/test/test_main.py          |   2 +-
 4 files changed, 29 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/32b0dfd1/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
index 55e182f..de79383 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
@@ -47,21 +47,11 @@ public class ProjectOperator<IN, OUT extends Tuple>
 	extends SingleInputOperator<IN, OUT, ProjectOperator<IN, OUT>> {
 	
 	protected final int[] fields;
-	
-	private Projection<IN> proj;
 
 	public ProjectOperator(DataSet<IN> input, int[] fields, TupleTypeInfo<OUT> returnType) {
 		super(input, returnType);
 	
 		this.fields = fields;
-		proj = null;
-	}
-	
-	public ProjectOperator(DataSet<IN> input, int[] fields, TupleTypeInfo<OUT> returnType, Projection<IN> proj) {
-		super(input, returnType);
-	
-		this.fields = fields;
-		this.proj = proj;
 	}
 
 	@Override
@@ -77,28 +67,6 @@ public class ProjectOperator<IN, OUT extends Tuple>
 
 		return ppo;
 	}
-
-	/**
-	 * Continues a Project transformation on a {@link Tuple} {@link DataSet}.<br/>
-	 * <b>Note: Only Tuple DataSets can be projected using field indexes.</b></br>
-	 * The transformation projects each Tuple of the DataSet onto a (sub)set of fields.</br>
-	 * Additional fields can be added to the projection by calling this method repeatedly.
-	 *
-	 * <b>Note: With the current implementation, the Project transformation looses type information.</b>
-	 *
-	 * @param fieldIndexes The field indexes which are added to the Project transformation.
-	 * 					   The order of fields in the output tuple corresponds to the order of field indexes.
-	 * @return A ProjectOperator that represents the projected DataSet.
-	 *
-	 * @see Tuple
-	 * @see DataSet
-	 * @see ProjectOperator
-	 */
-	public <R extends Tuple> ProjectOperator<?, R> project(int... fieldIndexes) {
-		proj.acceptAdditionalIndexes(fieldIndexes);
-		
-		return proj.projectTupleX();
-	}
 	/**
 	 * Deprecated method only kept for compatibility.
 	 */
@@ -146,28 +114,6 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			this.fieldIndexes = fieldIndexes;
 		}
 		
-		private void acceptAdditionalIndexes(int... additionalIndexes) {
-			
-			if(additionalIndexes.length == 0) {
-				throw new IllegalArgumentException("project() needs to select at least one (1) field.");
-			} else if(additionalIndexes.length > Tuple.MAX_ARITY - 1) {
-				throw new IllegalArgumentException(
-						"project() may select only up to (" + (Tuple.MAX_ARITY - 1) + ") fields.");
-			}
-			
-			int offset = this.fieldIndexes.length;
-			
-			this.fieldIndexes = Arrays.copyOf(this.fieldIndexes, this.fieldIndexes.length + additionalIndexes.length);
-			
-			int maxFieldIndex = ds.getType().getArity();
-			for (int i = 0; i < additionalIndexes.length; i++) {
-				Preconditions.checkElementIndex(additionalIndexes[i], maxFieldIndex);
-
-				this.fieldIndexes[offset + i] = additionalIndexes[i];
-			}
-		}
-		
-		
 		
 		// --------------------------------------------------------------------------------------------	
 		// The following lines are generated.
@@ -231,7 +177,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple1<T0>> tType = new TupleTypeInfo<Tuple1<T0>>(fTypes);
 
-			return new ProjectOperator<T, Tuple1<T0>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple1<T0>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -246,7 +192,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple2<T0, T1>> tType = new TupleTypeInfo<Tuple2<T0, T1>>(fTypes);
 
-			return new ProjectOperator<T, Tuple2<T0, T1>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple2<T0, T1>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -261,7 +207,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple3<T0, T1, T2>> tType = new TupleTypeInfo<Tuple3<T0, T1, T2>>(fTypes);
 
-			return new ProjectOperator<T, Tuple3<T0, T1, T2>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple3<T0, T1, T2>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -276,7 +222,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple4<T0, T1, T2, T3>> tType = new TupleTypeInfo<Tuple4<T0, T1, T2, T3>>(fTypes);
 
-			return new ProjectOperator<T, Tuple4<T0, T1, T2, T3>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple4<T0, T1, T2, T3>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -291,7 +237,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>> tType = new TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>>(fTypes);
 
-			return new ProjectOperator<T, Tuple5<T0, T1, T2, T3, T4>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple5<T0, T1, T2, T3, T4>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -306,7 +252,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>> tType = new TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>>(fTypes);
 
-			return new ProjectOperator<T, Tuple6<T0, T1, T2, T3, T4, T5>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple6<T0, T1, T2, T3, T4, T5>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -321,7 +267,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>> tType = new TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fTypes);
 
-			return new ProjectOperator<T, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -336,7 +282,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> tType = new TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(fTypes);
 
-			return new ProjectOperator<T, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -351,7 +297,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> tType = new TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(fTypes);
 
-			return new ProjectOperator<T, Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -366,7 +312,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> tType = new TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(fTypes);
 
-			return new ProjectOperator<T, Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -381,7 +327,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> tType = new TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(fTypes);
 
-			return new ProjectOperator<T, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -396,7 +342,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> tType = new TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(fTypes);
 
-			return new ProjectOperator<T, Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -411,7 +357,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> tType = new TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(fTypes);
 
-			return new ProjectOperator<T, Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -426,7 +372,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> tType = new TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(fTypes);
 
-			return new ProjectOperator<T, Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -441,7 +387,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> tType = new TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(fTypes);
 
-			return new ProjectOperator<T, Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -456,7 +402,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> tType = new TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(fTypes);
 
-			return new ProjectOperator<T, Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -471,7 +417,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> tType = new TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(fTypes);
 
-			return new ProjectOperator<T, Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -486,7 +432,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> tType = new TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(fTypes);
 
-			return new ProjectOperator<T, Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -501,7 +447,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> tType = new TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(fTypes);
 
-			return new ProjectOperator<T, Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -516,7 +462,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> tType = new TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(fTypes);
 
-			return new ProjectOperator<T, Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -531,7 +477,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> tType = new TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(fTypes);
 
-			return new ProjectOperator<T, Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -546,7 +492,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> tType = new TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(fTypes);
 
-			return new ProjectOperator<T, Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -561,7 +507,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> tType = new TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(fTypes);
 
-			return new ProjectOperator<T, Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -576,7 +522,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> tType = new TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(fTypes);
 
-			return new ProjectOperator<T, Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		/**
@@ -591,7 +537,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());
 			TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> tType = new TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(fTypes);
 
-			return new ProjectOperator<T, Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(this.ds, this.fieldIndexes, tType, this);
+			return new ProjectOperator<T, Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(this.ds, this.fieldIndexes, tType);
 		}
 
 		// END_OF_TUPLE_DEPENDENT_CODE

http://git-wip-us.apache.org/repos/asf/flink/blob/32b0dfd1/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
index f306fe0..66ee25f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
@@ -304,7 +304,7 @@ class TupleGenerator {
 			// create and return new project operator
 			sb.append("\t\t\treturn new ProjectOperator<T, Tuple"+numFields+"<");
 			appendTupleTypeGenerics(sb, numFields);
-			sb.append(">>(this.ds, this.fieldIndexes, tType, this);\n");
+			sb.append(">>(this.ds, this.fieldIndexes, tType);\n");
 
 			// method end
 			sb.append("\t\t}\n");

http://git-wip-us.apache.org/repos/asf/flink/blob/32b0dfd1/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
index 916086b..0368ca3 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
@@ -71,7 +71,7 @@ public class SemanticPropertiesProjectionTest {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
-		tupleDs.project(1, 3, 2).project(0, 3).output(new DiscardingOutputFormat<Tuple>());
+		tupleDs.project(1, 3, 2, 0, 3).output(new DiscardingOutputFormat<Tuple>());
 
 		Plan plan = env.createProgramPlan();
 
@@ -97,7 +97,7 @@ public class SemanticPropertiesProjectionTest {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple4<Integer, Tuple3<String, Integer, Long>, Tuple2<Long, Long>, String>> tupleDs = env.fromCollection(emptyNestedTupleData, nestedTupleTypeInfo);
 
-		tupleDs.project(2, 3, 1).project(2).output(new DiscardingOutputFormat<Tuple>());
+		tupleDs.project(2, 3, 1, 2).output(new DiscardingOutputFormat<Tuple>());
 
 		Plan plan = env.createProgramPlan();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/32b0dfd1/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_main.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_main.py b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_main.py
index 29113f3..2116d1f 100644
--- a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_main.py
+++ b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_main.py
@@ -232,7 +232,7 @@ if __name__ == "__main__":
         .map_partition(Verify([((1, 0.5, "hello", True), ("hello",)), ((2, 0.4, "world", False), ("world",))], "Default Join"), STRING).output()
 
     d2 \
-        .project(0, 1).project(2) \
+        .project(0, 1, 2) \
         .map_partition(Verify([(1, 0.5, "hello"), (2, 0.4, "world")], "Project"), STRING).output()
 
     d2 \


[4/4] flink git commit: [FLINK-2874] Fix recognition of Scala default setters

Posted by fh...@apache.org.
[FLINK-2874] Fix recognition of Scala default setters


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cd7ed8e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cd7ed8e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cd7ed8e3

Branch: refs/heads/master
Commit: cd7ed8e38c0d0e1313f5759e7bf34e0904972c6e
Parents: 17e7b42
Author: Fabian Hueske <fh...@apache.org>
Authored: Fri Oct 23 00:06:55 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Oct 23 00:06:55 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/typeutils/TypeExtractor.java  | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cd7ed8e3/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 0281da6..782d58d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -1311,7 +1311,9 @@ public class TypeExtractor {
 				fieldType = materializeTypeVariable(typeHierarchy, (TypeVariable<?>)fieldType);
 			}
 			for(Method m : clazz.getMethods()) {
-				final String methodNameLow = m.getName().toLowerCase().replaceAll("_", "");
+				final String methodNameLow = m.getName().endsWith("_$eq") ?
+						m.getName().toLowerCase().replaceAll("_", "").replaceFirst("\\$eq$", "_\\$eq") :
+						m.getName().toLowerCase().replaceAll("_", "");
 
 				// check for getter
 				if(	// The name should be "get<FieldName>" or "<fieldName>" (for scala) or "is<fieldName>" for boolean fields.


[3/4] flink git commit: [FLINK-2874] Fix Avro getter/setter recognition

Posted by fh...@apache.org.
[FLINK-2874] Fix Avro getter/setter recognition

This closes #1252


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/17e7b423
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/17e7b423
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/17e7b423

Branch: refs/heads/master
Commit: 17e7b4238a7e0bd8ecca8b9c210a2ff2f5a9513c
Parents: 32b0dfd
Author: Ulf Karlsson <uk...@spotify.com>
Authored: Sun Oct 11 00:04:57 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Oct 23 00:03:32 2015 +0200

----------------------------------------------------------------------
 .../flink/api/java/typeutils/TypeExtractor.java | 17 +++++---
 .../api/java/typeutils/PojoTypeInfoTest.java    | 44 +++++++++++++++++++-
 2 files changed, 53 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/17e7b423/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 1dec90b..0281da6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -30,6 +30,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.commons.lang3.ClassUtils;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
@@ -1299,22 +1300,26 @@ public class TypeExtractor {
 			return true;
 		} else {
 			boolean hasGetter = false, hasSetter = false;
-			final String fieldNameLow = f.getName().toLowerCase();
-			
+			final String fieldNameLow = f.getName().toLowerCase().replaceAll("_", "");
+
 			Type fieldType = f.getGenericType();
+			Class<?> fieldTypeWrapper = ClassUtils.primitiveToWrapper(f.getType());
+
 			TypeVariable<?> fieldTypeGeneric = null;
 			if(fieldType instanceof TypeVariable) {
 				fieldTypeGeneric = (TypeVariable<?>) fieldType;
 				fieldType = materializeTypeVariable(typeHierarchy, (TypeVariable<?>)fieldType);
 			}
 			for(Method m : clazz.getMethods()) {
+				final String methodNameLow = m.getName().toLowerCase().replaceAll("_", "");
+
 				// check for getter
 				if(	// The name should be "get<FieldName>" or "<fieldName>" (for scala) or "is<fieldName>" for boolean fields.
-					(m.getName().toLowerCase().equals("get"+fieldNameLow) || m.getName().toLowerCase().equals("is"+fieldNameLow) || m.getName().toLowerCase().equals(fieldNameLow)) &&
+					(methodNameLow.equals("get"+fieldNameLow) || methodNameLow.equals("is"+fieldNameLow) || methodNameLow.equals(fieldNameLow)) &&
 					// no arguments for the getter
 					m.getParameterTypes().length == 0 &&
 					// return type is same as field type (or the generic variant of it)
-					(m.getGenericReturnType().equals( fieldType ) || (fieldTypeGeneric != null && m.getGenericReturnType().equals(fieldTypeGeneric)) )
+					(m.getGenericReturnType().equals( fieldType ) || (fieldTypeWrapper != null && m.getReturnType().equals( fieldTypeWrapper )) || (fieldTypeGeneric != null && m.getGenericReturnType().equals(fieldTypeGeneric)) )
 				) {
 					if(hasGetter) {
 						throw new IllegalStateException("Detected more than one getter");
@@ -1322,9 +1327,9 @@ public class TypeExtractor {
 					hasGetter = true;
 				}
 				// check for setters (<FieldName>_$eq for scala)
-				if((m.getName().toLowerCase().equals("set"+fieldNameLow) || m.getName().toLowerCase().equals(fieldNameLow+"_$eq")) &&
+				if((methodNameLow.equals("set"+fieldNameLow) || methodNameLow.equals(fieldNameLow+"_$eq")) &&
 					m.getParameterTypes().length == 1 && // one parameter of the field's type
-					( m.getGenericParameterTypes()[0].equals( fieldType ) || (fieldTypeGeneric != null && m.getGenericParameterTypes()[0].equals(fieldTypeGeneric) ) )&&
+					(m.getGenericParameterTypes()[0].equals( fieldType ) || (fieldTypeWrapper != null && m.getParameterTypes()[0].equals( fieldTypeWrapper )) || (fieldTypeGeneric != null && m.getGenericParameterTypes()[0].equals(fieldTypeGeneric) ) )&&
 					// return type is void.
 					m.getReturnType().equals(Void.TYPE)
 				) {

http://git-wip-us.apache.org/repos/asf/flink/blob/17e7b423/flink-java/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java
index 2fe1357..dbe5115 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java
@@ -74,11 +74,25 @@ public class PojoTypeInfoTest {
 
 		assertEquals(pojoTypeInfo, deserializedPojoTypeInfo);
 	}
-	
+
+	@Test
+	public void testPrimitivePojo() {
+		TypeInformation<PrimitivePojo> info1 = TypeExtractor.getForClass(PrimitivePojo.class);
+
+		assertTrue(info1 instanceof PojoTypeInfo);
+	}
+
+	@Test
+	public void testUnderscorePojo() {
+		TypeInformation<UnderscorePojo> info1 = TypeExtractor.getForClass(UnderscorePojo.class);
+
+		assertTrue(info1 instanceof PojoTypeInfo);
+	}
+
 	public static final class TestPojo {
 		
 		public int someInt;
-		
+
 		private String aString;
 		
 		public Double[] doubleArray;
@@ -110,4 +124,30 @@ public class PojoTypeInfoTest {
 			return aString;
 		}
 	}
+
+	public static final class PrimitivePojo {
+
+		private int someInt;
+
+		public void setSomeInt(Integer someInt) {
+			this.someInt = someInt;
+		}
+
+		public Integer getSomeInt() {
+			return this.someInt;
+		}
+	}
+
+	public static final class UnderscorePojo {
+
+		private int some_int;
+
+		public void setSomeInt(int some_int) {
+			this.some_int = some_int;
+		}
+
+		public Integer getSomeInt() {
+			return this.some_int;
+		}
+	}
 }