You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/21 15:40:04 UTC

[5/5] flink git commit: [FLINK-2071] [java api] Fix serializability issue with projection function.

[FLINK-2071] [java api] Fix serializability issue with projection function.

Improve type safety.
Minor cleanups in ProjectOperator.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ad1d9362
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ad1d9362
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ad1d9362

Branch: refs/heads/master
Commit: ad1d9362c88343972b19fdcce0b041d8380805a9
Parents: 6220f34
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 21 14:47:38 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 21 14:50:32 2015 +0200

----------------------------------------------------------------------
 .../api/java/operators/ProjectOperator.java     | 27 +++++++------
 .../translation/PlanProjectOperator.java        | 41 +++++++++++++-------
 2 files changed, 41 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ad1d9362/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
index 9b7d567..55e182f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
@@ -82,7 +82,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 	 * Continues a Project transformation on a {@link Tuple} {@link DataSet}.<br/>
 	 * <b>Note: Only Tuple DataSets can be projected using field indexes.</b></br>
 	 * The transformation projects each Tuple of the DataSet onto a (sub)set of fields.</br>
-	 * Additional fields can be added to the projection by calling {@link ProjectOperator#project(int[])}.
+	 * Additional fields can be added to the projection by calling this method repeatedly.
 	 *
 	 * <b>Note: With the current implementation, the Project transformation looses type information.</b>
 	 *
@@ -94,8 +94,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 	 * @see DataSet
 	 * @see ProjectOperator
 	 */
-	@SuppressWarnings("hiding")
-	public <OUT extends Tuple> ProjectOperator<?, OUT> project(int... fieldIndexes) {
+	public <R extends Tuple> ProjectOperator<?, R> project(int... fieldIndexes) {
 		proj.acceptAdditionalIndexes(fieldIndexes);
 		
 		return proj.projectTupleX();
@@ -103,10 +102,10 @@ public class ProjectOperator<IN, OUT extends Tuple>
 	/**
 	 * Deprecated method only kept for compatibility.
 	 */
-	@SuppressWarnings({ "unchecked", "hiding" })
+	@SuppressWarnings("unchecked")
 	@Deprecated
-	public <OUT extends Tuple> ProjectOperator<IN, OUT> types(Class<?>... types) {
-		TupleTypeInfo<OUT> typeInfo = (TupleTypeInfo<OUT>)this.getResultType();
+	public <R extends Tuple> ProjectOperator<IN, R> types(Class<?>... types) {
+		TupleTypeInfo<R> typeInfo = (TupleTypeInfo<R>)this.getResultType();
 
 		if(types.length != typeInfo.getArity()) {
 			throw new InvalidProgramException("Provided types do not match projection.");
@@ -117,12 +116,12 @@ public class ProjectOperator<IN, OUT extends Tuple>
 				throw new InvalidProgramException("Provided type "+typeClass.getSimpleName()+" at position "+i+" does not match projection");
 			}
 		}
-		return (ProjectOperator<IN, OUT>) this;
+		return (ProjectOperator<IN, R>) this;
 	}
 	
 	public static class Projection<T> {
 		
-		private final DataSet<T> ds;		
+		private final DataSet<T> ds;
 		private int[] fieldIndexes;
 		
 		public Projection(DataSet<T> ds, int[] fieldIndexes) {
@@ -138,9 +137,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
 						"project() may select only up to (" + (Tuple.MAX_ARITY - 1) + ") fields.");
 			}
 			
-			int maxFieldIndex = ((TupleTypeInfo<?>)ds.getType()).getArity();
-			for(int i=0; i<fieldIndexes.length; i++) {
-				Preconditions.checkElementIndex(fieldIndexes[i], maxFieldIndex);
+			int maxFieldIndex = ds.getType().getArity();
+			for (int fieldIndexe : fieldIndexes) {
+				Preconditions.checkElementIndex(fieldIndexe, maxFieldIndex);
 			}
 			
 			this.ds = ds;
@@ -160,8 +159,8 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			
 			this.fieldIndexes = Arrays.copyOf(this.fieldIndexes, this.fieldIndexes.length + additionalIndexes.length);
 			
-			int maxFieldIndex = ((TupleTypeInfo<?>)ds.getType()).getArity();
-			for(int i=0; i<additionalIndexes.length; i++) {
+			int maxFieldIndex = ds.getType().getArity();
+			for (int i = 0; i < additionalIndexes.length; i++) {
 				Preconditions.checkElementIndex(additionalIndexes[i], maxFieldIndex);
 
 				this.fieldIndexes[offset + i] = additionalIndexes[i];
@@ -186,7 +185,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 		 */
 		@SuppressWarnings("unchecked")
 		public <OUT extends Tuple> ProjectOperator<T, OUT> projectTupleX() {
-			ProjectOperator<T, OUT> projOperator = null;
+			ProjectOperator<T, OUT> projOperator;
 
 			switch (fieldIndexes.length) {
 			case 1: projOperator = (ProjectOperator<T, OUT>) projectTuple1(); break;

http://git-wip-us.apache.org/repos/asf/flink/blob/ad1d9362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
index 959b929..101b89b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
@@ -28,32 +28,47 @@ import org.apache.flink.api.java.tuple.Tuple;
 
 public class PlanProjectOperator<T, R extends Tuple> extends MapOperatorBase<T, R, MapFunction<T, R>> {
 
-	public PlanProjectOperator(int[] fields, String name, TypeInformation<T> inType, TypeInformation<R> outType, ExecutionConfig executionConfig) {
-		super(new MapProjector<T, R>(fields, outType.createSerializer(executionConfig).createInstance()), new UnaryOperatorInformation<T, R>(inType, outType), name);
+	public PlanProjectOperator(int[] fields, String name,
+								TypeInformation<T> inType, TypeInformation<R> outType,
+								ExecutionConfig executionConfig)
+	{
+		super(PlanProjectOperator.<T, R, Tuple>createTypedProjector(fields), new UnaryOperatorInformation<T, R>(inType, outType), name);
+	}
+	
+	@SuppressWarnings("unchecked")
+	private static <T, R extends Tuple, X extends Tuple> MapFunction<T, R> createTypedProjector(int[] fields) {
+		return (MapFunction<T, R>) new MapProjector<X, R>(fields);
 	}
 	
-	public static final class MapProjector<T, R extends Tuple>
-		extends AbstractRichFunction
-		implements MapFunction<T, R>
+	
+	public static final class MapProjector<T extends Tuple, R extends Tuple> 
+			extends AbstractRichFunction implements MapFunction<T, R>
 	{
 		private static final long serialVersionUID = 1L;
 		
 		private final int[] fields;
-		private final R outTuple;
+		private final Tuple outTuple;
 		
-		private MapProjector(int[] fields, R outTupleInstance) {
+		private MapProjector(int[] fields) {
 			this.fields = fields;
-			this.outTuple = outTupleInstance;
+			try {
+				this.outTuple = Tuple.getTupleClass(fields.length).newInstance();
+			}
+			catch (Exception e) {
+				// this should never happen
+				throw new RuntimeException(e);
+			}
 		}
 
 		// TODO We should use code generation for this.
+		@SuppressWarnings("unchecked")
 		@Override
-		public R map(T inTuple) throws Exception {
-			
-			for(int i=0; i<fields.length; i++) {
-				outTuple.setField(((Tuple)inTuple).getField(fields[i]), i);
+		public R map(Tuple inTuple) throws Exception {
+			for (int i = 0; i < fields.length; i++) {
+				outTuple.setField(inTuple.getField(fields[i]), i);
 			}
-			return outTuple;
+			
+			return (R) outTuple;
 		}
 	}
 }