You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/04/15 11:38:44 UTC

[03/19] flink git commit: [streaming] Major internal renaming and restructure

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java
index 88673a3..1731e7c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java
@@ -21,7 +21,7 @@ import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.mock;
 
 import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamvertex.MockRecordWriter;
+import org.apache.flink.streaming.api.streamtask.MockRecordWriter;
 import org.mockito.Mockito;
 
 public class MockRecordWriterFactory {

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
index 4cf02ae..2f6e450 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.util;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 
 public class MockSource<T> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
index 87f290f..423d08e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
@@ -22,7 +22,7 @@ import java.util.List;
 import java.util.TreeSet;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 
 public class TestListResultSink<T> extends RichSinkFunction<T> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 0ff5c56..5cf529a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -18,11 +18,9 @@
 
 package org.apache.flink.streaming.util;
 
-import akka.actor.ActorRef;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.SerializedJobExecutionResult;
 import org.apache.flink.runtime.jobgraph.JobGraph;

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index f5f2cd7..86c739c 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -25,7 +25,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeDataStream;
 import org.apache.flink.streaming.api.datastream.SplitDataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.util.Collector;
 
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index bfc59ec..cf9edfe 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -24,8 +24,8 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.RichSourceFunction;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp;
 import org.apache.flink.util.Collector;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index 68b105a..bfc9e1a 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -19,9 +19,9 @@ package org.apache.flink.streaming.examples.ml;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.WindowMapFunction;
-import org.apache.flink.streaming.api.function.co.CoMapFunction;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.WindowMapFunction;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.windowing.helper.Time;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp;
 import org.apache.flink.util.Collector;

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index dc5ce42..1522910 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.examples.windowing;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.windowing.policy.CentralActiveTrigger;
 import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
 import org.apache.flink.util.Collector;

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
index ec99026..4fba898 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
@@ -31,8 +31,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.WindowedDataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.WindowMapFunction;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.WindowMapFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
 import org.apache.flink.streaming.api.windowing.helper.Delta;
 import org.apache.flink.streaming.api.windowing.helper.Time;

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
index bf3802b..c78ec34 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
 import org.apache.flink.streaming.api.windowing.helper.Delta;
 import org.apache.flink.streaming.api.windowing.helper.Time;

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-scala/src/main/java/org/apache/flink/api/streaming/scala/ScalaStreamingAggregator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/java/org/apache/flink/api/streaming/scala/ScalaStreamingAggregator.java b/flink-staging/flink-streaming/flink-streaming-scala/src/main/java/org/apache/flink/api/streaming/scala/ScalaStreamingAggregator.java
index 77d102d..144e3ef 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/java/org/apache/flink/api/streaming/scala/ScalaStreamingAggregator.java
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/java/org/apache/flink/api/streaming/scala/ScalaStreamingAggregator.java
@@ -22,9 +22,9 @@ import java.io.Serializable;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
-import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
-import org.apache.flink.streaming.api.function.aggregation.SumFunction;
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
+import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
+import org.apache.flink.streaming.api.functions.aggregation.SumFunction;
 
 import scala.Product;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
index 9363236..fbd7a02 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
@@ -19,17 +19,17 @@
 package org.apache.flink.streaming.api.scala
 
 import java.util
-
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.streaming.api.datastream.{ConnectedDataStream => JavaCStream}
-import org.apache.flink.streaming.api.function.co.{CoFlatMapFunction, CoMapFunction, CoReduceFunction, CoWindowFunction}
-import org.apache.flink.streaming.api.invokable.operator.co.{CoFlatMapInvokable, CoMapInvokable, CoReduceInvokable}
+import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction, CoReduceFunction, CoWindowFunction}
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
 import org.apache.flink.util.Collector
-
 import scala.collection.JavaConversions.asScalaBuffer
 import scala.reflect.ClassTag
+import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap
+import org.apache.flink.streaming.api.operators.co.CoStreamMap
+import org.apache.flink.streaming.api.operators.co.CoStreamReduce
 
 class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
 
@@ -55,7 +55,7 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
     }
 
     new DataStream(javaStream.addCoFunction("map", implicitly[TypeInformation[R]],
-      new CoMapInvokable[IN1, IN2, R](comapper)))
+      new CoStreamMap[IN1, IN2, R](comapper)))
   }
 
   /**
@@ -79,7 +79,7 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
     }
 
     new DataStream(javaStream.addCoFunction("map", implicitly[TypeInformation[R]],
-      new CoMapInvokable[IN1, IN2, R](coMapper)))
+      new CoStreamMap[IN1, IN2, R](coMapper)))
   }
 
   /**
@@ -103,7 +103,7 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
       throw new NullPointerException("FlatMap function must not be null.")
     }
     new DataStream[R](javaStream.addCoFunction("flatMap", implicitly[TypeInformation[R]],
-      new CoFlatMapInvokable[IN1, IN2, R](coFlatMapper)))
+      new CoStreamFlatMap[IN1, IN2, R](coFlatMapper)))
   }
 
   /**
@@ -269,7 +269,7 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
     }
 
     new DataStream[R](javaStream.addCoFunction("coReduce", implicitly[TypeInformation[R]],
-      new CoReduceInvokable[IN1, IN2, R](coReducer)))
+      new CoStreamReduce[IN1, IN2, R](coReducer)))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index c869a18..5a4b611 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -23,31 +23,40 @@ import org.apache.flink.streaming.api.collector.selector.OutputSelector
 import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream,
   SingleOutputStreamOperator, GroupedDataStream}
 import org.apache.flink.streaming.util.serialization.SerializationSchema
+
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
+
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.streaming.api.invokable.operator._
 import org.apache.flink.util.Collector
 import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.api.common.functions.ReduceFunction
-import org.apache.flink.streaming.api.invokable.StreamInvokable
 import org.apache.flink.api.common.functions.ReduceFunction
 import org.apache.flink.api.common.functions.FoldFunction
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.common.functions.FilterFunction
-import org.apache.flink.streaming.api.function.sink.SinkFunction
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
 import org.apache.flink.streaming.api.windowing.policy.{ EvictionPolicy, TriggerPolicy }
+
 import scala.collection.JavaConversions._
+
 import java.util.HashMap
-import org.apache.flink.streaming.api.function.aggregation.SumFunction
-import org.apache.flink.streaming.api.function.aggregation.AggregationFunction
-import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType
+
+import org.apache.flink.streaming.api.functions.aggregation.SumFunction
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator
-import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy
+import org.apache.flink.streaming.api.operators.StreamOperator.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamReduce
+import org.apache.flink.streaming.api.operators.StreamGroupedReduce
+import org.apache.flink.streaming.api.operators.StreamFlatMap
+import org.apache.flink.streaming.api.operators.StreamGroupedFold
+import org.apache.flink.streaming.api.operators.StreamMap
+import org.apache.flink.streaming.api.operators.StreamFold
 
 class DataStream[T](javaStream: JavaStream[T]) {
 
@@ -329,9 +338,9 @@ class DataStream[T](javaStream: JavaStream[T]) {
     }
 
     val invokable = jStream match {
-      case groupedStream: GroupedDataStream[_] => new GroupedReduceInvokable(reducer,
+      case groupedStream: GroupedDataStream[_] => new StreamGroupedReduce(reducer,
         groupedStream.getKeySelector())
-      case _ => new StreamReduceInvokable(reducer)
+      case _ => new StreamReduce(reducer)
     }
     new DataStream[Product](jStream.transform("aggregation", jStream.getType(),
       invokable)).asInstanceOf[DataStream[T]]
@@ -357,7 +366,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
       def map(in: T): R = cleanFun(in)
     }
 
-    javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[T, R](mapper))
+    javaStream.transform("map", implicitly[TypeInformation[R]], new StreamMap[T, R](mapper))
   }
 
   /**
@@ -368,7 +377,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
       throw new NullPointerException("Map function must not be null.")
     }
 
-    javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[T, R](mapper))
+    javaStream.transform("map", implicitly[TypeInformation[R]], new StreamMap[T, R](mapper))
   }
 
   /**
@@ -380,7 +389,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
       throw new NullPointerException("FlatMap function must not be null.")
     }
    javaStream.transform("flatMap", implicitly[TypeInformation[R]], 
-       new FlatMapInvokable[T, R](flatMapper))
+       new StreamFlatMap[T, R](flatMapper))
   }
 
   /**
@@ -423,9 +432,9 @@ class DataStream[T](javaStream: JavaStream[T]) {
     }
     javaStream match {
       case ds: GroupedDataStream[_] => javaStream.transform("reduce",
-        javaStream.getType(), new GroupedReduceInvokable[T](reducer, ds.getKeySelector()))
+        javaStream.getType(), new StreamGroupedReduce[T](reducer, ds.getKeySelector()))
       case _ => javaStream.transform("reduce", javaStream.getType(),
-        new StreamReduceInvokable[T](reducer))
+        new StreamReduce[T](reducer))
     }
   }
 
@@ -455,10 +464,10 @@ class DataStream[T](javaStream: JavaStream[T]) {
     }
     javaStream match {
       case ds: GroupedDataStream[_] => javaStream.transform("fold",
-        implicitly[TypeInformation[R]], new GroupedFoldInvokable[T,R](folder, ds.getKeySelector(), 
+        implicitly[TypeInformation[R]], new StreamGroupedFold[T,R](folder, ds.getKeySelector(), 
             initialValue, implicitly[TypeInformation[R]]))
       case _ => javaStream.transform("fold", implicitly[TypeInformation[R]],
-        new StreamFoldInvokable[T,R](folder, initialValue, implicitly[TypeInformation[R]]))
+        new StreamFold[T,R](folder, initialValue, implicitly[TypeInformation[R]]))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
index a7b3480..d14787c 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
@@ -27,12 +27,12 @@ import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.scala.typeutils.CaseClassSerializer
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
-import org.apache.flink.streaming.api.function.co.CrossWindowFunction
-import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
+import org.apache.flink.streaming.api.functions.co.CrossWindowFunction
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
-import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow
+import org.apache.flink.streaming.api.datastream.temporal.TemporalWindow
 import java.util.concurrent.TimeUnit
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
+import org.apache.flink.streaming.api.operators.co.CoStreamWindow
 
 class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
   TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) {
@@ -83,12 +83,12 @@ object StreamCrossOperator {
      */
     def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = {
 
-      val invokable = new CoWindowInvokable[I1, I2, R](
+      val operator = new CoStreamWindow[I1, I2, R](
         clean(getCrossWindowFunction(op, fun)), op.windowSize, op.slideInterval, op.timeStamp1,
         op.timeStamp2)
 
-      javaStream.getExecutionEnvironment().getStreamGraph().setInvokable(javaStream.getId(),
-        invokable)
+      javaStream.getExecutionEnvironment().getStreamGraph().setOperator(javaStream.getId(),
+        operator)
         
       val js = javaStream.asInstanceOf[SingleOutputStreamOperator[R,_]]
       js.returns(implicitly[TypeInformation[R]]).asInstanceOf[SingleOutputStreamOperator[R,_]]
@@ -100,8 +100,8 @@ object StreamCrossOperator {
 
     override def every(length: Long): CrossWindow[I1, I2] = {
       val graph = javaStream.getExecutionEnvironment().getStreamGraph()
-      val invokable = graph.getVertex(javaStream.getId()).getInvokable()
-      invokable.asInstanceOf[CoWindowInvokable[_,_,_]].setSlideSize(length)
+      val operator = graph.getVertex(javaStream.getId()).getOperator()
+      operator.asInstanceOf[CoStreamWindow[_,_,_]].setSlideSize(length)
       this
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index cfa7c18..0217793 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -25,10 +25,10 @@ import scala.reflect.ClassTag
 import org.apache.commons.lang.Validate
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
-import org.apache.flink.streaming.api.function.source.{ FromElementsFunction, SourceFunction }
+import org.apache.flink.streaming.api.functions.source.{ FromElementsFunction, SourceFunction }
 import org.apache.flink.util.Collector
 import org.apache.flink.api.scala.ClosureCleaner
-import org.apache.flink.streaming.api.function.source.FileMonitoringFunction.WatchType
+import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
 
 class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
index bfdf493..510aaae 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
@@ -30,13 +30,13 @@ import org.apache.flink.api.java.operators.Keys
 import org.apache.flink.api.scala.typeutils.CaseClassSerializer
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
-import org.apache.flink.streaming.api.function.co.JoinWindowFunction
-import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
+import org.apache.flink.streaming.api.functions.co.JoinWindowFunction
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
 import org.apache.flink.streaming.util.keys.KeySelectorUtil
-import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow
+import org.apache.flink.streaming.api.datastream.temporal.TemporalWindow
 import java.util.concurrent.TimeUnit
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
+import org.apache.flink.streaming.api.operators.co.CoStreamWindow
 
 class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends 
 TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {
@@ -195,12 +195,12 @@ object StreamJoinOperator {
      */
     def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = {
 
-      val invokable = new CoWindowInvokable[I1, I2, R](
+      val operator = new CoStreamWindow[I1, I2, R](
         clean(getJoinWindowFunction(jp, fun)), op.windowSize, op.slideInterval, op.timeStamp1,
         op.timeStamp2)
 
-      javaStream.getExecutionEnvironment().getStreamGraph().setInvokable(javaStream.getId(),
-        invokable)
+      javaStream.getExecutionEnvironment().getStreamGraph().setOperator(javaStream.getId(),
+        operator)
 
       val js = javaStream.asInstanceOf[SingleOutputStreamOperator[R,_]]
       js.returns(implicitly[TypeInformation[R]]).asInstanceOf[SingleOutputStreamOperator[R,_]]

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala
index fd3a4a9..8e6dc36 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala
@@ -18,9 +18,9 @@
 
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.streaming.api.datastream.temporaloperator.{ TemporalOperator => JTempOp }
+import org.apache.flink.streaming.api.datastream.temporal.{ TemporalOperator => JTempOp }
 import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
-import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow
+import org.apache.flink.streaming.api.datastream.temporal.TemporalWindow
 import org.apache.flink.streaming.api.windowing.helper.Timestamp
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment._
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
index 3bd5b24..09025e7 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
@@ -28,9 +28,9 @@ import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator
 import org.apache.flink.streaming.api.datastream.{WindowedDataStream => JavaWStream}
-import org.apache.flink.streaming.api.function.WindowMapFunction
-import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType
-import org.apache.flink.streaming.api.function.aggregation.SumFunction
+import org.apache.flink.streaming.api.functions.WindowMapFunction
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
+import org.apache.flink.streaming.api.functions.aggregation.SumFunction
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
 import org.apache.flink.streaming.api.windowing.StreamWindow
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala
index f25893a..b1bc8fb 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.api.java.table
 
 import java.lang.reflect.Modifier
-
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
@@ -29,7 +28,7 @@ import org.apache.flink.api.table.expressions._
 import org.apache.flink.api.table.typeinfo.RowTypeInfo
 import org.apache.flink.api.table.{ExpressionException, Row, Table}
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.invokable.operator.MapInvokable
+import org.apache.flink.streaming.api.operators.StreamMap
 
 /**
  * [[PlanTranslator]] for creating [[Table]]s from Java [[DataStream]]s and
@@ -109,7 +108,7 @@ class JavaStreamingTranslator extends PlanTranslator {
 
     val opName = s"select(${outputFields.mkString(",")})"
 
-    resultSet.transform(opName, outputType, new MapInvokable[Row, A](function))
+    resultSet.transform(opName, outputType, new StreamMap[Row, A](function))
   }
 
   private def translateInternal(op: PlanNode): DataStream[Row] = {
@@ -220,7 +219,7 @@ class JavaStreamingTranslator extends PlanTranslator {
 
     val opName = s"select(${fields.mkString(",")})"
 
-    input.transform(opName, resultType, new MapInvokable[I, Row](function))
+    input.transform(opName, resultType, new StreamMap[I, Row](function))
   }
 
   private def createJoin[L, R](

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index 10773f5..fe2fdb8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -25,8 +25,8 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.Collector;
 import org.junit.AfterClass;

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
index f02a5d6..04f9f34 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
@@ -23,7 +23,7 @@ import java.util.StringTokenizer;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.util.Collector;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
index ea311c4..aeaad5c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
@@ -25,9 +25,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 import org.apache.flink.util.Collector;
 
 import java.io.BufferedReader;