You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/04/03 12:26:27 UTC

flink git commit: [FLINK-1803] [streaming] returns(..) method added to Stream operators

Repository: flink
Updated Branches:
  refs/heads/master 2d55cf0dc -> 189001d80


[FLINK-1803] [streaming] returns(..) method added to Stream operators

This closes #556


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/189001d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/189001d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/189001d8

Branch: refs/heads/master
Commit: 189001d8093c66b7d7e63f196aa71c9928021d50
Parents: 2d55cf0
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Apr 1 13:19:36 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Fri Apr 3 12:25:24 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    |   7 --
 .../datastream/SingleOutputStreamOperator.java  | 125 +++++++++++++++++--
 .../temporaloperator/StreamCrossOperator.java   |   3 +-
 .../temporaloperator/StreamJoinOperator.java    |   3 +-
 .../api/scala/StreamCrossOperator.scala         |   7 +-
 .../api/scala/StreamJoinOperator.scala          |   5 +-
 6 files changed, 129 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/189001d8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 59ef108..cab6271 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -195,13 +195,6 @@ public class DataStream<OUT> {
 		return this.typeInfo;
 	}
 
-	@SuppressWarnings("unchecked")
-	public <R> DataStream<R> setType(TypeInformation<R> outType) {
-		streamGraph.setOutType(id, outType);
-		typeInfo = outType;
-		return (DataStream<R>) this;
-	}
-
 	public <F> F clean(F f) {
 		if (getExecutionEnvironment().getConfig().isClosureCleanerEnabled()) {
 			ClosureCleaner.clean(f, true);

http://git-wip-us.apache.org/repos/asf/flink/blob/189001d8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 8ffe1fc..0a3b29f 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -17,7 +17,10 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy;
@@ -54,13 +57,6 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 		}
 	}
 
-	@SuppressWarnings("unchecked")
-	public <R> SingleOutputStreamOperator<R, ?> setType(TypeInformation<R> outType) {
-		streamGraph.setOutType(id, outType);
-		typeInfo = outType;
-		return (SingleOutputStreamOperator<R, ?>) this;
-	}
-
 	/**
 	 * Sets the parallelism for this operator. The degree must be 1 or
 	 * more.
@@ -127,5 +123,120 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 		this.invokable.setChainingStrategy(strategy);
 		return this;
 	}
+	
+	/**
+	 * Adds a type information hint about the return type of this operator. 
+	 * 
+	 * <p>
+	 * Type hints are important in cases where the Java compiler
+	 * throws away generic type information necessary for efficient execution.
+	 * 
+	 * <p>
+	 * This method takes a type information string that will be parsed. A type information string can contain the following
+	 * types:
+	 *
+	 * <ul>
+	 * <li>Basic types such as <code>Integer</code>, <code>String</code>, etc.</li>
+	 * <li>Basic type arrays such as <code>Integer[]</code>,
+	 * <code>String[]</code>, etc.</li>
+	 * <li>Tuple types such as <code>Tuple1&lt;TYPE0&gt;</code>,
+	 * <code>Tuple2&lt;TYPE0, TYPE1&gt;</code>, etc.</li>
+	 * <li>Pojo types such as <code>org.my.MyPojo&lt;myFieldName=TYPE0,myFieldName2=TYPE1&gt;</code>, etc.</li>
+	 * <li>Generic types such as <code>java.lang.Class</code>, etc.</li>
+	 * <li>Custom type arrays such as <code>org.my.CustomClass[]</code>,
+	 * <code>org.my.CustomClass$StaticInnerClass[]</code>, etc.</li>
+	 * <li>Value types such as <code>DoubleValue</code>,
+	 * <code>StringValue</code>, <code>IntegerValue</code>, etc.</li>
+	 * <li>Tuple array types such as <code>Tuple2&lt;TYPE0,TYPE1&gt;[]</code>, etc.</li>
+	 * <li>Writable types such as <code>Writable&lt;org.my.CustomWritable&gt;</code></li>
+	 * <li>Enum types such as <code>Enum&lt;org.my.CustomEnum&gt;</code></li>
+	 * </ul>
+	 *
+	 * <p>Example:
+	 * <code>"Tuple2&lt;String,Tuple2&lt;Integer,org.my.MyJob$Pojo&lt;word=String&gt;&gt;&gt;"</code>
+	 *
+	 * @param typeInfoString
+	 *            type information string to be parsed; must not be null (null is rejected with an IllegalArgumentException)
+	 * @return This operator, with the parsed type information applied as its return type hint.
+	 */
+	public O returns(String typeInfoString) {
+		if (typeInfoString == null) {
+			throw new IllegalArgumentException("Type information string must not be null.");
+		}
+		return returns(TypeInfoParser.<OUT>parse(typeInfoString));
+	}
+	
+	/**
+	 * Adds a type information hint about the return type of this operator. 
+	 * 
+	 * <p>
+	 * Type hints are important in cases where the Java compiler
+	 * throws away generic type information necessary for efficient execution.
+	 * 
+	 * <p>
+	 * This method takes an instance of {@link org.apache.flink.api.common.typeinfo.TypeInformation} such as:
+	 * 
+	 * <ul>
+	 * <li>{@link org.apache.flink.api.common.typeinfo.BasicTypeInfo}</li>
+	 * <li>{@link org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo}</li>
+	 * <li>{@link org.apache.flink.api.java.typeutils.TupleTypeInfo}</li>
+	 * <li>{@link org.apache.flink.api.java.typeutils.PojoTypeInfo}</li>
+	 * <li>{@link org.apache.flink.api.java.typeutils.WritableTypeInfo}</li>
+	 * <li>{@link org.apache.flink.api.java.typeutils.ValueTypeInfo}</li>
+	 * <li>etc.</li>
+	 * </ul>
+	 *
+	 * @param typeInfo
+	 *            type information as a return type hint; must not be null (null is rejected with an IllegalArgumentException)
+	 * @return This operator (cast to O), after the hint was passed to the stream graph for this operator's id and stored on the operator.
+	 */
+	public O returns(TypeInformation<OUT> typeInfo) {
+		if (typeInfo == null) {
+			throw new IllegalArgumentException("Type information must not be null.");
+		}
+		streamGraph.setOutType(id, typeInfo);
+		this.typeInfo = typeInfo;
+		@SuppressWarnings("unchecked")
+		O returnType = (O) this;
+		return returnType;
+	}
+	
+	/**
+	 * Adds a type information hint about the return type of this operator. 
+	 * 
+	 * <p>
+	 * Type hints are important in cases where the Java compiler
+	 * throws away generic type information necessary for efficient execution.
+	 * 
+	 * <p>
+	 * This method takes a class that will be analyzed by Flink's type extraction capabilities. If that analysis fails with an InvalidTypesException, a new InvalidTypesException with an explanatory message and the original as its cause is thrown.
+	 * 
+	 * <p>
+	 * Examples for classes are:
+	 * <ul>
+	 * <li>Basic types such as <code>Integer.class</code>, <code>String.class</code>, etc.</li>
+	 * <li>POJOs such as <code>MyPojo.class</code></li>
+	 * <li>Classes that <b>extend</b> tuples. Classes like <code>Tuple1.class</code>, <code>Tuple2.class</code>, etc. are <b>not</b> sufficient.</li>
+	 * <li>Arrays such as <code>String[].class</code>, etc.</li>
+	 * </ul>
+	 *
+	 * @param typeClass
+	 *            class as a return type hint; must not be null (null is rejected with an IllegalArgumentException)
+	 * @return This operator, with the type information extracted from the class applied as its return type hint.
+	 */
+	@SuppressWarnings("unchecked")
+	public O returns(Class<OUT> typeClass) {
+		if (typeClass == null) {
+			throw new IllegalArgumentException("Type class must not be null.");
+		}
+		
+		try {
+			TypeInformation<OUT> ti = (TypeInformation<OUT>) TypeExtractor.createTypeInfo(typeClass);
+			return returns(ti);
+		}
+		catch (InvalidTypesException e) {
+			throw new InvalidTypesException("The given class is not suited for providing necessary type information.", e);
+		}
+	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/189001d8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java
index 9af1648..20a0699 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java
@@ -91,6 +91,7 @@ public class StreamCrossOperator<I1, I2> extends
 		 * @return The crossed data streams
 		 * 
 		 */
+		@SuppressWarnings("unchecked")
 		public <R> SingleOutputStreamOperator<R, ?> with(CrossFunction<I1, I2, R> function) {
 			TypeInformation<R> outTypeInfo = TypeExtractor.getCrossReturnTypes(function,
 					op.input1.getType(), op.input2.getType());
@@ -101,7 +102,7 @@ public class StreamCrossOperator<I1, I2> extends
 
 			streamGraph.setInvokable(id, invokable);
 
-			return setType(outTypeInfo);
+			return ((SingleOutputStreamOperator<R, ?>) this).returns(outTypeInfo);
 
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/189001d8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java
index 6c53b6b..40c8462 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java
@@ -236,6 +236,7 @@ public class StreamJoinOperator<I1, I2> extends
 		 * 
 		 * @return The joined data stream.
 		 */
+		@SuppressWarnings("unchecked")
 		public <OUT> SingleOutputStreamOperator<OUT, ?> with(JoinFunction<I1, I2, OUT> joinFunction) {
 
 			TypeInformation<OUT> outType = TypeExtractor.getJoinReturnTypes(joinFunction,
@@ -247,7 +248,7 @@ public class StreamJoinOperator<I1, I2> extends
 
 			streamGraph.setInvokable(id, invokable);
 
-			return setType(outType);
+			return ((SingleOutputStreamOperator<OUT, ?>) this).returns(outType);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/189001d8/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
index c53fc30..a6b8711 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.api.scala
 
 import org.apache.flink.api.common.ExecutionConfig
-
 import scala.reflect.ClassTag
 import org.apache.commons.lang.Validate
 import org.apache.flink.api.common.functions.CrossFunction
@@ -33,6 +32,7 @@ import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
 import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow
 import java.util.concurrent.TimeUnit
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
 
 class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
   TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) {
@@ -89,8 +89,9 @@ object StreamCrossOperator {
 
       javaStream.getExecutionEnvironment().getStreamGraph().setInvokable(javaStream.getId(),
         invokable)
-
-      javaStream.setType(implicitly[TypeInformation[R]])
+        
+      val js = javaStream.asInstanceOf[SingleOutputStreamOperator[R,_]]
+      js.returns(implicitly[TypeInformation[R]]).asInstanceOf[SingleOutputStreamOperator[R,_]]
     }
     
     override def every(length: Long, timeUnit: TimeUnit): CrossWindow[I1, I2] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/189001d8/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
index 5727add..bfdf493 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.api.scala
 
 import org.apache.flink.api.common.ExecutionConfig
-
 import scala.Array.canBuildFrom
 import scala.reflect.ClassTag
 import org.apache.commons.lang.Validate
@@ -37,6 +36,7 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
 import org.apache.flink.streaming.util.keys.KeySelectorUtil
 import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow
 import java.util.concurrent.TimeUnit
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
 
 class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends 
 TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {
@@ -202,7 +202,8 @@ object StreamJoinOperator {
       javaStream.getExecutionEnvironment().getStreamGraph().setInvokable(javaStream.getId(),
         invokable)
 
-      javaStream.setType(implicitly[TypeInformation[R]])
+      val js = javaStream.asInstanceOf[SingleOutputStreamOperator[R,_]]
+      js.returns(implicitly[TypeInformation[R]]).asInstanceOf[SingleOutputStreamOperator[R,_]]
     }
   }