You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/21 15:40:04 UTC
[5/5] flink git commit: [FLINK-2071] [java api] Fix serializability
issue with projectsion function.
[FLINK-2071] [java api] Fix serializability issue with projectsion function.
Improve type safety.
Minor cleanups in ProjectOperator.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ad1d9362
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ad1d9362
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ad1d9362
Branch: refs/heads/master
Commit: ad1d9362c88343972b19fdcce0b041d8380805a9
Parents: 6220f34
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 21 14:47:38 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 21 14:50:32 2015 +0200
----------------------------------------------------------------------
.../api/java/operators/ProjectOperator.java | 27 +++++++------
.../translation/PlanProjectOperator.java | 41 +++++++++++++-------
2 files changed, 41 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ad1d9362/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
index 9b7d567..55e182f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
@@ -82,7 +82,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
* Continues a Project transformation on a {@link Tuple} {@link DataSet}.<br/>
* <b>Note: Only Tuple DataSets can be projected using field indexes.</b></br>
* The transformation projects each Tuple of the DataSet onto a (sub)set of fields.</br>
- * Additional fields can be added to the projection by calling {@link ProjectOperator#project(int[])}.
+ * Additional fields can be added to the projection by calling this method repeatedly.
*
* <b>Note: With the current implementation, the Project transformation looses type information.</b>
*
@@ -94,8 +94,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
* @see DataSet
* @see ProjectOperator
*/
- @SuppressWarnings("hiding")
- public <OUT extends Tuple> ProjectOperator<?, OUT> project(int... fieldIndexes) {
+ public <R extends Tuple> ProjectOperator<?, R> project(int... fieldIndexes) {
proj.acceptAdditionalIndexes(fieldIndexes);
return proj.projectTupleX();
@@ -103,10 +102,10 @@ public class ProjectOperator<IN, OUT extends Tuple>
/**
* Deprecated method only kept for compatibility.
*/
- @SuppressWarnings({ "unchecked", "hiding" })
+ @SuppressWarnings("unchecked")
@Deprecated
- public <OUT extends Tuple> ProjectOperator<IN, OUT> types(Class<?>... types) {
- TupleTypeInfo<OUT> typeInfo = (TupleTypeInfo<OUT>)this.getResultType();
+ public <R extends Tuple> ProjectOperator<IN, R> types(Class<?>... types) {
+ TupleTypeInfo<R> typeInfo = (TupleTypeInfo<R>)this.getResultType();
if(types.length != typeInfo.getArity()) {
throw new InvalidProgramException("Provided types do not match projection.");
@@ -117,12 +116,12 @@ public class ProjectOperator<IN, OUT extends Tuple>
throw new InvalidProgramException("Provided type "+typeClass.getSimpleName()+" at position "+i+" does not match projection");
}
}
- return (ProjectOperator<IN, OUT>) this;
+ return (ProjectOperator<IN, R>) this;
}
public static class Projection<T> {
- private final DataSet<T> ds;
+ private final DataSet<T> ds;
private int[] fieldIndexes;
public Projection(DataSet<T> ds, int[] fieldIndexes) {
@@ -138,9 +137,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
"project() may select only up to (" + (Tuple.MAX_ARITY - 1) + ") fields.");
}
- int maxFieldIndex = ((TupleTypeInfo<?>)ds.getType()).getArity();
- for(int i=0; i<fieldIndexes.length; i++) {
- Preconditions.checkElementIndex(fieldIndexes[i], maxFieldIndex);
+ int maxFieldIndex = ds.getType().getArity();
+ for (int fieldIndexe : fieldIndexes) {
+ Preconditions.checkElementIndex(fieldIndexe, maxFieldIndex);
}
this.ds = ds;
@@ -160,8 +159,8 @@ public class ProjectOperator<IN, OUT extends Tuple>
this.fieldIndexes = Arrays.copyOf(this.fieldIndexes, this.fieldIndexes.length + additionalIndexes.length);
- int maxFieldIndex = ((TupleTypeInfo<?>)ds.getType()).getArity();
- for(int i=0; i<additionalIndexes.length; i++) {
+ int maxFieldIndex = ds.getType().getArity();
+ for (int i = 0; i < additionalIndexes.length; i++) {
Preconditions.checkElementIndex(additionalIndexes[i], maxFieldIndex);
this.fieldIndexes[offset + i] = additionalIndexes[i];
@@ -186,7 +185,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
*/
@SuppressWarnings("unchecked")
public <OUT extends Tuple> ProjectOperator<T, OUT> projectTupleX() {
- ProjectOperator<T, OUT> projOperator = null;
+ ProjectOperator<T, OUT> projOperator;
switch (fieldIndexes.length) {
case 1: projOperator = (ProjectOperator<T, OUT>) projectTuple1(); break;
http://git-wip-us.apache.org/repos/asf/flink/blob/ad1d9362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
index 959b929..101b89b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
@@ -28,32 +28,47 @@ import org.apache.flink.api.java.tuple.Tuple;
public class PlanProjectOperator<T, R extends Tuple> extends MapOperatorBase<T, R, MapFunction<T, R>> {
- public PlanProjectOperator(int[] fields, String name, TypeInformation<T> inType, TypeInformation<R> outType, ExecutionConfig executionConfig) {
- super(new MapProjector<T, R>(fields, outType.createSerializer(executionConfig).createInstance()), new UnaryOperatorInformation<T, R>(inType, outType), name);
+ public PlanProjectOperator(int[] fields, String name,
+ TypeInformation<T> inType, TypeInformation<R> outType,
+ ExecutionConfig executionConfig)
+ {
+ super(PlanProjectOperator.<T, R, Tuple>createTypedProjector(fields), new UnaryOperatorInformation<T, R>(inType, outType), name);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T, R extends Tuple, X extends Tuple> MapFunction<T, R> createTypedProjector(int[] fields) {
+ return (MapFunction<T, R>) new MapProjector<X, R>(fields);
}
- public static final class MapProjector<T, R extends Tuple>
- extends AbstractRichFunction
- implements MapFunction<T, R>
+
+ public static final class MapProjector<T extends Tuple, R extends Tuple>
+ extends AbstractRichFunction implements MapFunction<T, R>
{
private static final long serialVersionUID = 1L;
private final int[] fields;
- private final R outTuple;
+ private final Tuple outTuple;
- private MapProjector(int[] fields, R outTupleInstance) {
+ private MapProjector(int[] fields) {
this.fields = fields;
- this.outTuple = outTupleInstance;
+ try {
+ this.outTuple = Tuple.getTupleClass(fields.length).newInstance();
+ }
+ catch (Exception e) {
+ // this should never happen
+ throw new RuntimeException(e);
+ }
}
// TODO We should use code generation for this.
+ @SuppressWarnings("unchecked")
@Override
- public R map(T inTuple) throws Exception {
-
- for(int i=0; i<fields.length; i++) {
- outTuple.setField(((Tuple)inTuple).getField(fields[i]), i);
+ public R map(Tuple inTuple) throws Exception {
+ for (int i = 0; i < fields.length; i++) {
+ outTuple.setField(inTuple.getField(fields[i]), i);
}
- return outTuple;
+
+ return (R) outTuple;
}
}
}