You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/04/05 10:53:06 UTC
[1/3] flink git commit: [FLINK-1808] [streaming] Send barrier
requests only when the execution graph is running
Repository: flink
Updated Branches:
refs/heads/master f36eb54ee -> 1da4b6437
[FLINK-1808] [streaming] Send barrier requests only when the execution graph is running
Closes #551
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1da4b643
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1da4b643
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1da4b643
Branch: refs/heads/master
Commit: 1da4b6437e33f794dd3603f029010f0cc70607d1
Parents: 1cf49e9
Author: Paris Carbone <se...@gmail.com>
Authored: Tue Mar 31 13:51:07 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Sun Apr 5 10:51:20 2015 +0200
----------------------------------------------------------------------
.../runtime/jobmanager/StreamCheckpointCoordinator.scala | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1da4b643/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
index d9a3421..f42d08ab 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
@@ -23,7 +23,6 @@ import java.lang.Long
import akka.actor._
import org.apache.flink.api.common.JobID
import org.apache.flink.runtime.ActorLogMessages
-import org.apache.flink.runtime.execution.ExecutionState.RUNNING
import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph, ExecutionVertex}
import org.apache.flink.runtime.jobgraph.JobStatus._
import org.apache.flink.runtime.jobgraph.JobVertexID
@@ -84,13 +83,16 @@ class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph,
case FAILED | CANCELED | FINISHED =>
log.info("Stopping monitor for terminated job {}", executionGraph.getJobID)
self ! PoisonPill
- case _ =>
+ case RUNNING =>
curId += 1
log.debug("Sending Barrier to vertices of Job " + executionGraph.getJobName)
vertices.filter(v => v.getJobVertex.getJobVertex.isInputVertex &&
v.getExecutionState == RUNNING).foreach(vertex
=> vertex.getCurrentAssignedResource.getInstance.getTaskManager
! BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId,curId))
+ case _ =>
+ log.debug("Omitting sending barrier since graph is in {} state for job {}",
+ executionGraph.getState, executionGraph.getJobID)
}
case StateBarrierAck(jobID, jobVertexID, instanceID, checkpointID, opState) =>
@@ -112,7 +114,7 @@ class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph,
ackId = if(!keysToKeep.isEmpty) keysToKeep.max else ackId
acks.keys.foreach(x => acks = acks.updated(x,acks(x).filter(_ >= ackId)))
states = states.filterKeys(_._3 >= ackId)
- log.debug("[FT-MONITOR] Last global barrier is " + ackId)
+ log.debug("Last global barrier is " + ackId)
executionGraph.loadOperatorStates(states)
}
[3/3] flink git commit: Fix the constructor comments with correct
parameter in DefaultMemoryManager
Posted by mb...@apache.org.
Fix the constructor comments with correct parameter in DefaultMemoryManager
Closes #568
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/909cee28
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/909cee28
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/909cee28
Branch: refs/heads/master
Commit: 909cee281292be7408b1b9f0c25cfbd6cf185b4a
Parents: f36eb54
Author: hongsibao <ho...@huawei.com>
Authored: Sat Apr 4 10:28:59 2015 +0800
Committer: mbalassi <mb...@apache.org>
Committed: Sun Apr 5 10:51:20 2015 +0200
----------------------------------------------------------------------
.../apache/flink/runtime/memorymanager/DefaultMemoryManager.java | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/909cee28/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
index 5f84b23..cd677ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
@@ -84,6 +84,7 @@ public class DefaultMemoryManager implements MemoryManager {
* Creates a memory manager with the given capacity, using the default page size.
*
* @param memorySize The total size of the memory to be managed by this memory manager.
+ * @param numberOfSlots The number of slots of the task manager.
*/
public DefaultMemoryManager(long memorySize, int numberOfSlots) {
this(memorySize, numberOfSlots, DEFAULT_PAGE_SIZE);
@@ -93,6 +94,7 @@ public class DefaultMemoryManager implements MemoryManager {
* Creates a memory manager with the given capacity and given page size.
*
* @param memorySize The total size of the memory to be managed by this memory manager.
+ * @param numberOfSlots The number of slots of the task manager.
* @param pageSize The size of the pages handed out by the memory manager.
*/
public DefaultMemoryManager(long memorySize, int numberOfSlots, int pageSize) {
[2/3] flink git commit: [FLINK-1824] [streaming] Support added for
missing types in DataStream api
Posted by mb...@apache.org.
[FLINK-1824] [streaming] Support added for missing types in DataStream api
Closes #567
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1cf49e90
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1cf49e90
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1cf49e90
Branch: refs/heads/master
Commit: 1cf49e90cddea610b4cfa3623a53ced66aadfc2d
Parents: 909cee2
Author: Gyula Fora <gy...@apache.org>
Authored: Fri Apr 3 20:03:58 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Sun Apr 5 10:51:20 2015 +0200
----------------------------------------------------------------------
.../flink/api/java/typeutils/TypeExtractor.java | 2 +-
.../apache/flink/streaming/api/StreamGraph.java | 17 +-
.../api/datastream/ConnectedDataStream.java | 30 +--
.../streaming/api/datastream/DataStream.java | 90 ++++++---
.../api/datastream/GroupedDataStream.java | 28 +--
.../datastream/SingleOutputStreamOperator.java | 3 +-
.../flink/streaming/api/TypeFillTest.java | 181 +++++++++++++++++++
7 files changed, 295 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf49e90/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index ae6063a..0fec67f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -272,7 +272,7 @@ public class TypeExtractor {
}
@SuppressWarnings("unchecked")
- private static <IN1, IN2, OUT> TypeInformation<OUT> getBinaryOperatorReturnType(Function function, Class<?> baseClass,
+ public static <IN1, IN2, OUT> TypeInformation<OUT> getBinaryOperatorReturnType(Function function, Class<?> baseClass,
boolean hasIterables, boolean hasCollector, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type,
String functionName, boolean allowMissing)
{
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf49e90/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index 970ce49..351dec9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -34,12 +34,13 @@ import java.util.Set;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapperFactory;
import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
+import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapperFactory;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
@@ -169,7 +170,9 @@ public class StreamGraph extends StreamingPlan {
StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? new StreamRecordSerializer<IN>(
inTypeInfo, executionConfig) : null;
- StreamRecordSerializer<OUT> outSerializer = outTypeInfo != null ? new StreamRecordSerializer<OUT>(
+
+ StreamRecordSerializer<OUT> outSerializer = (outTypeInfo != null)
+ && !(outTypeInfo instanceof MissingTypeInfo) ? new StreamRecordSerializer<OUT>(
outTypeInfo, executionConfig) : null;
addTypeSerializers(vertexID, inSerializer, null, outSerializer, null);
@@ -215,7 +218,8 @@ public class StreamGraph extends StreamingPlan {
setSerializersFrom(iterationHead, vertexID);
int outpartitionerIndexToCopy = edges.getInEdgeIndices(iterationHead).get(0);
- StreamPartitioner<?> outputPartitioner = edges.getOutEdges(outpartitionerIndexToCopy).get(0).getPartitioner();
+ StreamPartitioner<?> outputPartitioner = edges.getOutEdges(outpartitionerIndexToCopy)
+ .get(0).getPartitioner();
setEdge(vertexID, iterationHead, outputPartitioner, 0, new ArrayList<String>());
@@ -273,9 +277,12 @@ public class StreamGraph extends StreamingPlan {
addVertex(vertexID, CoStreamVertex.class, taskInvokableObject, operatorName, parallelism);
+ StreamRecordSerializer<OUT> outSerializer = (outTypeInfo != null)
+ && !(outTypeInfo instanceof MissingTypeInfo) ? new StreamRecordSerializer<OUT>(
+ outTypeInfo, executionConfig) : null;
+
addTypeSerializers(vertexID, new StreamRecordSerializer<IN1>(in1TypeInfo, executionConfig),
- new StreamRecordSerializer<IN2>(in2TypeInfo, executionConfig),
- new StreamRecordSerializer<OUT>(outTypeInfo, executionConfig), null);
+ new StreamRecordSerializer<IN2>(in2TypeInfo, executionConfig), outSerializer, null);
if (LOG.isDebugEnabled()) {
LOG.debug("CO-TASK: {}", vertexID);
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf49e90/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index a9ae77a..e867359 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.datastream;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.StreamGraph;
@@ -241,8 +242,10 @@ public class ConnectedDataStream<IN1, IN2> {
* @return The transformed {@link DataStream}
*/
public <OUT> SingleOutputStreamOperator<OUT, ?> map(CoMapFunction<IN1, IN2, OUT> coMapper) {
- TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoMapFunction.class,
- coMapper.getClass(), 2, null, null);
+
+ TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coMapper,
+ CoMapFunction.class, false, true, getInputType1(), getInputType2(),
+ Utils.getCallLocationName(), true);
return addCoFunction("Co-Map", outTypeInfo, new CoMapInvokable<IN1, IN2, OUT>(
clean(coMapper)));
@@ -266,8 +269,10 @@ public class ConnectedDataStream<IN1, IN2> {
*/
public <OUT> SingleOutputStreamOperator<OUT, ?> flatMap(
CoFlatMapFunction<IN1, IN2, OUT> coFlatMapper) {
- TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoFlatMapFunction.class,
- coFlatMapper.getClass(), 2, null, null);
+
+ TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coFlatMapper,
+ CoFlatMapFunction.class, false, true, getInputType1(), getInputType2(),
+ Utils.getCallLocationName(), true);
return addCoFunction("Co-Flat Map", outTypeInfo, new CoFlatMapInvokable<IN1, IN2, OUT>(
clean(coFlatMapper)));
@@ -291,8 +296,9 @@ public class ConnectedDataStream<IN1, IN2> {
*/
public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {
- TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoReduceFunction.class,
- coReducer.getClass(), 2, null, null);
+ TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coReducer,
+ CoReduceFunction.class, false, true, getInputType1(), getInputType2(),
+ Utils.getCallLocationName(), true);
return addCoFunction("Co-Reduce", outTypeInfo, getReduceInvokable(clean(coReducer)));
@@ -357,9 +363,10 @@ public class ConnectedDataStream<IN1, IN2> {
if (slideInterval < 1) {
throw new IllegalArgumentException("Slide interval must be positive");
}
-
- TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoWindowFunction.class,
- coWindowFunction.getClass(), 2, null, null);
+
+ TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coWindowFunction,
+ CoWindowFunction.class, false, true, getInputType1(), getInputType2(),
+ Utils.getCallLocationName(), true);
return addCoFunction("Co-Window", outTypeInfo, new CoWindowInvokable<IN1, IN2, OUT>(
clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2));
@@ -402,9 +409,8 @@ public class ConnectedDataStream<IN1, IN2> {
SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(
environment, functionName, outTypeInfo, functionInvokable);
- dataStream1.streamGraph.addCoTask(returnStream.getId(), functionInvokable,
- getInputType1(), getInputType2(), outTypeInfo, functionName,
- environment.getParallelism());
+ dataStream1.streamGraph.addCoTask(returnStream.getId(), functionInvokable, getInputType1(),
+ getInputType2(), outTypeInfo, functionName, environment.getParallelism());
dataStream1.connectGraph(dataStream1, returnStream.getId(), 1);
dataStream1.connectGraph(dataStream2, returnStream.getId(), 2);
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf49e90/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index cab6271..27c831f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
@@ -38,11 +39,13 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.io.CsvOutputFormat;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.FileSystem.WriteMode;
@@ -102,7 +105,6 @@ public class DataStream<OUT> {
protected static Integer counter = 0;
protected final StreamExecutionEnvironment environment;
protected final Integer id;
- protected final String type;
protected int parallelism;
protected List<String> userDefinedNames;
protected StreamPartitioner<OUT> partitioner;
@@ -111,6 +113,7 @@ public class DataStream<OUT> {
protected List<DataStream<OUT>> mergedStreams;
protected final StreamGraph streamGraph;
+ private boolean typeUsed;
/**
* Create a new {@link DataStream} in the given execution environment with
@@ -131,7 +134,6 @@ public class DataStream<OUT> {
counter++;
this.id = counter;
- this.type = operatorType;
this.environment = environment;
this.parallelism = environment.getParallelism();
this.streamGraph = environment.getStreamGraph();
@@ -151,7 +153,6 @@ public class DataStream<OUT> {
public DataStream(DataStream<OUT> dataStream) {
this.environment = dataStream.environment;
this.id = dataStream.id;
- this.type = dataStream.type;
this.parallelism = dataStream.parallelism;
this.userDefinedNames = new ArrayList<String>(dataStream.userDefinedNames);
this.partitioner = dataStream.partitioner;
@@ -192,9 +193,45 @@ public class DataStream<OUT> {
*/
@SuppressWarnings("unchecked")
public TypeInformation<OUT> getType() {
+ if (typeInfo instanceof MissingTypeInfo) {
+ MissingTypeInfo typeInfo = (MissingTypeInfo) this.typeInfo;
+ throw new InvalidTypesException(
+ "The return type of function '"
+ + typeInfo.getFunctionName()
+ + "' could not be determined automatically, due to type erasure. "
+ + "You can give type information hints by using the returns(...) method on the result of "
+ + "the transformation call, or by letting your function implement the 'ResultTypeQueryable' "
+ + "interface.", typeInfo.getTypeException());
+ }
+ typeUsed = true;
return this.typeInfo;
}
+ /**
+ * Tries to fill in the type information. Type information can be filled in
+ * later when the program uses a type hint. This method checks whether the
+ * type information has ever been accessed before and does not allow
+ * modifications if the type was accessed already. This ensures consistency
+ * by making sure different parts of the operation do not assume different
+ * type information.
+ *
+ * @param typeInfo
+ * The type information to fill in.
+ *
+ * @throws IllegalStateException
+ * Thrown, if the type information has been accessed before.
+ */
+ protected void fillInType(TypeInformation<OUT> typeInfo) {
+ if (typeUsed) {
+ throw new IllegalStateException(
+ "TypeInformation cannot be filled in for the type after it has been used. "
+ + "Please make sure that the type info hints are the first call after the transformation function, "
+ + "before any access to types or semantic properties, etc.");
+ }
+ streamGraph.setOutType(id, typeInfo);
+ this.typeInfo = typeInfo;
+ }
+
public <F> F clean(F f) {
if (getExecutionEnvironment().getConfig().isClosureCleanerEnabled()) {
ClosureCleaner.clean(f, true);
@@ -234,8 +271,8 @@ public class DataStream<OUT> {
/**
* Operator used for directing tuples to specific named outputs using an
- * {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}. Calling
- * this method on an operator creates a new {@link SplitDataStream}.
+ * {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}.
+ * Calling this method on an operator creates a new {@link SplitDataStream}.
*
* @param outputSelector
* The user defined
@@ -471,7 +508,8 @@ public class DataStream<OUT> {
*/
public <R> SingleOutputStreamOperator<R, ?> map(MapFunction<OUT, R> mapper) {
- TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType());
+ TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
+ Utils.getCallLocationName(), true);
return transform("Map", outType, new MapInvokable<OUT, R>(clean(mapper)));
}
@@ -495,7 +533,7 @@ public class DataStream<OUT> {
public <R> SingleOutputStreamOperator<R, ?> flatMap(FlatMapFunction<OUT, R> flatMapper) {
TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
- getType());
+ getType(), Utils.getCallLocationName(), true);
return transform("Flat Map", outType, new FlatMapInvokable<OUT, R>(clean(flatMapper)));
@@ -521,20 +559,22 @@ public class DataStream<OUT> {
/**
* Applies a fold transformation on the data stream. The returned stream
- * contains all the intermediate values of the fold transformation. The
- * user can also extend the {@link RichFoldFunction} to gain access to
- * other features provided by the {@link org.apache.flink.api.common.functions.RichFunction}
- * interface
- *
+ * contains all the intermediate values of the fold transformation. The user
+ * can also extend the {@link RichFoldFunction} to gain access to other
+ * features provided by the
+ * {@link org.apache.flink.api.common.functions.RichFunction} interface
+ *
* @param folder
- * The {@link FoldFunction} that will be called for every element
- * of the input values.
+ * The {@link FoldFunction} that will be called for every element
+ * of the input values.
* @return The transformed DataStream
*/
public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<OUT, R> folder) {
- TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType());
+ TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType(),
+ Utils.getCallLocationName(), false);
- return transform("Fold", outType, new StreamFoldInvokable<OUT, R>(clean(folder), initialValue, outType));
+ return transform("Fold", outType, new StreamFoldInvokable<OUT, R>(clean(folder),
+ initialValue, outType));
}
/**
@@ -1111,15 +1151,19 @@ public class DataStream<OUT> {
}
/**
- * Writes the DataStream to a socket as a byte array. The format of the output is
- * specified by a {@link SerializationSchema}.
- *
- * @param hostName host of the socket
- * @param port port of the socket
- * @param schema schema for serialization
+ * Writes the DataStream to a socket as a byte array. The format of the
+ * output is specified by a {@link SerializationSchema}.
+ *
+ * @param hostName
+ * host of the socket
+ * @param port
+ * port of the socket
+ * @param schema
+ * schema for serialization
* @return the closed DataStream
*/
- public DataStreamSink<OUT> writeToSocket(String hostName, int port, SerializationSchema<OUT, byte[]> schema){
+ public DataStreamSink<OUT> writeToSocket(String hostName, int port,
+ SerializationSchema<OUT, byte[]> schema) {
DataStreamSink<OUT> returnStream = addSink(new SocketClientSink<OUT>(hostName, port, schema));
return returnStream;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf49e90/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index b565931..034425d 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
@@ -70,21 +71,21 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
*/
@Override
public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
- return transform("Grouped Reduce", getType(), new GroupedReduceInvokable<OUT>(clean(reducer),
- keySelector));
+ return transform("Grouped Reduce", getType(), new GroupedReduceInvokable<OUT>(
+ clean(reducer), keySelector));
}
/**
* Applies a fold transformation on the grouped data stream grouped on by
* the given key position. The {@link FoldFunction} will receive input
* values based on the key value. Only input values with the same key will
- * go to the same folder.The user can also extend
- * {@link RichFoldFunction} to gain access to other features provided by
- * the {@link RichFuntion} interface.
- *
+ * go to the same folder.The user can also extend {@link RichFoldFunction}
+ * to gain access to other features provided by the {@link RichFuntion}
+ * interface.
+ *
* @param folder
- * The {@link FoldFunction} that will be called for every
- * element of the input values with the same key.
+ * The {@link FoldFunction} that will be called for every element
+ * of the input values with the same key.
* @param initialValue
* The initialValue passed to the folders for each key.
* @return The transformed DataStream.
@@ -93,10 +94,11 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
@Override
public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<OUT, R> folder) {
- TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType());
+ TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType(),
+ Utils.getCallLocationName(), false);
- return transform("Grouped Fold", outType, new GroupedFoldInvokable<OUT, R>(clean(folder), keySelector,
- initialValue, outType));
+ return transform("Grouped Fold", outType, new GroupedFoldInvokable<OUT, R>(clean(folder),
+ keySelector, initialValue, outType));
}
/**
@@ -215,8 +217,8 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
GroupedReduceInvokable<OUT> invokable = new GroupedReduceInvokable<OUT>(clean(aggregate),
keySelector);
- SingleOutputStreamOperator<OUT, ?> returnStream = transform("Grouped Aggregation", getType(),
- invokable);
+ SingleOutputStreamOperator<OUT, ?> returnStream = transform("Grouped Aggregation",
+ getType(), invokable);
return returnStream;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf49e90/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 0a3b29f..7b6a367 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -194,8 +194,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
if (typeInfo == null) {
throw new IllegalArgumentException("Type information must not be null.");
}
- streamGraph.setOutType(id, typeInfo);
- this.typeInfo = typeInfo;
+ fillInType(typeInfo);
@SuppressWarnings("unchecked")
O returnType = (O) this;
return returnType;
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf49e90/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
new file mode 100644
index 0000000..9a19dde
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.apache.flink.streaming.api.function.co.CoReduceFunction;
+import org.apache.flink.streaming.api.function.co.CoWindowFunction;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class TypeFillTest {
+
+ @Test
+ public void test() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Long> source = env.generateSequence(1, 10);
+
+ try {
+ source.map(new TestMap<Long, Long>()).print();
+ fail();
+ } catch (Exception e) {
+ }
+ try {
+ source.flatMap(new TestFlatMap<Long, Long>()).print();
+ fail();
+ } catch (Exception e) {
+ }
+ try {
+ source.connect(source).map(new TestCoMap<Long, Long, Integer>()).print();
+ fail();
+ } catch (Exception e) {
+ }
+ try {
+ source.connect(source).flatMap(new TestCoFlatMap<Long, Long, Integer>()).print();
+ fail();
+ } catch (Exception e) {
+ }
+ try {
+ source.connect(source).reduce(new TestCoReduce<Long, Long, Integer>()).print();
+ fail();
+ } catch (Exception e) {
+ }
+ try {
+ source.connect(source).windowReduce(new TestCoWindow<Long, Long, String>(), 10, 100)
+ .print();
+ fail();
+ } catch (Exception e) {
+ }
+
+ source.map(new TestMap<Long, Long>()).returns(Long.class).print();
+ source.flatMap(new TestFlatMap<Long, Long>()).returns("Long").print();
+ source.connect(source).map(new TestCoMap<Long, Long, Integer>()).returns("Integer").print();
+ source.connect(source).flatMap(new TestCoFlatMap<Long, Long, Integer>())
+ .returns(BasicTypeInfo.INT_TYPE_INFO).print();
+ source.connect(source).reduce(new TestCoReduce<Long, Long, Integer>())
+ .returns(Integer.class).print();
+ source.connect(source).windowReduce(new TestCoWindow<Long, Long, String>(), 10, 100)
+ .returns("String").print();
+
+ assertEquals(BasicTypeInfo.LONG_TYPE_INFO,
+ source.map(new TestMap<Long, Long>()).returns(Long.class).getType());
+
+ SingleOutputStreamOperator<String, ?> map = source.map(new MapFunction<Long, String>() {
+
+ @Override
+ public String map(Long value) throws Exception {
+ return null;
+ }
+ });
+
+ map.print();
+ try {
+ map.returns("String");
+ fail();
+ } catch (Exception e) {
+ }
+
+ }
+
+ private class TestMap<T, O> implements MapFunction<T, O> {
+ @Override
+ public O map(T value) throws Exception {
+ return null;
+ }
+ }
+
+ private class TestFlatMap<T, O> implements FlatMapFunction<T, O> {
+ @Override
+ public void flatMap(T value, Collector<O> out) throws Exception {
+ }
+ }
+
+ private class TestCoMap<IN1, IN2, OUT> implements CoMapFunction<IN1, IN2, OUT> {
+
+ @Override
+ public OUT map1(IN1 value) {
+ return null;
+ }
+
+ @Override
+ public OUT map2(IN2 value) {
+ return null;
+ }
+
+ }
+
+ private class TestCoFlatMap<IN1, IN2, OUT> implements CoFlatMapFunction<IN1, IN2, OUT> {
+
+ @Override
+ public void flatMap1(IN1 value, Collector<OUT> out) throws Exception {
+ }
+
+ @Override
+ public void flatMap2(IN2 value, Collector<OUT> out) throws Exception {
+ }
+
+ }
+
+ private class TestCoReduce<IN1, IN2, OUT> implements CoReduceFunction<IN1, IN2, OUT> {
+
+ @Override
+ public IN1 reduce1(IN1 value1, IN1 value2) {
+ return null;
+ }
+
+ @Override
+ public IN2 reduce2(IN2 value1, IN2 value2) {
+ return null;
+ }
+
+ @Override
+ public OUT map1(IN1 value) {
+ return null;
+ }
+
+ @Override
+ public OUT map2(IN2 value) {
+ return null;
+ }
+
+ }
+
+ private class TestCoWindow<IN1, IN2, OUT> implements CoWindowFunction<IN1, IN2, OUT> {
+
+ @Override
+ public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> out)
+ throws Exception {
+ }
+
+ }
+
+}