You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/04/22 16:04:59 UTC
flink git commit: [FLINK-1909] [streaming] Type handling refactor for sources + scala api
Repository: flink
Updated Branches:
refs/heads/master bad77a365 -> 6df1dd2cc
[FLINK-1909] [streaming] Type handling refactor for sources + scala api
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6df1dd2c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6df1dd2c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6df1dd2c
Branch: refs/heads/master
Commit: 6df1dd2cc848d0a691a98309a3bb760f9a998673
Parents: bad77a3
Author: Gyula Fora <gy...@apache.org>
Authored: Sat Apr 18 18:31:21 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Wed Apr 22 15:50:09 2015 +0200
----------------------------------------------------------------------
.../api/datastream/ConnectedDataStream.java | 28 ++++----
.../streaming/api/datastream/DataStream.java | 2 +-
.../api/datastream/DiscretizedStream.java | 2 +-
.../environment/StreamExecutionEnvironment.java | 67 +++++++-------------
.../flink/streaming/api/TypeFillTest.java | 21 ++++++
.../api/scala/ConnectedDataStream.scala | 45 +++++++------
.../flink/streaming/api/scala/DataStream.scala | 30 ++++-----
.../api/scala/StreamExecutionEnvironment.scala | 4 +-
8 files changed, 99 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6df1dd2c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index dd4a84b..46a4cfc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -123,7 +123,7 @@ public class ConnectedDataStream<IN1, IN2> {
*
* @return The type of the first input
*/
- public TypeInformation<IN1> getInputType1() {
+ public TypeInformation<IN1> getType1() {
return dataStream1.getType();
}
@@ -132,7 +132,7 @@ public class ConnectedDataStream<IN1, IN2> {
*
* @return The type of the second input
*/
- public TypeInformation<IN2> getInputType2() {
+ public TypeInformation<IN2> getType2() {
return dataStream2.getType();
}
@@ -244,10 +244,10 @@ public class ConnectedDataStream<IN1, IN2> {
public <OUT> SingleOutputStreamOperator<OUT, ?> map(CoMapFunction<IN1, IN2, OUT> coMapper) {
TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coMapper,
- CoMapFunction.class, false, true, getInputType1(), getInputType2(),
+ CoMapFunction.class, false, true, getType1(), getType2(),
Utils.getCallLocationName(), true);
- return addCoFunction("Co-Map", outTypeInfo, new CoStreamMap<IN1, IN2, OUT>(
+ return transform("Co-Map", outTypeInfo, new CoStreamMap<IN1, IN2, OUT>(
clean(coMapper)));
}
@@ -271,10 +271,10 @@ public class ConnectedDataStream<IN1, IN2> {
CoFlatMapFunction<IN1, IN2, OUT> coFlatMapper) {
TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coFlatMapper,
- CoFlatMapFunction.class, false, true, getInputType1(), getInputType2(),
+ CoFlatMapFunction.class, false, true, getType1(), getType2(),
Utils.getCallLocationName(), true);
- return addCoFunction("Co-Flat Map", outTypeInfo, new CoStreamFlatMap<IN1, IN2, OUT>(
+ return transform("Co-Flat Map", outTypeInfo, new CoStreamFlatMap<IN1, IN2, OUT>(
clean(coFlatMapper)));
}
@@ -297,10 +297,10 @@ public class ConnectedDataStream<IN1, IN2> {
public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {
TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coReducer,
- CoReduceFunction.class, false, true, getInputType1(), getInputType2(),
+ CoReduceFunction.class, false, true, getType1(), getType2(),
Utils.getCallLocationName(), true);
- return addCoFunction("Co-Reduce", outTypeInfo, getReduceOperator(clean(coReducer)));
+ return transform("Co-Reduce", outTypeInfo, getReduceOperator(clean(coReducer)));
}
@@ -365,10 +365,10 @@ public class ConnectedDataStream<IN1, IN2> {
}
TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coWindowFunction,
- CoWindowFunction.class, false, true, getInputType1(), getInputType2(),
+ CoWindowFunction.class, false, true, getType1(), getType2(),
Utils.getCallLocationName(), true);
- return addCoFunction("Co-Window", outTypeInfo, new CoStreamWindow<IN1, IN2, OUT>(
+ return transform("Co-Window", outTypeInfo, new CoStreamWindow<IN1, IN2, OUT>(
clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2));
}
@@ -397,20 +397,20 @@ public class ConnectedDataStream<IN1, IN2> {
throw new IllegalArgumentException("Slide interval must be positive");
}
- return addCoFunction("Co-Window", outTypeInfo, new CoStreamWindow<IN1, IN2, OUT>(
+ return transform("Co-Window", outTypeInfo, new CoStreamWindow<IN1, IN2, OUT>(
clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2));
}
- public <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
+ public <OUT> SingleOutputStreamOperator<OUT, ?> transform(String functionName,
TypeInformation<OUT> outTypeInfo, CoStreamOperator<IN1, IN2, OUT> operator) {
@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(
environment, functionName, outTypeInfo, operator);
- dataStream1.streamGraph.addCoOperator(returnStream.getId(), operator, getInputType1(),
- getInputType2(), outTypeInfo, functionName);
+ dataStream1.streamGraph.addCoOperator(returnStream.getId(), operator, getType1(),
+ getType2(), outTypeInfo, functionName);
dataStream1.connectGraph(dataStream1, returnStream.getId(), 1);
dataStream1.connectGraph(dataStream2, returnStream.getId(), 2);
http://git-wip-us.apache.org/repos/asf/flink/blob/6df1dd2c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 94aca48..f4d4965 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -93,7 +93,7 @@ import org.apache.flink.streaming.util.serialization.SerializationSchema;
* <ul>
* <li>{@link DataStream#map},</li>
* <li>{@link DataStream#filter}, or</li>
- * <li>{@link DataStream#aggregate}.</li>
+ * <li>{@link DataStream#sum}.</li>
* </ul>
*
* @param <OUT>
http://git-wip-us.apache.org/repos/asf/flink/blob/6df1dd2c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index c6ee36d..53c35e0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -133,7 +133,7 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
return reduced.discretizedStream
.groupBy(new WindowKey<OUT>())
.connect(numOfParts.groupBy(0))
- .addCoFunction(
+ .transform(
"CoFlatMap",
reduced.discretizedStream.getType(),
new CoStreamFlatMap<StreamWindow<OUT>, Tuple2<Integer, Integer>, StreamWindow<OUT>>(
http://git-wip-us.apache.org/repos/asf/flink/blob/6df1dd2c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 988fdfb..3e935f5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -22,11 +22,10 @@ import java.io.Serializable;
import java.util.Collection;
import java.util.List;
-import com.esotericsoftware.kryo.Serializer;
-
import org.apache.commons.lang3.Validate;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -34,6 +33,7 @@ import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.client.program.Client;
@@ -44,6 +44,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
+import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType;
import org.apache.flink.streaming.api.functions.source.FileReadFunction;
import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
@@ -53,11 +54,12 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
+import com.esotericsoftware.kryo.Serializer;
+
/**
* {@link ExecutionEnvironment} for streaming jobs. An instance of it is
* necessary to construct streaming topologies.
@@ -420,7 +422,7 @@ public abstract class StreamExecutionEnvironment {
public DataStream<String> readFileStream(String filePath, long intervalMillis,
WatchType watchType) {
DataStream<Tuple3<String, Long, Long>> source = addSource(new FileMonitoringFunction(
- filePath, intervalMillis, watchType), null, "File Stream");
+ filePath, intervalMillis, watchType), "File Stream");
return source.flatMap(new FileReadFunction());
}
@@ -448,7 +450,7 @@ public abstract class StreamExecutionEnvironment {
SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
- return addSource(function, outTypeInfo, "Elements source");
+ return addSource(function, "Elements source").returns(outTypeInfo);
}
/**
@@ -475,7 +477,7 @@ public abstract class StreamExecutionEnvironment {
TypeInformation<OUT> outTypeInfo = TypeExtractor.getForObject(data.iterator().next());
SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
- return addSource(function, outTypeInfo, "Collection Source");
+ return addSource(function, "Collection Source").returns(outTypeInfo);
}
/**
@@ -508,7 +510,7 @@ public abstract class StreamExecutionEnvironment {
*/
public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter,
long maxRetry) {
- return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry), null,
+ return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry),
"Socket Stream");
}
@@ -560,13 +562,13 @@ public abstract class StreamExecutionEnvironment {
if (from > to) {
throw new IllegalArgumentException("Start of sequence must not be greater than the end");
}
- return addSource(new GenSequenceFunction(from, to), null, "Sequence Source");
+ return addSource(new GenSequenceFunction(from, to), "Sequence Source");
}
private DataStreamSource<String> addFileSource(InputFormat<String, ?> inputFormat,
TypeInformation<String> typeInfo) {
FileSourceFunction function = new FileSourceFunction(inputFormat, typeInfo);
- DataStreamSource<String> returnStream = addSource(function, null, "File Source");
+ DataStreamSource<String> returnStream = addSource(function, "File Source");
streamGraph.setInputFormat(returnStream.getId(), inputFormat);
return returnStream;
}
@@ -588,31 +590,7 @@ public abstract class StreamExecutionEnvironment {
* @return the data stream constructed
*/
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
- return addSource(function, null);
- }
-
- /**
- * Ads a data source with a custom type information thus opening a
- * {@link DataStream}. Only in very special cases does the user need to
- * support type information. Otherwise use
- * {@link #addSource(SourceFunction)} </p> By default sources have a
- * parallelism of 1. To enable parallel execution, the user defined source
- * should implement {@link ParallelSourceFunction} or extend
- * {@link RichParallelSourceFunction}. In these cases the resulting source
- * will have the parallelism of the environment. To change this afterwards
- * call {@link DataStreamSource#setParallelism(int)}
- *
- * @param function
- * the user defined function
- * @param outTypeInfo
- * the user defined type information for the stream
- * @param <OUT>
- * type of the returned stream
- * @return the data stream constructed
- */
- public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function,
- TypeInformation<OUT> outTypeInfo) {
- return addSource(function, outTypeInfo, "Custom Source");
+ return addSource(function, "Custom source");
}
/**
@@ -623,8 +601,6 @@ public abstract class StreamExecutionEnvironment {
*
* @param function
* the user defined function
- * @param outTypeInfo
- * the user defined type information for the stream
* @param sourceName
* Name of the data source
* @param <OUT>
@@ -632,15 +608,18 @@ public abstract class StreamExecutionEnvironment {
* @return the data stream constructed
*/
@SuppressWarnings("unchecked")
- private <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function,
- TypeInformation<OUT> outTypeInfo, String sourceName) {
+ private <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName) {
+
+ TypeInformation<OUT> outTypeInfo;
- if (outTypeInfo == null) {
- if (function instanceof GenericSourceFunction) {
- outTypeInfo = ((GenericSourceFunction<OUT>) function).getType();
- } else {
+ if (function instanceof GenericSourceFunction) {
+ outTypeInfo = ((GenericSourceFunction<OUT>) function).getType();
+ } else {
+ try {
outTypeInfo = TypeExtractor.createTypeInfo(SourceFunction.class,
function.getClass(), 0, null, null);
+ } catch (InvalidTypesException e) {
+ outTypeInfo = (TypeInformation<OUT>) new MissingTypeInfo("Custom source", e);
}
}
@@ -649,8 +628,8 @@ public abstract class StreamExecutionEnvironment {
ClosureCleaner.clean(function, true);
StreamOperator<OUT, OUT> sourceOperator = new StreamSource<OUT>(function);
- return new DataStreamSource<OUT>(this, sourceName, outTypeInfo, sourceOperator,
- isParallel, sourceName);
+ return new DataStreamSource<OUT>(this, sourceName, outTypeInfo, sourceOperator, isParallel,
+ sourceName);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6df1dd2c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
index f163b9e..35cbaba 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
import org.apache.flink.streaming.api.functions.co.CoWindowFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.junit.Test;
@@ -42,6 +43,12 @@ public class TypeFillTest {
public void test() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ try {
+ env.addSource(new TestSource<Integer>()).print();
+ fail();
+ } catch (Exception e) {
+ }
+
DataStream<Long> source = env.generateSequence(1, 10);
try {
@@ -76,6 +83,7 @@ public class TypeFillTest {
} catch (Exception e) {
}
+ env.addSource(new TestSource<Integer>()).returns("Integer");
source.map(new TestMap<Long, Long>()).returns(Long.class).print();
source.flatMap(new TestFlatMap<Long, Long>()).returns("Long").print();
source.connect(source).map(new TestCoMap<Long, Long, Integer>()).returns("Integer").print();
@@ -106,6 +114,19 @@ public class TypeFillTest {
}
+ private class TestSource<T> implements SourceFunction<T> {
+
+ @Override
+ public void run(Collector<T> collector) throws Exception {
+
+ }
+
+ @Override
+ public void cancel() {
+ }
+
+ }
+
private class TestMap<T, O> implements MapFunction<T, O> {
@Override
public O map(T value) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/6df1dd2c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
index fbd7a02..47d8fd2 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.api.scala
import java.util
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.streaming.api.datastream.{ConnectedDataStream => JavaCStream}
+import org.apache.flink.streaming.api.datastream.{ConnectedDataStream => JavaCStream, DataStream => JavaStream}
import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction, CoReduceFunction, CoWindowFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
import org.apache.flink.util.Collector
@@ -54,8 +54,7 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
def map2(in2: IN2): R = clean(fun2)(in2)
}
- new DataStream(javaStream.addCoFunction("map", implicitly[TypeInformation[R]],
- new CoStreamMap[IN1, IN2, R](comapper)))
+ map(comapper)
}
/**
@@ -78,8 +77,8 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
throw new NullPointerException("Map function must not be null.")
}
- new DataStream(javaStream.addCoFunction("map", implicitly[TypeInformation[R]],
- new CoStreamMap[IN1, IN2, R](coMapper)))
+ val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
+ javaStream.map(coMapper).returns(outType).asInstanceOf[JavaStream[R]]
}
/**
@@ -102,8 +101,9 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
if (coFlatMapper == null) {
throw new NullPointerException("FlatMap function must not be null.")
}
- new DataStream[R](javaStream.addCoFunction("flatMap", implicitly[TypeInformation[R]],
- new CoStreamFlatMap[IN1, IN2, R](coFlatMapper)))
+
+ val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
+ javaStream.flatMap(coFlatMapper).returns(outType).asInstanceOf[JavaStream[R]]
}
/**
@@ -235,13 +235,13 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
* The function used for grouping the second input
* @return @return The transformed { @link ConnectedDataStream}
*/
- def groupBy[K: TypeInformation](fun1: IN1 => _, fun2: IN2 => _):
+ def groupBy[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
ConnectedDataStream[IN1, IN2] = {
- val keyExtractor1 = new KeySelector[IN1, Any] {
+ val keyExtractor1 = new KeySelector[IN1, K] {
def getKey(in: IN1) = clean(fun1)(in)
}
- val keyExtractor2 = new KeySelector[IN2, Any] {
+ val keyExtractor2 = new KeySelector[IN2, L] {
def getKey(in: IN2) = clean(fun2)(in)
}
@@ -267,9 +267,9 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
if (coReducer == null) {
throw new NullPointerException("Reduce function must not be null.")
}
-
- new DataStream[R](javaStream.addCoFunction("coReduce", implicitly[TypeInformation[R]],
- new CoStreamReduce[IN1, IN2, R](coReducer)))
+
+ val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
+ javaStream.reduce(coReducer).returns(outType).asInstanceOf[JavaStream[R]]
}
/**
@@ -325,12 +325,16 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
* @return The transformed { @link DataStream}.
*/
def windowReduce[R: TypeInformation: ClassTag](coWindowFunction:
- CoWindowFunction[IN1, IN2, R], windowSize: Long, slideInterval: Long) = {
+ CoWindowFunction[IN1, IN2, R], windowSize: Long, slideInterval: Long):
+ DataStream[R] = {
if (coWindowFunction == null) {
throw new NullPointerException("CoWindow function must no be null")
}
-
- javaStream.windowReduce(coWindowFunction, windowSize, slideInterval)
+
+ val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
+
+ javaStream.windowReduce(coWindowFunction, windowSize, slideInterval).
+ returns(outType).asInstanceOf[JavaStream[R]]
}
/**
@@ -351,7 +355,8 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
* @return The transformed { @link DataStream}.
*/
def windowReduce[R: TypeInformation: ClassTag](coWindower: (Seq[IN1], Seq[IN2],
- Collector[R]) => Unit, windowSize: Long, slideInterval: Long) = {
+ Collector[R]) => Unit, windowSize: Long, slideInterval: Long):
+ DataStream[R] = {
if (coWindower == null) {
throw new NullPointerException("CoWindow function must no be null")
}
@@ -361,7 +366,7 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
out: Collector[R]): Unit = clean(coWindower)(first, second, out)
}
- javaStream.windowReduce(coWindowFun, windowSize, slideInterval)
+ windowReduce(coWindowFun, windowSize, slideInterval)
}
/**
@@ -388,7 +393,7 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
* @return The type of the first input
*/
def getInputType1(): TypeInformation[IN1] = {
- javaStream.getInputType1
+ javaStream.getType1
}
/**
@@ -397,7 +402,7 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
* @return The type of the second input
*/
def getInputType2(): TypeInformation[IN2] = {
- javaStream.getInputType2
+ javaStream.getType2
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6df1dd2c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 5a4b611..4ccb073 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -365,8 +365,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
val cleanFun = clean(fun)
def map(in: T): R = cleanFun(in)
}
-
- javaStream.transform("map", implicitly[TypeInformation[R]], new StreamMap[T, R](mapper))
+
+ map(mapper)
}
/**
@@ -377,7 +377,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
throw new NullPointerException("Map function must not be null.")
}
- javaStream.transform("map", implicitly[TypeInformation[R]], new StreamMap[T, R](mapper))
+ val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
+ javaStream.map(mapper).returns(outType).asInstanceOf[JavaStream[R]]
}
/**
@@ -388,8 +389,9 @@ class DataStream[T](javaStream: JavaStream[T]) {
if (flatMapper == null) {
throw new NullPointerException("FlatMap function must not be null.")
}
- javaStream.transform("flatMap", implicitly[TypeInformation[R]],
- new StreamFlatMap[T, R](flatMapper))
+
+ val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
+ javaStream.flatMap(flatMapper).returns(outType).asInstanceOf[JavaStream[R]]
}
/**
@@ -430,12 +432,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
if (reducer == null) {
throw new NullPointerException("Reduce function must not be null.")
}
- javaStream match {
- case ds: GroupedDataStream[_] => javaStream.transform("reduce",
- javaStream.getType(), new StreamGroupedReduce[T](reducer, ds.getKeySelector()))
- case _ => javaStream.transform("reduce", javaStream.getType(),
- new StreamReduce[T](reducer))
- }
+
+ javaStream.reduce(reducer)
}
/**
@@ -462,13 +460,9 @@ class DataStream[T](javaStream: JavaStream[T]) {
if (folder == null) {
throw new NullPointerException("Fold function must not be null.")
}
- javaStream match {
- case ds: GroupedDataStream[_] => javaStream.transform("fold",
- implicitly[TypeInformation[R]], new StreamGroupedFold[T,R](folder, ds.getKeySelector(),
- initialValue, implicitly[TypeInformation[R]]))
- case _ => javaStream.transform("fold", implicitly[TypeInformation[R]],
- new StreamFold[T,R](folder, initialValue, implicitly[TypeInformation[R]]))
- }
+
+ val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
+ javaStream.fold(initialValue, folder).returns(outType).asInstanceOf[JavaStream[R]]
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/6df1dd2c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 0217793..c7716ed 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -261,7 +261,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
val sourceFunction = new FromElementsFunction[T](scala.collection.JavaConversions
.asJavaCollection(data))
- javaEnv.addSource(sourceFunction, typeInfo)
+ javaEnv.addSource(sourceFunction).returns(typeInfo)
}
/**
@@ -277,7 +277,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
Validate.notNull(function, "Function must not be null.")
val cleanFun = StreamExecutionEnvironment.clean(function)
val typeInfo = implicitly[TypeInformation[T]]
- javaEnv.addSource(cleanFun, typeInfo)
+ javaEnv.addSource(cleanFun).returns(typeInfo)
}
/**