You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2015/11/12 21:09:20 UTC

[5/8] flink git commit: [FLINK-2901] Move PythonAPI to flink-libraries

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
new file mode 100644
index 0000000..2116d1f
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
@@ -0,0 +1,264 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+from flink.functions.MapFunction import MapFunction
+from flink.functions.FlatMapFunction import FlatMapFunction
+from flink.functions.FilterFunction import FilterFunction
+from flink.functions.MapPartitionFunction import MapPartitionFunction
+from flink.functions.ReduceFunction import ReduceFunction
+from flink.functions.CrossFunction import CrossFunction
+from flink.functions.JoinFunction import JoinFunction
+from flink.functions.GroupReduceFunction import GroupReduceFunction
+from flink.functions.CoGroupFunction import CoGroupFunction
+from flink.plan.Constants import INT, STRING, FLOAT, BOOL, Order
+
+
+class Mapper(MapFunction):
+    def map(self, value):
+        return value * value
+
+
+class Filter(FilterFunction):
+    def __init__(self, limit):
+        super(Filter, self).__init__()
+        self.limit = limit
+
+    def filter(self, value):
+        return value > self.limit
+
+
+class FlatMap(FlatMapFunction):
+    def flat_map(self, value, collector):
+        collector.collect(value)
+        collector.collect(value * 2)
+
+
+class MapPartition(MapPartitionFunction):
+    def map_partition(self, iterator, collector):
+        for value in iterator:
+            collector.collect(value * 2)
+
+
+class Reduce(ReduceFunction):
+    def reduce(self, value1, value2):
+        return value1 + value2
+
+
+class Reduce2(ReduceFunction):
+    def reduce(self, value1, value2):
+        return (value1[0] + value2[0], value1[1] + value2[1], value1[2], value1[3] or value2[3])
+
+
+class Cross(CrossFunction):
+    def cross(self, value1, value2):
+        return (value1, value2[3])
+
+
+class MapperBcv(MapFunction):
+    def map(self, value):
+        factor = self.context.get_broadcast_variable("test")[0][0]
+        return value * factor
+
+
+class Join(JoinFunction):
+    def join(self, value1, value2):
+        if value1[3]:
+            return value2[0] + str(value1[0])
+        else:
+            return value2[0] + str(value1[1])
+
+
+class GroupReduce(GroupReduceFunction):
+    def reduce(self, iterator, collector):
+        if iterator.has_next():
+            i, f, s, b = iterator.next()
+            for value in iterator:
+                i += value[0]
+                f += value[1]
+                b |= value[3]
+            collector.collect((i, f, s, b))
+
+
+class GroupReduce2(GroupReduceFunction):
+    def reduce(self, iterator, collector):
+        for value in iterator:
+            collector.collect(value)
+
+
+class GroupReduce3(GroupReduceFunction):
+    def reduce(self, iterator, collector):
+        collector.collect(iterator.next())
+
+    def combine(self, iterator, collector):
+        if iterator.has_next():
+            v1 = iterator.next()
+        if iterator.has_next():
+            v2 = iterator.next()
+        if v1[0] < v2[0]:
+            collector.collect(v1)
+        else:
+            collector.collect(v2)
+
+
+class CoGroup(CoGroupFunction):
+    def co_group(self, iterator1, iterator2, collector):
+        while iterator1.has_next() and iterator2.has_next():
+            collector.collect((iterator1.next(), iterator2.next()))
+
+
+class Id(MapFunction):
+    def map(self, value):
+        return value
+
+
+class Verify(MapPartitionFunction):
+    def __init__(self, expected, name):
+        super(Verify, self).__init__()
+        self.expected = expected
+        self.name = name
+
+    def map_partition(self, iterator, collector):
+        index = 0
+        for value in iterator:
+            if value != self.expected[index]:
+                print(self.name + " Test failed. Expected: " + str(self.expected[index]) + " Actual: " + str(value))
+                raise Exception(self.name + " failed!")
+            index += 1
+        collector.collect(self.name + " successful!")
+
+
+class Verify2(MapPartitionFunction):
+    def __init__(self, expected, name):
+        super(Verify2, self).__init__()
+        self.expected = expected
+        self.name = name
+
+    def map_partition(self, iterator, collector):
+        for value in iterator:
+            if value in self.expected:
+                try:
+                    self.expected.remove(value)
+                except Exception:
+                    raise Exception(self.name + " failed!")
+        collector.collect(self.name + " successful!")
+
+
+if __name__ == "__main__":
+    env = get_environment()
+
+    d1 = env.from_elements(1, 6, 12)
+
+    d2 = env.from_elements((1, 0.5, "hello", True), (2, 0.4, "world", False))
+
+    d3 = env.from_elements(("hello",), ("world",))
+
+    d4 = env.from_elements((1, 0.5, "hello", True), (1, 0.4, "hello", False), (1, 0.5, "hello", True), (2, 0.4, "world", False))
+
+    d5 = env.from_elements((4.4, 4.3, 1), (4.3, 4.4, 1), (4.2, 4.1, 3), (4.1, 4.1, 3))
+
+    d1 \
+        .map((lambda x: x * x), INT).map(Mapper(), INT) \
+        .map_partition(Verify([1, 1296, 20736], "Map"), STRING).output()
+
+    d1 \
+        .map(Mapper(), INT).map((lambda x: x * x), INT) \
+        .map_partition(Verify([1, 1296, 20736], "Chained Lambda"), STRING).output()
+
+    d1 \
+        .filter(Filter(5)).filter(Filter(8)) \
+        .map_partition(Verify([12], "Filter"), STRING).output()
+
+    d1 \
+        .flat_map(FlatMap(), INT).flat_map(FlatMap(), INT) \
+        .map_partition(Verify([1, 2, 2, 4, 6, 12, 12, 24, 12, 24, 24, 48], "FlatMap"), STRING).output()
+
+    d1 \
+        .map_partition(MapPartition(), INT) \
+        .map_partition(Verify([2, 12, 24], "MapPartition"), STRING).output()
+
+    d1 \
+        .reduce(Reduce()) \
+        .map_partition(Verify([19], "AllReduce"), STRING).output()
+
+    d4 \
+        .group_by(2).reduce(Reduce2()) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "CombineReduce"), STRING).output()
+
+    d4 \
+        .map(Id(), (INT, FLOAT, STRING, BOOL)).group_by(2).reduce(Reduce2()) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "ChainedReduce"), STRING).output()
+
+    d1 \
+        .map(MapperBcv(), INT).with_broadcast_set("test", d2) \
+        .map_partition(Verify([1, 6, 12], "Broadcast"), STRING).output()
+
+    d1 \
+        .cross(d2).using(Cross(), (INT, BOOL)) \
+        .map_partition(Verify([(1, True), (1, False), (6, True), (6, False), (12, True), (12, False)], "Cross"), STRING).output()
+
+    d1 \
+        .cross(d3) \
+        .map_partition(Verify([(1, ("hello",)), (1, ("world",)), (6, ("hello",)), (6, ("world",)), (12, ("hello",)), (12, ("world",))], "Default Cross"), STRING).output()
+
+    d2 \
+        .cross(d3).project_second(0).project_first(0, 1) \
+        .map_partition(Verify([("hello", 1, 0.5), ("world", 1, 0.5), ("hello", 2, 0.4), ("world", 2, 0.4)], "Project Cross"), STRING).output()
+
+    d2 \
+        .join(d3).where(2).equal_to(0).using(Join(), STRING) \
+        .map_partition(Verify(["hello1", "world0.4"], "Join"), STRING).output()
+
+    d2 \
+        .join(d3).where(2).equal_to(0).project_first(0, 3).project_second(0) \
+        .map_partition(Verify([(1, True, "hello"), (2, False, "world")], "Project Join"), STRING).output()
+
+    d2 \
+        .join(d3).where(2).equal_to(0) \
+        .map_partition(Verify([((1, 0.5, "hello", True), ("hello",)), ((2, 0.4, "world", False), ("world",))], "Default Join"), STRING).output()
+
+    d2 \
+        .project(0, 1, 2) \
+        .map_partition(Verify([(1, 0.5, "hello"), (2, 0.4, "world")], "Project"), STRING).output()
+
+    d2 \
+        .union(d4) \
+        .map_partition(Verify2([(1, 0.5, "hello", True), (2, 0.4, "world", False), (1, 0.5, "hello", True), (1, 0.4, "hello", False), (1, 0.5, "hello", True), (2, 0.4, "world", False)], "Union"), STRING).output()
+
+    d4 \
+        .group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=False) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "AllGroupReduce"), STRING).output()
+
+    d4 \
+        .map(Id(), (INT, FLOAT, STRING, BOOL)).group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=True) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "ChainedGroupReduce"), STRING).output()
+
+    d4 \
+        .group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=True) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "CombineGroupReduce"), STRING).output()
+
+    d5 \
+        .group_by(2).sort_group(0, Order.DESCENDING).sort_group(1, Order.ASCENDING).reduce_group(GroupReduce3(), (FLOAT, FLOAT, INT), combinable=True) \
+        .map_partition(Verify([(4.3, 4.4, 1), (4.1, 4.1, 3)], "ChainedSortedGroupReduce"), STRING).output()
+
+    d4 \
+        .co_group(d5).where(0).equal_to(2).using(CoGroup(), ((INT, FLOAT, STRING, BOOL), (FLOAT, FLOAT, INT))) \
+        .map_partition(Verify([((1, 0.5, "hello", True), (4.4, 4.3, 1)), ((1, 0.4, "hello", False), (4.3, 4.4, 1))], "CoGroup"), STRING).output()
+
+    env.set_degree_of_parallelism(1)
+
+    env.execute(local=True)

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_text.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_text.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_text.py
new file mode 100644
index 0000000..1f90587
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_text.py
@@ -0,0 +1,30 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+from flink.plan.Constants import WriteMode
+
+if __name__ == "__main__":
+    env = get_environment()
+
+    d1 = env.read_text("src/test/python/org/apache/flink/python/api/data_text")
+
+    d1.write_text("/tmp/flink/result", WriteMode.OVERWRITE)
+
+    env.set_degree_of_parallelism(1)
+
+    env.execute(local=True)

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py
new file mode 100644
index 0000000..a3b8d07
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py
@@ -0,0 +1,63 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+from flink.plan.Constants import _Fields
+from flink.plan.Constants import INT, STRING, BOOL, FLOAT
+import sys
+
+if __name__ == "__main__":
+    env = get_environment()
+
+    d1 = env.from_elements(("hello", 4, 3.2, True))
+
+    d2 = env.from_elements("world")
+
+    direct_from_source = d1.filter(lambda x:True)
+
+    if direct_from_source._info[_Fields.TYPES] != ("hello", 4, 3.2, True):
+        sys.exit("Error deducting type directly from source.")
+
+    from_common_udf = d1.map(lambda x: x[3], BOOL).filter(lambda x:True)
+
+    if from_common_udf._info[_Fields.TYPES] != BOOL:
+        sys.exit("Error deducting type from common udf.")
+
+    through_projection = d1.project(3, 2).filter(lambda x:True)
+
+    if through_projection._info[_Fields.TYPES] != (True, 3.2):
+        sys.exit("Error deducting type through projection.")
+
+    through_default_op = d1.cross(d2).filter(lambda x:True)
+
+    if through_default_op._info[_Fields.TYPES] != (("hello", 4, 3.2, True), "world"):
+        sys.exit("Error deducting type through default J/C." +str(through_default_op._info[_Fields.TYPES]))
+
+    through_prj_op = d1.cross(d2).project_first(1, 0).project_second().project_first(3, 2).filter(lambda x:True)
+
+    if through_prj_op._info[_Fields.TYPES] != (4, "hello", "world", True, 3.2):
+        sys.exit("Error deducting type through projection J/C. "+str(through_prj_op._info[_Fields.TYPES]))
+
+
+    env = get_environment()
+
+    msg = env.from_elements("Type deduction test successful.")
+
+    msg.output()
+
+    env.execute()
+

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_types.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_types.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_types.py
new file mode 100644
index 0000000..f5f3ee4
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_types.py
@@ -0,0 +1,70 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+from flink.functions.MapFunction import MapFunction
+from flink.functions.MapPartitionFunction import MapPartitionFunction
+from flink.plan.Constants import BOOL, INT, FLOAT, STRING, BYTES
+
+
+class Verify(MapPartitionFunction):
+    def __init__(self, expected, name):
+        super(Verify, self).__init__()
+        self.expected = expected
+        self.name = name
+
+    def map_partition(self, iterator, collector):
+        index = 0
+        for value in iterator:
+            if value != self.expected[index]:
+                print(self.name + " Test failed. Expected: " + str(self.expected[index]) + " Actual: " + str(value))
+                raise Exception(self.name + " failed!")
+            index += 1
+        collector.collect(self.name + " successful!")
+
+
+class Id(MapFunction):
+    def map(self, value):
+        return value
+
+
+if __name__ == "__main__":
+    env = get_environment()
+
+    d1 = env.from_elements(bytearray(b"hello"), bytearray(b"world"))
+
+    d1.map(Id(), BYTES).map_partition(Verify([bytearray(b"hello"), bytearray(b"world")], "Byte"), STRING).output()
+
+    d2 = env.from_elements(1,2,3,4,5)
+
+    d2.map(Id(), INT).map_partition(Verify([1,2,3,4,5], "Int"), STRING).output()
+
+    d3 = env.from_elements(True, True, False)
+
+    d3.map(Id(), BOOL).map_partition(Verify([True, True, False], "Bool"), STRING).output()
+
+    d4 = env.from_elements(1.4, 1.7, 12312.23)
+
+    d4.map(Id(), FLOAT).map_partition(Verify([1.4, 1.7, 12312.23], "Float"), STRING).output()
+
+    d5 = env.from_elements("hello", "world")
+
+    d5.map(Id(), STRING).map_partition(Verify(["hello", "world"], "String"), STRING).output()
+
+    env.set_degree_of_parallelism(1)
+
+    env.execute(local=True)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/pom.xml b/flink-libraries/pom.xml
index 065127b..e1dc2d9 100644
--- a/flink-libraries/pom.xml
+++ b/flink-libraries/pom.xml
@@ -36,5 +36,6 @@ under the License.
 	<modules>
 		<module>flink-gelly</module>
 		<module>flink-gelly-scala</module>
+		<module>flink-python</module>
 	</modules>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-language-binding-generic/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/pom.xml b/flink-staging/flink-language-binding/flink-language-binding-generic/pom.xml
deleted file mode 100644
index 7a8f8af..0000000
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/pom.xml
+++ /dev/null
@@ -1,61 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-language-binding-parent</artifactId>
-        <version>1.0-SNAPSHOT</version>
-        <relativePath>..</relativePath>
-    </parent>
-	
-    <artifactId>flink-language-binding-generic</artifactId>
-    <name>flink-language-binding-generic</name>
-    <packaging>jar</packaging>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-core</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-java</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-optimizer</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-runtime</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-clients</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-    </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/OperationInfo.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/OperationInfo.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/OperationInfo.java
deleted file mode 100644
index bdb0444..0000000
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/OperationInfo.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.languagebinding.api.java.common;
-
-import java.io.IOException;
-import java.util.Arrays;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.tuple.Tuple;
-import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject;
-import org.apache.flink.core.fs.FileSystem.WriteMode;
-import org.apache.flink.languagebinding.api.java.common.PlanBinder.Operation;
-import static org.apache.flink.languagebinding.api.java.common.PlanBinder.normalizeKeys;
-import static org.apache.flink.languagebinding.api.java.common.PlanBinder.toIntArray;
-import org.apache.flink.languagebinding.api.java.common.streaming.Receiver;
-
-/**
- * Container for all generic information related to operations. This class contains the absolute minimum fields that are
- * required for all operations. This class should be extended to contain any additional fields required on a
- * per-language basis.
- */
-public class OperationInfo {
-	public int parentID; //DataSet that an operation is applied on
-	public int otherID; //secondary DataSet
-	public int setID; //ID for new DataSet
-	public String[] keys;
-	public String[] keys1; //join/cogroup keys
-	public String[] keys2; //join/cogroup keys
-	public TypeInformation<?> types; //typeinformation about output type
-	public AggregationEntry[] aggregates;
-	public ProjectionEntry[] projections; //projectFirst/projectSecond
-	public Object[] values;
-	public int count;
-	public int field;
-	public int[] fields;
-	public Order order;
-	public String path;
-	public String fieldDelimiter;
-	public String lineDelimiter;
-	public long from;
-	public long to;
-	public WriteMode writeMode;
-	public boolean toError;
-	public String name;
-
-	public OperationInfo() {
-	}
-
-	public OperationInfo(Receiver receiver, Operation identifier) throws IOException {
-		Object tmpType;
-		switch (identifier) {
-			case SOURCE_CSV:
-				setID = (Integer) receiver.getRecord(true);
-				path = (String) receiver.getRecord();
-				fieldDelimiter = (String) receiver.getRecord();
-				lineDelimiter = (String) receiver.getRecord();
-				tmpType = (Tuple) receiver.getRecord();
-				types = tmpType == null ? null : getForObject(tmpType);
-				return;
-			case SOURCE_TEXT:
-				setID = (Integer) receiver.getRecord(true);
-				path = (String) receiver.getRecord();
-				return;
-			case SOURCE_VALUE:
-				setID = (Integer) receiver.getRecord(true);
-				int valueCount = (Integer) receiver.getRecord(true);
-				values = new Object[valueCount];
-				for (int x = 0; x < valueCount; x++) {
-					values[x] = receiver.getRecord();
-				}
-				return;
-			case SOURCE_SEQ:
-				setID = (Integer) receiver.getRecord(true);
-				from = (Long) receiver.getRecord();
-				to = (Long) receiver.getRecord();
-				return;
-			case SINK_CSV:
-				parentID = (Integer) receiver.getRecord(true);
-				path = (String) receiver.getRecord();
-				fieldDelimiter = (String) receiver.getRecord();
-				lineDelimiter = (String) receiver.getRecord();
-				writeMode = ((Integer) receiver.getRecord(true)) == 1
-						? WriteMode.OVERWRITE
-						: WriteMode.NO_OVERWRITE;
-				return;
-			case SINK_TEXT:
-				parentID = (Integer) receiver.getRecord(true);
-				path = (String) receiver.getRecord();
-				writeMode = ((Integer) receiver.getRecord(true)) == 1
-						? WriteMode.OVERWRITE
-						: WriteMode.NO_OVERWRITE;
-				return;
-			case SINK_PRINT:
-				parentID = (Integer) receiver.getRecord(true);
-				toError = (Boolean) receiver.getRecord();
-				return;
-			case BROADCAST:
-				parentID = (Integer) receiver.getRecord(true);
-				otherID = (Integer) receiver.getRecord(true);
-				name = (String) receiver.getRecord();
-				return;
-		}
-		setID = (Integer) receiver.getRecord(true);
-		parentID = (Integer) receiver.getRecord(true);
-		switch (identifier) {
-			case AGGREGATE:
-				count = (Integer) receiver.getRecord(true);
-				aggregates = new AggregationEntry[count];
-				for (int x = 0; x < count; x++) {
-					int encodedAgg = (Integer) receiver.getRecord(true);
-					int field = (Integer) receiver.getRecord(true);
-					aggregates[x] = new AggregationEntry(encodedAgg, field);
-				}
-				return;
-			case FIRST:
-				count = (Integer) receiver.getRecord(true);
-				return;
-			case DISTINCT:
-			case GROUPBY:
-			case PARTITION_HASH:
-				keys = normalizeKeys(receiver.getRecord(true));
-				return;
-			case PROJECTION:
-				fields = toIntArray(receiver.getRecord(true));
-				return;
-			case REBALANCE:
-				return;
-			case SORT:
-				field = (Integer) receiver.getRecord(true);
-				int encodedOrder = (Integer) receiver.getRecord(true);
-				switch (encodedOrder) {
-					case 0:
-						order = Order.NONE;
-						break;
-					case 1:
-						order = Order.ASCENDING;
-						break;
-					case 2:
-						order = Order.DESCENDING;
-						break;
-					case 3:
-						order = Order.ANY;
-						break;
-					default:
-						order = Order.NONE;
-						break;
-				}
-				return;
-			case UNION:
-				otherID = (Integer) receiver.getRecord(true);
-				return;
-		}
-	}
-
-	@Override
-	public String toString() {
-		StringBuilder sb = new StringBuilder();
-		sb.append("SetID: ").append(setID).append("\n");
-		sb.append("ParentID: ").append(parentID).append("\n");
-		sb.append("OtherID: ").append(otherID).append("\n");
-		sb.append("Name: ").append(name).append("\n");
-		sb.append("Types: ").append(types).append("\n");
-		sb.append("Keys1: ").append(Arrays.toString(keys1)).append("\n");
-		sb.append("Keys2: ").append(Arrays.toString(keys2)).append("\n");
-		sb.append("Keys: ").append(Arrays.toString(keys)).append("\n");
-		sb.append("Aggregates: ").append(Arrays.toString(aggregates)).append("\n");
-		sb.append("Projections: ").append(Arrays.toString(projections)).append("\n");
-		sb.append("Count: ").append(count).append("\n");
-		sb.append("Field: ").append(field).append("\n");
-		sb.append("Order: ").append(order.toString()).append("\n");
-		sb.append("Path: ").append(path).append("\n");
-		sb.append("FieldDelimiter: ").append(fieldDelimiter).append("\n");
-		sb.append("LineDelimiter: ").append(lineDelimiter).append("\n");
-		sb.append("From: ").append(from).append("\n");
-		sb.append("To: ").append(to).append("\n");
-		sb.append("WriteMode: ").append(writeMode).append("\n");
-		sb.append("toError: ").append(toError).append("\n");
-		return sb.toString();
-	}
-
-	public static class AggregationEntry {
-		public Aggregations agg;
-		public int field;
-
-		public AggregationEntry(int encodedAgg, int field) {
-			switch (encodedAgg) {
-				case 0:
-					agg = Aggregations.MAX;
-					break;
-				case 1:
-					agg = Aggregations.MIN;
-					break;
-				case 2:
-					agg = Aggregations.SUM;
-					break;
-			}
-			this.field = field;
-		}
-
-		@Override
-		public String toString() {
-			return agg.toString() + " - " + field;
-		}
-	}
-
-	public static class ProjectionEntry {
-		public ProjectionSide side;
-		public int[] keys;
-
-		public ProjectionEntry(ProjectionSide side, int[] keys) {
-			this.side = side;
-			this.keys = keys;
-		}
-
-		@Override
-		public String toString() {
-			return side + " - " + Arrays.toString(keys);
-		}
-	}
-
-	public enum ProjectionSide {
-		FIRST,
-		SECOND
-	}
-
-	public enum DatasizeHint {
-		NONE,
-		TINY,
-		HUGE
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
deleted file mode 100644
index ca252f8..0000000
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
+++ /dev/null
@@ -1,582 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.languagebinding.api.java.common;
-
-import java.io.IOException;
-import java.util.HashMap;
-
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvInputFormat;
-import org.apache.flink.api.java.io.PrintingOutputFormat;
-import org.apache.flink.api.java.operators.AggregateOperator;
-import org.apache.flink.api.java.operators.CrossOperator.DefaultCross;
-import org.apache.flink.api.java.operators.CrossOperator.ProjectCross;
-import org.apache.flink.api.java.operators.Grouping;
-import org.apache.flink.api.java.operators.JoinOperator.DefaultJoin;
-import org.apache.flink.api.java.operators.JoinOperator.ProjectJoin;
-import org.apache.flink.api.java.operators.SortedGrouping;
-import org.apache.flink.api.java.operators.UdfOperator;
-import org.apache.flink.api.java.operators.UnsortedGrouping;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.languagebinding.api.java.common.OperationInfo.DatasizeHint;
-import static org.apache.flink.languagebinding.api.java.common.OperationInfo.DatasizeHint.HUGE;
-import static org.apache.flink.languagebinding.api.java.common.OperationInfo.DatasizeHint.NONE;
-import static org.apache.flink.languagebinding.api.java.common.OperationInfo.DatasizeHint.TINY;
-import org.apache.flink.languagebinding.api.java.common.OperationInfo.ProjectionEntry;
-import org.apache.flink.languagebinding.api.java.common.streaming.Receiver;
-
-/**
- * Generic class to construct a Flink plan based on external data.
- *
- * @param <INFO>
- */
-public abstract class PlanBinder<INFO extends OperationInfo> {
-	public static final String PLANBINDER_CONFIG_BCVAR_COUNT = "PLANBINDER_BCVAR_COUNT";
-	public static final String PLANBINDER_CONFIG_BCVAR_NAME_PREFIX = "PLANBINDER_BCVAR_";
-
-	protected static String FLINK_HDFS_PATH = "hdfs:/tmp";
-	public static final String FLINK_TMP_DATA_DIR = System.getProperty("java.io.tmpdir") + "/flink_data";
-
-	public static boolean DEBUG = false;
-
-	protected HashMap<Integer, Object> sets = new HashMap();
-	public static ExecutionEnvironment env;
-	protected Receiver receiver;
-
-	public static final int MAPPED_FILE_SIZE = 1024 * 1024 * 64;
-
-	//====Plan==========================================================================================================
-	protected void receivePlan() throws IOException {
-		receiveParameters();
-		receiveOperations();
-	}
-
-	//====Environment===================================================================================================
-	/**
-	 * This enum contains the identifiers for all supported environment parameters.
-	 */
-	private enum Parameters {
-		DOP,
-		MODE,
-		RETRY,
-		DEBUG
-	}
-
-	private void receiveParameters() throws IOException {
-		Integer parameterCount = (Integer) receiver.getRecord(true);
-
-		for (int x = 0; x < parameterCount; x++) {
-			Tuple value = (Tuple) receiver.getRecord(true);
-			switch (Parameters.valueOf(((String) value.getField(0)).toUpperCase())) {
-				case DOP:
-					Integer dop = (Integer) value.getField(1);
-					env.setParallelism(dop);
-					break;
-				case MODE:
-					FLINK_HDFS_PATH = (Boolean) value.getField(1) ? "file:/tmp/flink" : "hdfs:/tmp/flink";
-					break;
-				case RETRY:
-					int retry = (Integer) value.getField(1);
-					env.setNumberOfExecutionRetries(retry);
-					break;
-				case DEBUG:
-					DEBUG = (Boolean) value.getField(1);
-					break;
-			}
-		}
-		if (env.getParallelism() < 0) {
-			env.setParallelism(1);
-		}
-	}
-
-	//====Operations====================================================================================================
-	/**
-	 * This enum contains the identifiers for all supported non-UDF DataSet operations.
-	 */
-	protected enum Operation {
-		SOURCE_CSV, SOURCE_TEXT, SOURCE_VALUE, SOURCE_SEQ, SINK_CSV, SINK_TEXT, SINK_PRINT,
-		PROJECTION, SORT, UNION, FIRST, DISTINCT, GROUPBY, AGGREGATE,
-		REBALANCE, PARTITION_HASH,
-		BROADCAST
-	}
-
-	/**
-	 * This enum contains the identifiers for all supported UDF DataSet operations.
-	 */
-	protected enum AbstractOperation {
-		COGROUP, CROSS, CROSS_H, CROSS_T, FILTER, FLATMAP, GROUPREDUCE, JOIN, JOIN_H, JOIN_T, MAP, REDUCE, MAPPARTITION,
-	}
-
-	protected void receiveOperations() throws IOException {
-		Integer operationCount = (Integer) receiver.getRecord(true);
-		for (int x = 0; x < operationCount; x++) {
-			String identifier = (String) receiver.getRecord();
-			Operation op = null;
-			AbstractOperation aop = null;
-			try {
-				op = Operation.valueOf(identifier.toUpperCase());
-			} catch (IllegalArgumentException iae) {
-				try {
-					aop = AbstractOperation.valueOf(identifier.toUpperCase());
-				} catch (IllegalArgumentException iae2) {
-					throw new IllegalArgumentException("Invalid operation specified: " + identifier);
-				}
-			}
-			if (op != null) {
-				switch (op) {
-					case SOURCE_CSV:
-						createCsvSource(createOperationInfo(op));
-						break;
-					case SOURCE_TEXT:
-						createTextSource(createOperationInfo(op));
-						break;
-					case SOURCE_VALUE:
-						createValueSource(createOperationInfo(op));
-						break;
-					case SOURCE_SEQ:
-						createSequenceSource(createOperationInfo(op));
-						break;
-					case SINK_CSV:
-						createCsvSink(createOperationInfo(op));
-						break;
-					case SINK_TEXT:
-						createTextSink(createOperationInfo(op));
-						break;
-					case SINK_PRINT:
-						createPrintSink(createOperationInfo(op));
-						break;
-					case BROADCAST:
-						createBroadcastVariable(createOperationInfo(op));
-						break;
-					case AGGREGATE:
-						createAggregationOperation(createOperationInfo(op));
-						break;
-					case DISTINCT:
-						createDistinctOperation(createOperationInfo(op));
-						break;
-					case FIRST:
-						createFirstOperation(createOperationInfo(op));
-						break;
-					case PARTITION_HASH:
-						createHashPartitionOperation(createOperationInfo(op));
-						break;
-					case PROJECTION:
-						createProjectOperation(createOperationInfo(op));
-						break;
-					case REBALANCE:
-						createRebalanceOperation(createOperationInfo(op));
-						break;
-					case GROUPBY:
-						createGroupOperation(createOperationInfo(op));
-						break;
-					case SORT:
-						createSortOperation(createOperationInfo(op));
-						break;
-					case UNION:
-						createUnionOperation(createOperationInfo(op));
-						break;
-				}
-			}
-			if (aop != null) {
-				switch (aop) {
-					case COGROUP:
-						createCoGroupOperation(createOperationInfo(aop));
-						break;
-					case CROSS:
-						createCrossOperation(NONE, createOperationInfo(aop));
-						break;
-					case CROSS_H:
-						createCrossOperation(HUGE, createOperationInfo(aop));
-						break;
-					case CROSS_T:
-						createCrossOperation(TINY, createOperationInfo(aop));
-						break;
-					case FILTER:
-						createFilterOperation(createOperationInfo(aop));
-						break;
-					case FLATMAP:
-						createFlatMapOperation(createOperationInfo(aop));
-						break;
-					case GROUPREDUCE:
-						createGroupReduceOperation(createOperationInfo(aop));
-						break;
-					case JOIN:
-						createJoinOperation(NONE, createOperationInfo(aop));
-						break;
-					case JOIN_H:
-						createJoinOperation(HUGE, createOperationInfo(aop));
-						break;
-					case JOIN_T:
-						createJoinOperation(TINY, createOperationInfo(aop));
-						break;
-					case MAP:
-						createMapOperation(createOperationInfo(aop));
-						break;
-					case MAPPARTITION:
-						createMapPartitionOperation(createOperationInfo(aop));
-						break;
-					case REDUCE:
-						createReduceOperation(createOperationInfo(aop));
-						break;
-				}
-			}
-		}
-	}
-
-	/**
-	 * This method creates an OperationInfo object based on the operation-identifier passed.
-	 *
-	 * @param operationIdentifier
-	 * @return
-	 * @throws IOException
-	 */
-	protected OperationInfo createOperationInfo(Operation operationIdentifier) throws IOException {
-		return new OperationInfo(receiver, operationIdentifier);
-	}
-
-	/**
-	 * This method creates an OperationInfo object based on the operation-identifier passed.
-	 *
-	 * @param operationIdentifier
-	 * @return
-	 * @throws IOException
-	 */
-	protected abstract INFO createOperationInfo(AbstractOperation operationIdentifier) throws IOException;
-
-	private void createCsvSource(OperationInfo info) throws IOException {
-		if (!(info.types instanceof CompositeType)) {
-			throw new RuntimeException("The output type of a csv source has to be a tuple or a " +
-				"pojo type. The derived type is " + info);
-		}
-
-		sets.put(info.setID, env.createInput(new CsvInputFormat(new Path(info.path),
-			info.lineDelimiter, info.fieldDelimiter, (CompositeType)info.types), info.types)
-			.name("CsvSource"));
-	}
-
-	private void createTextSource(OperationInfo info) throws IOException {
-		sets.put(info.setID, env.readTextFile(info.path).name("TextSource"));
-	}
-
-	private void createValueSource(OperationInfo info) throws IOException {
-		sets.put(info.setID, env.fromElements(info.values).name("ValueSource"));
-	}
-
-	private void createSequenceSource(OperationInfo info) throws IOException {
-		sets.put(info.setID, env.generateSequence(info.from, info.to).name("SequenceSource"));
-	}
-
-	private void createCsvSink(OperationInfo info) throws IOException {
-		DataSet parent = (DataSet) sets.get(info.parentID);
-		parent.writeAsCsv(info.path, info.lineDelimiter, info.fieldDelimiter, info.writeMode).name("CsvSink");
-	}
-
-	private void createTextSink(OperationInfo info) throws IOException {
-		DataSet parent = (DataSet) sets.get(info.parentID);
-		parent.writeAsText(info.path, info.writeMode).name("TextSink");
-	}
-
-	private void createPrintSink(OperationInfo info) throws IOException {
-		DataSet parent = (DataSet) sets.get(info.parentID);
-		parent.output(new PrintingOutputFormat(info.toError));
-	}
-
-	private void createBroadcastVariable(OperationInfo info) throws IOException {
-		UdfOperator op1 = (UdfOperator) sets.get(info.parentID);
-		DataSet op2 = (DataSet) sets.get(info.otherID);
-
-		op1.withBroadcastSet(op2, info.name);
-		Configuration c = ((UdfOperator) op1).getParameters();
-
-		if (c == null) {
-			c = new Configuration();
-		}
-
-		int count = c.getInteger(PLANBINDER_CONFIG_BCVAR_COUNT, 0);
-		c.setInteger(PLANBINDER_CONFIG_BCVAR_COUNT, count + 1);
-		c.setString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + count, info.name);
-
-		op1.withParameters(c);
-	}
-
-	private void createAggregationOperation(OperationInfo info) throws IOException {
-		DataSet op = (DataSet) sets.get(info.parentID);
-		AggregateOperator ao = op.aggregate(info.aggregates[0].agg, info.aggregates[0].field);
-
-		for (int x = 1; x < info.count; x++) {
-			ao = ao.and(info.aggregates[x].agg, info.aggregates[x].field);
-		}
-
-		sets.put(info.setID, ao.name("Aggregation"));
-	}
-
-	private void createDistinctOperation(OperationInfo info) throws IOException {
-		DataSet op = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, info.keys.length == 0 ? op.distinct() : op.distinct(info.keys).name("Distinct"));
-	}
-
-	private void createFirstOperation(OperationInfo info) throws IOException {
-		DataSet op = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, op.first(info.count).name("First"));
-	}
-
-	private void createGroupOperation(OperationInfo info) throws IOException {
-		DataSet op1 = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, op1.groupBy(info.keys));
-	}
-
-	private void createHashPartitionOperation(OperationInfo info) throws IOException {
-		DataSet op1 = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, op1.partitionByHash(info.keys));
-	}
-
-	private void createProjectOperation(OperationInfo info) throws IOException {
-		DataSet op1 = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, op1.project(info.fields).name("Projection"));
-	}
-
-	private void createRebalanceOperation(OperationInfo info) throws IOException {
-		DataSet op = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, op.rebalance().name("Rebalance"));
-	}
-
-	private void createSortOperation(OperationInfo info) throws IOException {
-		Grouping op1 = (Grouping) sets.get(info.parentID);
-		if (op1 instanceof UnsortedGrouping) {
-			sets.put(info.setID, ((UnsortedGrouping) op1).sortGroup(info.field, info.order));
-			return;
-		}
-		if (op1 instanceof SortedGrouping) {
-			sets.put(info.setID, ((SortedGrouping) op1).sortGroup(info.field, info.order));
-		}
-	}
-
-	private void createUnionOperation(OperationInfo info) throws IOException {
-		DataSet op1 = (DataSet) sets.get(info.parentID);
-		DataSet op2 = (DataSet) sets.get(info.otherID);
-		sets.put(info.setID, op1.union(op2).name("Union"));
-	}
-
-	private void createCoGroupOperation(INFO info) {
-		DataSet op1 = (DataSet) sets.get(info.parentID);
-		DataSet op2 = (DataSet) sets.get(info.otherID);
-		sets.put(info.setID, applyCoGroupOperation(op1, op2, info.keys1, info.keys2, info));
-	}
-
-	private void createCrossOperation(DatasizeHint mode, INFO info) {
-		DataSet op1 = (DataSet) sets.get(info.parentID);
-		DataSet op2 = (DataSet) sets.get(info.otherID);
-
-		if (info.types != null && (info.projections == null || info.projections.length == 0)) {
-			sets.put(info.setID, applyCrossOperation(op1, op2, mode, info));
-		} else {
-			DefaultCross defaultResult;
-			switch (mode) {
-				case NONE:
-					defaultResult = op1.cross(op2);
-					break;
-				case HUGE:
-					defaultResult = op1.crossWithHuge(op2);
-					break;
-				case TINY:
-					defaultResult = op1.crossWithTiny(op2);
-					break;
-				default:
-					throw new IllegalArgumentException("Invalid Cross mode specified: " + mode);
-			}
-			if (info.projections.length == 0) {
-				sets.put(info.setID, defaultResult.name("DefaultCross"));
-			} else {
-				ProjectCross project = null;
-				for (ProjectionEntry pe : info.projections) {
-					switch (pe.side) {
-						case FIRST:
-							project = project == null ? defaultResult.projectFirst(pe.keys) : project.projectFirst(pe.keys);
-							break;
-						case SECOND:
-							project = project == null ? defaultResult.projectSecond(pe.keys) : project.projectSecond(pe.keys);
-							break;
-					}
-				}
-				sets.put(info.setID, project.name("ProjectCross"));
-			}
-		}
-	}
-
-	private void createFilterOperation(INFO info) {
-		DataSet op1 = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, applyFilterOperation(op1, info));
-	}
-
-	private void createFlatMapOperation(INFO info) {
-		DataSet op1 = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, applyFlatMapOperation(op1, info));
-	}
-
-	private void createGroupReduceOperation(INFO info) {
-		Object op1 = sets.get(info.parentID);
-		if (op1 instanceof DataSet) {
-			sets.put(info.setID, applyGroupReduceOperation((DataSet) op1, info));
-			return;
-		}
-		if (op1 instanceof UnsortedGrouping) {
-			sets.put(info.setID, applyGroupReduceOperation((UnsortedGrouping) op1, info));
-			return;
-		}
-		if (op1 instanceof SortedGrouping) {
-			sets.put(info.setID, applyGroupReduceOperation((SortedGrouping) op1, info));
-		}
-	}
-
-	private void createJoinOperation(DatasizeHint mode, INFO info) {
-		DataSet op1 = (DataSet) sets.get(info.parentID);
-		DataSet op2 = (DataSet) sets.get(info.otherID);
-
-		if (info.types != null && (info.projections == null || info.projections.length == 0)) {
-			sets.put(info.setID, applyJoinOperation(op1, op2, info.keys1, info.keys2, mode, info));
-		} else {
-			DefaultJoin defaultResult = createDefaultJoin(op1, op2, info.keys1, info.keys2, mode);
-			if (info.projections.length == 0) {
-				sets.put(info.setID, defaultResult.name("DefaultJoin"));
-			} else {
-				ProjectJoin project = null;
-				for (ProjectionEntry pe : info.projections) {
-					switch (pe.side) {
-						case FIRST:
-							project = project == null ? defaultResult.projectFirst(pe.keys) : project.projectFirst(pe.keys);
-							break;
-						case SECOND:
-							project = project == null ? defaultResult.projectSecond(pe.keys) : project.projectSecond(pe.keys);
-							break;
-					}
-				}
-				sets.put(info.setID, project.name("ProjectJoin"));
-			}
-		}
-	}
-
-	protected DefaultJoin createDefaultJoin(DataSet op1, DataSet op2, String[] firstKeys, String[] secondKeys, DatasizeHint mode) {
-		switch (mode) {
-			case NONE:
-				return op1.join(op2).where(firstKeys).equalTo(secondKeys);
-			case HUGE:
-				return op1.joinWithHuge(op2).where(firstKeys).equalTo(secondKeys);
-			case TINY:
-				return op1.joinWithTiny(op2).where(firstKeys).equalTo(secondKeys);
-			default:
-				throw new IllegalArgumentException("Invalid join mode specified.");
-		}
-	}
-
-	private void createMapOperation(INFO info) {
-		DataSet op1 = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, applyMapOperation(op1, info));
-	}
-
-	private void createMapPartitionOperation(INFO info) {
-		DataSet op1 = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, applyMapPartitionOperation(op1, info));
-	}
-
-	private void createReduceOperation(INFO info) {
-		Object op1 = sets.get(info.parentID);
-		if (op1 instanceof DataSet) {
-			sets.put(info.setID, applyReduceOperation((DataSet) op1, info));
-			return;
-		}
-		if (op1 instanceof UnsortedGrouping) {
-			sets.put(info.setID, applyReduceOperation((UnsortedGrouping) op1, info));
-		}
-	}
-
-	protected abstract DataSet applyCoGroupOperation(DataSet op1, DataSet op2, String[] firstKeys, String[] secondKeys, INFO info);
-
-	protected abstract DataSet applyCrossOperation(DataSet op1, DataSet op2, DatasizeHint mode, INFO info);
-
-	protected abstract DataSet applyFilterOperation(DataSet op1, INFO info);
-
-	protected abstract DataSet applyFlatMapOperation(DataSet op1, INFO info);
-
-	protected abstract DataSet applyGroupReduceOperation(DataSet op1, INFO info);
-
-	protected abstract DataSet applyGroupReduceOperation(UnsortedGrouping op1, INFO info);
-
-	protected abstract DataSet applyGroupReduceOperation(SortedGrouping op1, INFO info);
-
-	protected abstract DataSet applyJoinOperation(DataSet op1, DataSet op2, String[] firstKeys, String[] secondKeys, DatasizeHint mode, INFO info);
-
-	protected abstract DataSet applyMapOperation(DataSet op1, INFO info);
-
-	protected abstract DataSet applyMapPartitionOperation(DataSet op1, INFO info);
-
-	protected abstract DataSet applyReduceOperation(DataSet op1, INFO info);
-
-	protected abstract DataSet applyReduceOperation(UnsortedGrouping op1, INFO info);
-
-	//====Utility=======================================================================================================
-	protected static String[] normalizeKeys(Object keys) {
-		if (keys instanceof Tuple) {
-			Tuple tupleKeys = (Tuple) keys;
-			if (tupleKeys.getArity() == 0) {
-				return new String[0];
-			}
-			if (tupleKeys.getField(0) instanceof Integer) {
-				String[] stringKeys = new String[tupleKeys.getArity()];
-				for (int x = 0; x < stringKeys.length; x++) {
-					stringKeys[x] = "f" + (Integer) tupleKeys.getField(x);
-				}
-				return stringKeys;
-			}
-			if (tupleKeys.getField(0) instanceof String) {
-				return tupleToStringArray(tupleKeys);
-			}
-			throw new RuntimeException("Key argument contains field that is neither an int nor a String.");
-		}
-		if (keys instanceof int[]) {
-			int[] intKeys = (int[]) keys;
-			String[] stringKeys = new String[intKeys.length];
-			for (int x = 0; x < stringKeys.length; x++) {
-				stringKeys[x] = "f" + intKeys[x];
-			}
-			return stringKeys;
-		}
-		throw new RuntimeException("Key argument is neither an int[] nor a Tuple.");
-	}
-
-	protected static int[] toIntArray(Object key) {
-		if (key instanceof Tuple) {
-			Tuple tuple = (Tuple) key;
-			int[] keys = new int[tuple.getArity()];
-			for (int y = 0; y < tuple.getArity(); y++) {
-				keys[y] = (Integer) tuple.getField(y);
-			}
-			return keys;
-		}
-		if (key instanceof int[]) {
-			return (int[]) key;
-		}
-		throw new RuntimeException("Key argument is neither an int[] nor a Tuple.");
-	}
-
-	protected static String[] tupleToStringArray(Tuple tuple) {
-		String[] keys = new String[tuple.getArity()];
-		for (int y = 0; y < tuple.getArity(); y++) {
-			keys[y] = (String) tuple.getField(y);
-		}
-		return keys;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java
deleted file mode 100644
index 59ed20c..0000000
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java
+++ /dev/null
@@ -1,360 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.languagebinding.api.java.common.streaming;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.io.Serializable;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-//CHECKSTYLE.OFF: AvoidStarImport - tuple imports
-import org.apache.flink.api.java.tuple.*;
-import static org.apache.flink.languagebinding.api.java.common.streaming.Sender.*;
-//CHECKSTYLE.ON: AvoidStarImport
-import static org.apache.flink.languagebinding.api.java.common.PlanBinder.FLINK_TMP_DATA_DIR;
-import static org.apache.flink.languagebinding.api.java.common.PlanBinder.MAPPED_FILE_SIZE;
-import org.apache.flink.util.Collector;
-
-/**
- * General-purpose class to read data from memory-mapped files.
- */
-public class Receiver implements Serializable {
-	private static final long serialVersionUID = -2474088929850009968L;
-
-	private final AbstractRichFunction function;
-
-	private File inputFile;
-	private RandomAccessFile inputRAF;
-	private FileChannel inputChannel;
-	private MappedByteBuffer fileBuffer;
-
-	private Deserializer<?> deserializer = null;
-
-	public Receiver(AbstractRichFunction function) {
-		this.function = function;
-	}
-
-	//=====Setup========================================================================================================
-	public void open(String path) throws IOException {
-		setupMappedFile(path);
-	}
-
-	private void setupMappedFile(String inputFilePath) throws FileNotFoundException, IOException {
-		File x = new File(FLINK_TMP_DATA_DIR);
-		x.mkdirs();
-
-		inputFile = new File(inputFilePath);
-		if (inputFile.exists()) {
-			inputFile.delete();
-		}
-		inputFile.createNewFile();
-		inputRAF = new RandomAccessFile(inputFilePath, "rw");
-		inputRAF.setLength(MAPPED_FILE_SIZE);
-		inputRAF.seek(MAPPED_FILE_SIZE - 1);
-		inputRAF.writeByte(0);
-		inputRAF.seek(0);
-		inputChannel = inputRAF.getChannel();
-		fileBuffer = inputChannel.map(FileChannel.MapMode.READ_WRITE, 0, MAPPED_FILE_SIZE);
-	}
-
-	public void close() throws IOException {
-		closeMappedFile();
-	}
-
-	private void closeMappedFile() throws IOException {
-		inputChannel.close();
-		inputRAF.close();
-	}
-
-	//=====Record-API===================================================================================================
-	/**
-	 * Loads a buffer from the memory-mapped file. The records contained within the buffer can be accessed using
-	 * collectRecord(). These records do not necessarily have to be of the same type. This method requires external
-	 * synchronization.
-	 *
-	 * @throws IOException
-	 */
-	private void loadBuffer() throws IOException {
-		int count = 0;
-		while (fileBuffer.get(0) == 0 && count < 10) {
-			try {
-				Thread.sleep(1000);
-			} catch (InterruptedException ie) {
-			}
-			fileBuffer.load();
-			count++;
-		}
-		if (fileBuffer.get(0) == 0) {
-			throw new RuntimeException("External process not responding.");
-		}
-		fileBuffer.position(1);
-	}
-
-	/**
-	 * Returns a record from the buffer. Note: This method cannot be replaced with specific methods like readInt() or
-	 * similar. The PlanBinder requires a method that can return any kind of object.
-	 *
-	 * @return read record
-	 * @throws IOException
-	 */
-	public Object getRecord() throws IOException {
-		return getRecord(false);
-	}
-
-	/**
-	 * Returns a record from the buffer. Note: This method cannot be replaced with specific methods like readInt() or
-	 * similar. The PlanBinder requires a method that can return any kind of object.
-	 *
-	 * @param normalized flag indicating whether certain types should be normalized
-	 * @return read record
-	 * @throws IOException
-	 */
-	public Object getRecord(boolean normalized) throws IOException {
-		if (fileBuffer.position() == 0) {
-			loadBuffer();
-		}
-		return receiveField(normalized);
-	}
-
-	/**
-	 * Reads a single primitive value or tuple from the buffer.
-	 *
-	 * @return primitive value or tuple
-	 * @throws IOException
-	 */
-	private Object receiveField(boolean normalized) throws IOException {
-		byte type = fileBuffer.get();
-		switch (type) {
-			case TYPE_TUPLE:
-				int tupleSize = fileBuffer.get();
-				Tuple tuple = createTuple(tupleSize);
-				for (int x = 0; x < tupleSize; x++) {
-					tuple.setField(receiveField(normalized), x);
-				}
-				return tuple;
-			case TYPE_BOOLEAN:
-				return fileBuffer.get() == 1;
-			case TYPE_BYTE:
-				return fileBuffer.get();
-			case TYPE_SHORT:
-				if (normalized) {
-					return (int) fileBuffer.getShort();
-				} else {
-					return fileBuffer.getShort();
-				}
-			case TYPE_INTEGER:
-				return fileBuffer.getInt();
-			case TYPE_LONG:
-				if (normalized) {
-					return new Long(fileBuffer.getLong()).intValue();
-				} else {
-					return fileBuffer.getLong();
-				}
-			case TYPE_FLOAT:
-				if (normalized) {
-					return (double) fileBuffer.getFloat();
-				} else {
-					return fileBuffer.getFloat();
-				}
-			case TYPE_DOUBLE:
-				return fileBuffer.getDouble();
-			case TYPE_STRING:
-				int stringSize = fileBuffer.getInt();
-				byte[] buffer = new byte[stringSize];
-				fileBuffer.get(buffer);
-				return new String(buffer);
-			case TYPE_BYTES:
-				int bytessize = fileBuffer.getInt();
-				byte[] bytebuffer = new byte[bytessize];
-				fileBuffer.get(bytebuffer);
-				return bytebuffer;
-			case TYPE_NULL:
-				return null;
-			default:
-				throw new IllegalArgumentException("Unknown TypeID encountered: " + type);
-		}
-	}
-
-	//=====Buffered-API=================================================================================================
-	/**
-	 * Reads a buffer of the given size from the memory-mapped file, and collects all records contained. This method
-	 * assumes that all values in the buffer are of the same type. This method does NOT take care of synchronization.
-	 * The user must guarantee that the buffer was completely written before calling this method.
-	 *
-	 * @param c Collector to collect records
-	 * @param bufferSize size of the buffer
-	 * @throws IOException
-	 */
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public void collectBuffer(Collector c, int bufferSize) throws IOException {
-		fileBuffer.position(0);
-
-		if (deserializer == null) {
-			byte type = fileBuffer.get();
-			deserializer = getDeserializer(type);
-		}
-		while (fileBuffer.position() < bufferSize) {
-			c.collect(deserializer.deserialize());
-		}
-	}
-
-	//=====Deserializer=================================================================================================
-	private Deserializer<?> getDeserializer(byte type) {
-		switch (type) {
-			case TYPE_TUPLE:
-				return new TupleDeserializer();
-			case TYPE_BOOLEAN:
-				return new BooleanDeserializer();
-			case TYPE_BYTE:
-				return new ByteDeserializer();
-			case TYPE_BYTES:
-				return new BytesDeserializer();
-			case TYPE_SHORT:
-				return new ShortDeserializer();
-			case TYPE_INTEGER:
-				return new IntDeserializer();
-			case TYPE_LONG:
-				return new LongDeserializer();
-			case TYPE_STRING:
-				return new StringDeserializer();
-			case TYPE_FLOAT:
-				return new FloatDeserializer();
-			case TYPE_DOUBLE:
-				return new DoubleDeserializer();
-			case TYPE_NULL:
-				return new NullDeserializer();
-			default:
-				throw new IllegalArgumentException("Unknown TypeID encountered: " + type);
-
-		}
-	}
-
-	private interface Deserializer<T> {
-		public T deserialize();
-
-	}
-
-	private class BooleanDeserializer implements Deserializer<Boolean> {
-		@Override
-		public Boolean deserialize() {
-			return fileBuffer.get() == 1;
-		}
-	}
-
-	private class ByteDeserializer implements Deserializer<Byte> {
-		@Override
-		public Byte deserialize() {
-			return fileBuffer.get();
-		}
-	}
-
-	private class ShortDeserializer implements Deserializer<Short> {
-		@Override
-		public Short deserialize() {
-			return fileBuffer.getShort();
-		}
-	}
-
-	private class IntDeserializer implements Deserializer<Integer> {
-		@Override
-		public Integer deserialize() {
-			return fileBuffer.getInt();
-		}
-	}
-
-	private class LongDeserializer implements Deserializer<Long> {
-		@Override
-		public Long deserialize() {
-			return fileBuffer.getLong();
-		}
-	}
-
-	private class FloatDeserializer implements Deserializer<Float> {
-		@Override
-		public Float deserialize() {
-			return fileBuffer.getFloat();
-		}
-	}
-
-	private class DoubleDeserializer implements Deserializer<Double> {
-		@Override
-		public Double deserialize() {
-			return fileBuffer.getDouble();
-		}
-	}
-
-	private class StringDeserializer implements Deserializer<String> {
-		private int size;
-
-		@Override
-		public String deserialize() {
-			size = fileBuffer.getInt();
-			byte[] buffer = new byte[size];
-			fileBuffer.get(buffer);
-			return new String(buffer);
-		}
-	}
-
-	private class NullDeserializer implements Deserializer<Object> {
-		@Override
-		public Object deserialize() {
-			return null;
-		}
-	}
-
-	private class BytesDeserializer implements Deserializer<byte[]> {
-		@Override
-		public byte[] deserialize() {
-			int length = fileBuffer.getInt();
-			byte[] result = new byte[length];
-			fileBuffer.get(result);
-			return result;
-		}
-
-	}
-
-	private class TupleDeserializer implements Deserializer<Tuple> {
-		Deserializer<?>[] deserializer = null;
-		Tuple reuse;
-
-		public TupleDeserializer() {
-			int size = fileBuffer.getInt();
-			reuse = createTuple(size);
-			deserializer = new Deserializer[size];
-			for (int x = 0; x < deserializer.length; x++) {
-				deserializer[x] = getDeserializer(fileBuffer.get());
-			}
-		}
-
-		@Override
-		public Tuple deserialize() {
-			for (int x = 0; x < deserializer.length; x++) {
-				reuse.setField(deserializer[x].deserialize(), x);
-			}
-			return reuse;
-		}
-	}
-
-	public static Tuple createTuple(int size) {
-		try {
-			return Tuple.getTupleClass(size).newInstance();
-		} catch (InstantiationException e) {
-			throw new RuntimeException(e);
-		} catch (IllegalAccessException e) {
-			throw new RuntimeException(e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java
deleted file mode 100644
index 3e0c317..0000000
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java
+++ /dev/null
@@ -1,411 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.languagebinding.api.java.common.streaming;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.java.tuple.Tuple;
-import static org.apache.flink.languagebinding.api.java.common.PlanBinder.FLINK_TMP_DATA_DIR;
-import static org.apache.flink.languagebinding.api.java.common.PlanBinder.MAPPED_FILE_SIZE;
-
-/**
- * General-purpose class to write data to memory-mapped files.
- */
-public class Sender implements Serializable {
-	public static final byte TYPE_TUPLE = (byte) 11;
-	public static final byte TYPE_BOOLEAN = (byte) 10;
-	public static final byte TYPE_BYTE = (byte) 9;
-	public static final byte TYPE_SHORT = (byte) 8;
-	public static final byte TYPE_INTEGER = (byte) 7;
-	public static final byte TYPE_LONG = (byte) 6;
-	public static final byte TYPE_DOUBLE = (byte) 4;
-	public static final byte TYPE_FLOAT = (byte) 5;
-	public static final byte TYPE_CHAR = (byte) 3;
-	public static final byte TYPE_STRING = (byte) 2;
-	public static final byte TYPE_BYTES = (byte) 1;
-	public static final byte TYPE_NULL = (byte) 0;
-
-	private final AbstractRichFunction function;
-
-	private File outputFile;
-	private RandomAccessFile outputRAF;
-	private FileChannel outputChannel;
-	private MappedByteBuffer fileBuffer;
-
-	private final ByteBuffer[] saved = new ByteBuffer[2];
-
-	private final Serializer[] serializer = new Serializer[2];
-
-	public Sender(AbstractRichFunction function) {
-		this.function = function;
-	}
-
-	//=====Setup========================================================================================================
-	public void open(String path) throws IOException {
-		setupMappedFile(path);
-	}
-
-	private void setupMappedFile(String outputFilePath) throws FileNotFoundException, IOException {
-		File x = new File(FLINK_TMP_DATA_DIR);
-		x.mkdirs();
-
-		outputFile = new File(outputFilePath);
-		if (outputFile.exists()) {
-			outputFile.delete();
-		}
-		outputFile.createNewFile();
-		outputRAF = new RandomAccessFile(outputFilePath, "rw");
-		outputRAF.setLength(MAPPED_FILE_SIZE);
-		outputRAF.seek(MAPPED_FILE_SIZE - 1);
-		outputRAF.writeByte(0);
-		outputRAF.seek(0);
-		outputChannel = outputRAF.getChannel();
-		fileBuffer = outputChannel.map(FileChannel.MapMode.READ_WRITE, 0, MAPPED_FILE_SIZE);
-	}
-
-	public void close() throws IOException {
-		closeMappedFile();
-	}
-
-	private void closeMappedFile() throws IOException {
-		outputChannel.close();
-		outputRAF.close();
-	}
-
-	/**
-	 * Resets this object to the post-configuration state.
-	 */
-	public void reset() {
-		serializer[0] = null;
-		serializer[1] = null;
-		fileBuffer.clear();
-	}
-
-	//=====Serialization================================================================================================
-	/**
-	 * Writes a single record to the memory-mapped file. This method does NOT take care of synchronization. The user
-	 * must guarantee that the file may be written to before calling this method. This method essentially reserves the
-	 * whole buffer for one record. As such it imposes some performance restrictions and should only be used when
-	 * absolutely necessary.
-	 *
-	 * @param value record to send
-	 * @return size of the written buffer
-	 * @throws IOException
-	 */
-	public int sendRecord(Object value) throws IOException {
-		fileBuffer.clear();
-		int group = 0;
-
-		serializer[group] = getSerializer(value);
-		ByteBuffer bb = serializer[group].serialize(value);
-		if (bb.remaining() > MAPPED_FILE_SIZE) {
-			throw new RuntimeException("Serialized object does not fit into a single buffer.");
-		}
-		fileBuffer.put(bb);
-
-		int size = fileBuffer.position();
-
-		reset();
-		return size;
-	}
-
-	public boolean hasRemaining(int group) {
-		return saved[group] != null;
-	}
-
-	/**
-	 * Extracts records from an iterator and writes them to the memory-mapped file. This method assumes that all values
-	 * in the iterator are of the same type. This method does NOT take care of synchronization. The caller must
-	 * guarantee that the file may be written to before calling this method.
-	 *
-	 * @param i iterator containing records
-	 * @param group group to which the iterator belongs, most notably used by CoGroup-functions.
-	 * @return size of the written buffer
-	 * @throws IOException
-	 */
-	public int sendBuffer(Iterator i, int group) throws IOException {
-		fileBuffer.clear();
-
-		Object value;
-		ByteBuffer bb;
-		if (serializer[group] == null) {
-			value = i.next();
-			serializer[group] = getSerializer(value);
-			bb = serializer[group].serialize(value);
-			if (bb.remaining() > MAPPED_FILE_SIZE) {
-				throw new RuntimeException("Serialized object does not fit into a single buffer.");
-			}
-			fileBuffer.put(bb);
-
-		}
-		if (saved[group] != null) {
-			fileBuffer.put(saved[group]);
-			saved[group] = null;
-		}
-		while (i.hasNext() && saved[group] == null) {
-			value = i.next();
-			bb = serializer[group].serialize(value);
-			if (bb.remaining() > MAPPED_FILE_SIZE) {
-				throw new RuntimeException("Serialized object does not fit into a single buffer.");
-			}
-			if (bb.remaining() <= fileBuffer.remaining()) {
-				fileBuffer.put(bb);
-			} else {
-				saved[group] = bb;
-			}
-		}
-
-		int size = fileBuffer.position();
-		return size;
-	}
-
-	private enum SupportedTypes {
-		TUPLE, BOOLEAN, BYTE, BYTES, CHARACTER, SHORT, INTEGER, LONG, FLOAT, DOUBLE, STRING, OTHER, NULL
-	}
-
-	//=====Serializer===================================================================================================
-	private Serializer getSerializer(Object value) throws IOException {
-		String className = value.getClass().getSimpleName().toUpperCase();
-		if (className.startsWith("TUPLE")) {
-			className = "TUPLE";
-		}
-		if (className.startsWith("BYTE[]")) {
-			className = "BYTES";
-		}
-		SupportedTypes type = SupportedTypes.valueOf(className);
-		switch (type) {
-			case TUPLE:
-				fileBuffer.put(TYPE_TUPLE);
-				fileBuffer.putInt(((Tuple) value).getArity());
-				return new TupleSerializer((Tuple) value);
-			case BOOLEAN:
-				fileBuffer.put(TYPE_BOOLEAN);
-				return new BooleanSerializer();
-			case BYTE:
-				fileBuffer.put(TYPE_BYTE);
-				return new ByteSerializer();
-			case BYTES:
-				fileBuffer.put(TYPE_BYTES);
-				return new BytesSerializer();
-			case CHARACTER:
-				fileBuffer.put(TYPE_CHAR);
-				return new CharSerializer();
-			case SHORT:
-				fileBuffer.put(TYPE_SHORT);
-				return new ShortSerializer();
-			case INTEGER:
-				fileBuffer.put(TYPE_INTEGER);
-				return new IntSerializer();
-			case LONG:
-				fileBuffer.put(TYPE_LONG);
-				return new LongSerializer();
-			case STRING:
-				fileBuffer.put(TYPE_STRING);
-				return new StringSerializer();
-			case FLOAT:
-				fileBuffer.put(TYPE_FLOAT);
-				return new FloatSerializer();
-			case DOUBLE:
-				fileBuffer.put(TYPE_DOUBLE);
-				return new DoubleSerializer();
-			case NULL:
-				fileBuffer.put(TYPE_NULL);
-				return new NullSerializer();
-			default:
-				throw new IllegalArgumentException("Unknown Type encountered: " + type);
-		}
-	}
-
-	private abstract class Serializer<T> {
-		protected ByteBuffer buffer;
-
-		public Serializer(int capacity) {
-			buffer = ByteBuffer.allocate(capacity);
-		}
-
-		public ByteBuffer serialize(T value) {
-			buffer.clear();
-			serializeInternal(value);
-			buffer.flip();
-			return buffer;
-		}
-
-		public abstract void serializeInternal(T value);
-	}
-
-	private class ByteSerializer extends Serializer<Byte> {
-		public ByteSerializer() {
-			super(1);
-		}
-
-		@Override
-		public void serializeInternal(Byte value) {
-			buffer.put(value);
-		}
-	}
-
-	private class BooleanSerializer extends Serializer<Boolean> {
-		public BooleanSerializer() {
-			super(1);
-		}
-
-		@Override
-		public void serializeInternal(Boolean value) {
-			buffer.put(value ? (byte) 1 : (byte) 0);
-		}
-	}
-
-	private class CharSerializer extends Serializer<Character> {
-		public CharSerializer() {
-			super(4);
-		}
-
-		@Override
-		public void serializeInternal(Character value) {
-			buffer.put((value + "").getBytes());
-		}
-	}
-
-	private class ShortSerializer extends Serializer<Short> {
-		public ShortSerializer() {
-			super(2);
-		}
-
-		@Override
-		public void serializeInternal(Short value) {
-			buffer.putShort(value);
-		}
-	}
-
-	private class IntSerializer extends Serializer<Integer> {
-		public IntSerializer() {
-			super(4);
-		}
-
-		@Override
-		public void serializeInternal(Integer value) {
-			buffer.putInt(value);
-		}
-	}
-
-	private class LongSerializer extends Serializer<Long> {
-		public LongSerializer() {
-			super(8);
-		}
-
-		@Override
-		public void serializeInternal(Long value) {
-			buffer.putLong(value);
-		}
-	}
-
-	private class StringSerializer extends Serializer<String> {
-		public StringSerializer() {
-			super(0);
-		}
-
-		@Override
-		public void serializeInternal(String value) {
-			byte[] bytes = value.getBytes();
-			buffer = ByteBuffer.allocate(bytes.length + 4);
-			buffer.putInt(bytes.length);
-			buffer.put(bytes);
-		}
-	}
-
-	private class FloatSerializer extends Serializer<Float> {
-		public FloatSerializer() {
-			super(4);
-		}
-
-		@Override
-		public void serializeInternal(Float value) {
-			buffer.putFloat(value);
-		}
-	}
-
-	private class DoubleSerializer extends Serializer<Double> {
-		public DoubleSerializer() {
-			super(8);
-		}
-
-		@Override
-		public void serializeInternal(Double value) {
-			buffer.putDouble(value);
-		}
-	}
-
-	private class NullSerializer extends Serializer<Object> {
-		public NullSerializer() {
-			super(0);
-		}
-
-		@Override
-		public void serializeInternal(Object value) {
-		}
-	}
-
-	private class BytesSerializer extends Serializer<byte[]> {
-		public BytesSerializer() {
-			super(0);
-		}
-
-		@Override
-		public void serializeInternal(byte[] value) {
-			buffer = ByteBuffer.allocate(4 + value.length);
-			buffer.putInt(value.length);
-			buffer.put(value);
-		}
-	}
-
-	private class TupleSerializer extends Serializer<Tuple> {
-		private final Serializer[] serializer;
-		private final List<ByteBuffer> buffers;
-
-		public TupleSerializer(Tuple value) throws IOException {
-			super(0);
-			serializer = new Serializer[value.getArity()];
-			buffers = new ArrayList();
-			for (int x = 0; x < serializer.length; x++) {
-				serializer[x] = getSerializer(value.getField(x));
-			}
-		}
-
-		@Override
-		public void serializeInternal(Tuple value) {
-			int length = 0;
-			for (int x = 0; x < serializer.length; x++) {
-				serializer[x].buffer.clear();
-				serializer[x].serializeInternal(value.getField(x));
-				length += serializer[x].buffer.position();
-				buffers.add(serializer[x].buffer);
-			}
-			buffer = ByteBuffer.allocate(length);
-			for (ByteBuffer b : buffers) {
-				b.flip();
-				buffer.put(b);
-			}
-			buffers.clear();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/StreamPrinter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/StreamPrinter.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/StreamPrinter.java
deleted file mode 100644
index 1ad0606..0000000
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/StreamPrinter.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.languagebinding.api.java.common.streaming;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-/**
- * Simple utility class to print all contents of an inputstream to stdout.
- */
-public class StreamPrinter extends Thread {
-	private final BufferedReader reader;
-	private final boolean wrapInException;
-	private StringBuilder msg;
-
-	public StreamPrinter(InputStream stream) {
-		this(stream, false, null);
-	}
-
-	public StreamPrinter(InputStream stream, boolean wrapInException, StringBuilder msg) {
-		this.reader = new BufferedReader(new InputStreamReader(stream));
-		this.wrapInException = wrapInException;
-		this.msg = msg;
-	}
-
-	@Override
-	public void run() {
-		String line;
-		try {
-			if (wrapInException) {
-				while ((line = reader.readLine()) != null) {
-					msg.append("\n" + line);
-				}
-			} else {
-				while ((line = reader.readLine()) != null) {
-					System.out.println(line);
-					System.out.flush();
-				}
-			}
-		} catch (IOException ex) {
-		}
-	}
-}