You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/04/03 12:26:27 UTC
flink git commit: [FLINK-1803] [streaming] returns(..) method added to Stream operators
Repository: flink
Updated Branches:
refs/heads/master 2d55cf0dc -> 189001d80
[FLINK-1803] [streaming] returns(..) method added to Stream operators
This closes #556
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/189001d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/189001d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/189001d8
Branch: refs/heads/master
Commit: 189001d8093c66b7d7e63f196aa71c9928021d50
Parents: 2d55cf0
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Apr 1 13:19:36 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Fri Apr 3 12:25:24 2015 +0200
----------------------------------------------------------------------
.../streaming/api/datastream/DataStream.java | 7 --
.../datastream/SingleOutputStreamOperator.java | 125 +++++++++++++++++--
.../temporaloperator/StreamCrossOperator.java | 3 +-
.../temporaloperator/StreamJoinOperator.java | 3 +-
.../api/scala/StreamCrossOperator.scala | 7 +-
.../api/scala/StreamJoinOperator.scala | 5 +-
6 files changed, 129 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/189001d8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 59ef108..cab6271 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -195,13 +195,6 @@ public class DataStream<OUT> {
return this.typeInfo;
}
- @SuppressWarnings("unchecked")
- public <R> DataStream<R> setType(TypeInformation<R> outType) {
- streamGraph.setOutType(id, outType);
- typeInfo = outType;
- return (DataStream<R>) this;
- }
-
public <F> F clean(F f) {
if (getExecutionEnvironment().getConfig().isClosureCleanerEnabled()) {
ClosureCleaner.clean(f, true);
http://git-wip-us.apache.org/repos/asf/flink/blob/189001d8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 8ffe1fc..0a3b29f 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -17,7 +17,10 @@
package org.apache.flink.streaming.api.datastream;
+import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy;
@@ -54,13 +57,6 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
}
}
- @SuppressWarnings("unchecked")
- public <R> SingleOutputStreamOperator<R, ?> setType(TypeInformation<R> outType) {
- streamGraph.setOutType(id, outType);
- typeInfo = outType;
- return (SingleOutputStreamOperator<R, ?>) this;
- }
-
/**
* Sets the parallelism for this operator. The degree must be 1 or
* more.
@@ -127,5 +123,120 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
this.invokable.setChainingStrategy(strategy);
return this;
}
+
+ /**
+ * Adds a type information hint about the return type of this operator.
+ *
+ * <p>
+ * Type hints are important in cases where the Java compiler
+ * throws away generic type information necessary for efficient execution.
+ *
+ * <p>
+ * This method takes a type information string that will be parsed. A type information string can contain the following
+ * types:
+ *
+ * <ul>
+ * <li>Basic types such as <code>Integer</code>, <code>String</code>, etc.
+ * <li>Basic type arrays such as <code>Integer[]</code>,
+ * <code>String[]</code>, etc.
+ * <li>Tuple types such as <code>Tuple1&lt;TYPE0&gt;</code>,
+ * <code>Tuple2&lt;TYPE0, TYPE1&gt;</code>, etc.</li>
+ * <li>Pojo types such as <code>org.my.MyPojo&lt;myFieldName=TYPE0,myFieldName2=TYPE1&gt;</code>, etc.</li>
+ * <li>Generic types such as <code>java.lang.Class</code>, etc.
+ * <li>Custom type arrays such as <code>org.my.CustomClass[]</code>,
+ * <code>org.my.CustomClass$StaticInnerClass[]</code>, etc.
+ * <li>Value types such as <code>DoubleValue</code>,
+ * <code>StringValue</code>, <code>IntegerValue</code>, etc.</li>
+ * <li>Tuple array types such as <code>Tuple2&lt;TYPE0,TYPE1&gt;[], etc.</code></li>
+ * <li>Writable types such as <code>Writable&lt;org.my.CustomWritable&gt;</code></li>
+ * <li>Enum types such as <code>Enum&lt;org.my.CustomEnum&gt;</code></li>
+ * </ul>
+ *
+ * Example:
+ * <code>"Tuple2&lt;String,Tuple2&lt;Integer,org.my.MyJob$Pojo&lt;word=String&gt;&gt;&gt;"</code>
+ *
+ * @param typeInfoString
+ * type information string to be parsed
+ * @return This operator with a given return type hint.
+ */
+ public O returns(String typeInfoString) {
+ if (typeInfoString == null) {
+ throw new IllegalArgumentException("Type information string must not be null.");
+ }
+ return returns(TypeInfoParser.<OUT>parse(typeInfoString));
+ }
+
+ /**
+ * Adds a type information hint about the return type of this operator.
+ *
+ * <p>
+ * Type hints are important in cases where the Java compiler
+ * throws away generic type information necessary for efficient execution.
+ *
+ * <p>
+ * This method takes an instance of {@link org.apache.flink.api.common.typeinfo.TypeInformation} such as:
+ *
+ * <ul>
+ * <li>{@link org.apache.flink.api.common.typeinfo.BasicTypeInfo}</li>
+ * <li>{@link org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo}</li>
+ * <li>{@link org.apache.flink.api.java.typeutils.TupleTypeInfo}</li>
+ * <li>{@link org.apache.flink.api.java.typeutils.PojoTypeInfo}</li>
+ * <li>{@link org.apache.flink.api.java.typeutils.WritableTypeInfo}</li>
+ * <li>{@link org.apache.flink.api.java.typeutils.ValueTypeInfo}</li>
+ * <li>etc.</li>
+ * </ul>
+ *
+ * @param typeInfo
+ * type information as a return type hint
+ * @return This operator with a given return type hint.
+ */
+ public O returns(TypeInformation<OUT> typeInfo) {
+ if (typeInfo == null) {
+ throw new IllegalArgumentException("Type information must not be null.");
+ }
+ streamGraph.setOutType(id, typeInfo);
+ this.typeInfo = typeInfo;
+ @SuppressWarnings("unchecked")
+ O returnType = (O) this;
+ return returnType;
+ }
+
+ /**
+ * Adds a type information hint about the return type of this operator.
+ *
+ * <p>
+ * Type hints are important in cases where the Java compiler
+ * throws away generic type information necessary for efficient execution.
+ *
+ * <p>
+ * This method takes a class that will be analyzed by Flink's type extraction capabilities.
+ *
+ * <p>
+ * Examples for classes are:
+ * <ul>
+ * <li>Basic types such as <code>Integer.class</code>, <code>String.class</code>, etc.</li>
+ * <li>POJOs such as <code>MyPojo.class</code></li>
+ * <li>Classes that <b>extend</b> tuples. Classes like <code>Tuple1.class</code>,<code>Tuple2.class</code>, etc. are <b>not</b> sufficient.</li>
+ * <li>Arrays such as <code>String[].class</code>, etc.</li>
+ * </ul>
+ *
+ * @param typeClass
+ * class as a return type hint
+ * @return This operator with a given return type hint.
+ */
+ @SuppressWarnings("unchecked")
+ public O returns(Class<OUT> typeClass) {
+ if (typeClass == null) {
+ throw new IllegalArgumentException("Type class must not be null.");
+ }
+
+ try {
+ TypeInformation<OUT> ti = (TypeInformation<OUT>) TypeExtractor.createTypeInfo(typeClass);
+ return returns(ti);
+ }
+ catch (InvalidTypesException e) {
+ throw new InvalidTypesException("The given class is not suited for providing necessary type information.", e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/189001d8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java
index 9af1648..20a0699 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java
@@ -91,6 +91,7 @@ public class StreamCrossOperator<I1, I2> extends
* @return The crossed data streams
*
*/
+ @SuppressWarnings("unchecked")
public <R> SingleOutputStreamOperator<R, ?> with(CrossFunction<I1, I2, R> function) {
TypeInformation<R> outTypeInfo = TypeExtractor.getCrossReturnTypes(function,
op.input1.getType(), op.input2.getType());
@@ -101,7 +102,7 @@ public class StreamCrossOperator<I1, I2> extends
streamGraph.setInvokable(id, invokable);
- return setType(outTypeInfo);
+ return ((SingleOutputStreamOperator<R, ?>) this).returns(outTypeInfo);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/189001d8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java
index 6c53b6b..40c8462 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java
@@ -236,6 +236,7 @@ public class StreamJoinOperator<I1, I2> extends
*
* @return The joined data stream.
*/
+ @SuppressWarnings("unchecked")
public <OUT> SingleOutputStreamOperator<OUT, ?> with(JoinFunction<I1, I2, OUT> joinFunction) {
TypeInformation<OUT> outType = TypeExtractor.getJoinReturnTypes(joinFunction,
@@ -247,7 +248,7 @@ public class StreamJoinOperator<I1, I2> extends
streamGraph.setInvokable(id, invokable);
- return setType(outType);
+ return ((SingleOutputStreamOperator<OUT, ?>) this).returns(outType);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/189001d8/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
index c53fc30..a6b8711 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.api.scala
import org.apache.flink.api.common.ExecutionConfig
-
import scala.reflect.ClassTag
import org.apache.commons.lang.Validate
import org.apache.flink.api.common.functions.CrossFunction
@@ -33,6 +32,7 @@ import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow
import java.util.concurrent.TimeUnit
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) {
@@ -89,8 +89,9 @@ object StreamCrossOperator {
javaStream.getExecutionEnvironment().getStreamGraph().setInvokable(javaStream.getId(),
invokable)
-
- javaStream.setType(implicitly[TypeInformation[R]])
+
+ val js = javaStream.asInstanceOf[SingleOutputStreamOperator[R,_]]
+ js.returns(implicitly[TypeInformation[R]]).asInstanceOf[SingleOutputStreamOperator[R,_]]
}
override def every(length: Long, timeUnit: TimeUnit): CrossWindow[I1, I2] = {
http://git-wip-us.apache.org/repos/asf/flink/blob/189001d8/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
index 5727add..bfdf493 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.api.scala
import org.apache.flink.api.common.ExecutionConfig
-
import scala.Array.canBuildFrom
import scala.reflect.ClassTag
import org.apache.commons.lang.Validate
@@ -37,6 +36,7 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
import org.apache.flink.streaming.util.keys.KeySelectorUtil
import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow
import java.util.concurrent.TimeUnit
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {
@@ -202,7 +202,8 @@ object StreamJoinOperator {
javaStream.getExecutionEnvironment().getStreamGraph().setInvokable(javaStream.getId(),
invokable)
- javaStream.setType(implicitly[TypeInformation[R]])
+ val js = javaStream.asInstanceOf[SingleOutputStreamOperator[R,_]]
+ js.returns(implicitly[TypeInformation[R]]).asInstanceOf[SingleOutputStreamOperator[R,_]]
}
}