Posted to commits@flink.apache.org by ch...@apache.org on 2015/11/12 21:09:20 UTC
[5/8] flink git commit: [FLINK-2901] Move PythonAPI to flink-libraries
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
new file mode 100644
index 0000000..2116d1f
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
@@ -0,0 +1,264 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+from flink.functions.MapFunction import MapFunction
+from flink.functions.FlatMapFunction import FlatMapFunction
+from flink.functions.FilterFunction import FilterFunction
+from flink.functions.MapPartitionFunction import MapPartitionFunction
+from flink.functions.ReduceFunction import ReduceFunction
+from flink.functions.CrossFunction import CrossFunction
+from flink.functions.JoinFunction import JoinFunction
+from flink.functions.GroupReduceFunction import GroupReduceFunction
+from flink.functions.CoGroupFunction import CoGroupFunction
+from flink.plan.Constants import INT, STRING, FLOAT, BOOL, Order
+
+
+class Mapper(MapFunction):
+ def map(self, value):
+ return value * value
+
+
+class Filter(FilterFunction):
+ def __init__(self, limit):
+ super(Filter, self).__init__()
+ self.limit = limit
+
+ def filter(self, value):
+ return value > self.limit
+
+
+class FlatMap(FlatMapFunction):
+ def flat_map(self, value, collector):
+ collector.collect(value)
+ collector.collect(value * 2)
+
+
+class MapPartition(MapPartitionFunction):
+ def map_partition(self, iterator, collector):
+ for value in iterator:
+ collector.collect(value * 2)
+
+
+class Reduce(ReduceFunction):
+ def reduce(self, value1, value2):
+ return value1 + value2
+
+
+class Reduce2(ReduceFunction):
+ def reduce(self, value1, value2):
+ return (value1[0] + value2[0], value1[1] + value2[1], value1[2], value1[3] or value2[3])
+
+
+class Cross(CrossFunction):
+ def cross(self, value1, value2):
+ return (value1, value2[3])
+
+
+class MapperBcv(MapFunction):
+ def map(self, value):
+ factor = self.context.get_broadcast_variable("test")[0][0]
+ return value * factor
+
+
+class Join(JoinFunction):
+ def join(self, value1, value2):
+ if value1[3]:
+ return value2[0] + str(value1[0])
+ else:
+ return value2[0] + str(value1[1])
+
+
+class GroupReduce(GroupReduceFunction):
+ def reduce(self, iterator, collector):
+ if iterator.has_next():
+ i, f, s, b = iterator.next()
+ for value in iterator:
+ i += value[0]
+ f += value[1]
+ b |= value[3]
+ collector.collect((i, f, s, b))
+
+
+class GroupReduce2(GroupReduceFunction):
+ def reduce(self, iterator, collector):
+ for value in iterator:
+ collector.collect(value)
+
+
+class GroupReduce3(GroupReduceFunction):
+ def reduce(self, iterator, collector):
+ collector.collect(iterator.next())
+
+ def combine(self, iterator, collector):
+ if iterator.has_next():
+ v1 = iterator.next()
+ if iterator.has_next():
+ v2 = iterator.next()
+ if v1[0] < v2[0]:
+ collector.collect(v1)
+ else:
+ collector.collect(v2)
+
+
+class CoGroup(CoGroupFunction):
+ def co_group(self, iterator1, iterator2, collector):
+ while iterator1.has_next() and iterator2.has_next():
+ collector.collect((iterator1.next(), iterator2.next()))
+
+
+class Id(MapFunction):
+ def map(self, value):
+ return value
+
+
+class Verify(MapPartitionFunction):
+ def __init__(self, expected, name):
+ super(Verify, self).__init__()
+ self.expected = expected
+ self.name = name
+
+ def map_partition(self, iterator, collector):
+ index = 0
+ for value in iterator:
+ if value != self.expected[index]:
+ print(self.name + " Test failed. Expected: " + str(self.expected[index]) + " Actual: " + str(value))
+ raise Exception(self.name + " failed!")
+ index += 1
+ collector.collect(self.name + " successful!")
+
+
+class Verify2(MapPartitionFunction):
+ def __init__(self, expected, name):
+ super(Verify2, self).__init__()
+ self.expected = expected
+ self.name = name
+
+ def map_partition(self, iterator, collector):
+ for value in iterator:
+ try:
+ # list.remove() raises ValueError when an unexpected value appears
+ self.expected.remove(value)
+ except ValueError:
+ raise Exception(self.name + " failed!")
+ collector.collect(self.name + " successful!")
+
+
+if __name__ == "__main__":
+ env = get_environment()
+
+ d1 = env.from_elements(1, 6, 12)
+
+ d2 = env.from_elements((1, 0.5, "hello", True), (2, 0.4, "world", False))
+
+ d3 = env.from_elements(("hello",), ("world",))
+
+ d4 = env.from_elements((1, 0.5, "hello", True), (1, 0.4, "hello", False), (1, 0.5, "hello", True), (2, 0.4, "world", False))
+
+ d5 = env.from_elements((4.4, 4.3, 1), (4.3, 4.4, 1), (4.2, 4.1, 3), (4.1, 4.1, 3))
+
+ d1 \
+ .map((lambda x: x * x), INT).map(Mapper(), INT) \
+ .map_partition(Verify([1, 1296, 20736], "Map"), STRING).output()
+
+ d1 \
+ .map(Mapper(), INT).map((lambda x: x * x), INT) \
+ .map_partition(Verify([1, 1296, 20736], "Chained Lambda"), STRING).output()
+
+ d1 \
+ .filter(Filter(5)).filter(Filter(8)) \
+ .map_partition(Verify([12], "Filter"), STRING).output()
+
+ d1 \
+ .flat_map(FlatMap(), INT).flat_map(FlatMap(), INT) \
+ .map_partition(Verify([1, 2, 2, 4, 6, 12, 12, 24, 12, 24, 24, 48], "FlatMap"), STRING).output()
+
+ d1 \
+ .map_partition(MapPartition(), INT) \
+ .map_partition(Verify([2, 12, 24], "MapPartition"), STRING).output()
+
+ d1 \
+ .reduce(Reduce()) \
+ .map_partition(Verify([19], "AllReduce"), STRING).output()
+
+ d4 \
+ .group_by(2).reduce(Reduce2()) \
+ .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "CombineReduce"), STRING).output()
+
+ d4 \
+ .map(Id(), (INT, FLOAT, STRING, BOOL)).group_by(2).reduce(Reduce2()) \
+ .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "ChainedReduce"), STRING).output()
+
+ d1 \
+ .map(MapperBcv(), INT).with_broadcast_set("test", d2) \
+ .map_partition(Verify([1, 6, 12], "Broadcast"), STRING).output()
+
+ d1 \
+ .cross(d2).using(Cross(), (INT, BOOL)) \
+ .map_partition(Verify([(1, True), (1, False), (6, True), (6, False), (12, True), (12, False)], "Cross"), STRING).output()
+
+ d1 \
+ .cross(d3) \
+ .map_partition(Verify([(1, ("hello",)), (1, ("world",)), (6, ("hello",)), (6, ("world",)), (12, ("hello",)), (12, ("world",))], "Default Cross"), STRING).output()
+
+ d2 \
+ .cross(d3).project_second(0).project_first(0, 1) \
+ .map_partition(Verify([("hello", 1, 0.5), ("world", 1, 0.5), ("hello", 2, 0.4), ("world", 2, 0.4)], "Project Cross"), STRING).output()
+
+ d2 \
+ .join(d3).where(2).equal_to(0).using(Join(), STRING) \
+ .map_partition(Verify(["hello1", "world0.4"], "Join"), STRING).output()
+
+ d2 \
+ .join(d3).where(2).equal_to(0).project_first(0, 3).project_second(0) \
+ .map_partition(Verify([(1, True, "hello"), (2, False, "world")], "Project Join"), STRING).output()
+
+ d2 \
+ .join(d3).where(2).equal_to(0) \
+ .map_partition(Verify([((1, 0.5, "hello", True), ("hello",)), ((2, 0.4, "world", False), ("world",))], "Default Join"), STRING).output()
+
+ d2 \
+ .project(0, 1, 2) \
+ .map_partition(Verify([(1, 0.5, "hello"), (2, 0.4, "world")], "Project"), STRING).output()
+
+ d2 \
+ .union(d4) \
+ .map_partition(Verify2([(1, 0.5, "hello", True), (2, 0.4, "world", False), (1, 0.5, "hello", True), (1, 0.4, "hello", False), (1, 0.5, "hello", True), (2, 0.4, "world", False)], "Union"), STRING).output()
+
+ d4 \
+ .group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=False) \
+ .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "AllGroupReduce"), STRING).output()
+
+ d4 \
+ .map(Id(), (INT, FLOAT, STRING, BOOL)).group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=True) \
+ .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "ChainedGroupReduce"), STRING).output()
+
+ d4 \
+ .group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=True) \
+ .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "CombineGroupReduce"), STRING).output()
+
+ d5 \
+ .group_by(2).sort_group(0, Order.DESCENDING).sort_group(1, Order.ASCENDING).reduce_group(GroupReduce3(), (FLOAT, FLOAT, INT), combinable=True) \
+ .map_partition(Verify([(4.3, 4.4, 1), (4.1, 4.1, 3)], "ChainedSortedGroupReduce"), STRING).output()
+
+ d4 \
+ .co_group(d5).where(0).equal_to(2).using(CoGroup(), ((INT, FLOAT, STRING, BOOL), (FLOAT, FLOAT, INT))) \
+ .map_partition(Verify([((1, 0.5, "hello", True), (4.4, 4.3, 1)), ((1, 0.4, "hello", False), (4.3, 4.4, 1))], "CoGroup"), STRING).output()
+
+ env.set_degree_of_parallelism(1)
+
+ env.execute(local=True)
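
For orientation, the file above exercises each operator of the Python
DataSet API in a single plan. A minimal standalone plan built from the
same calls (the environment, operators, and type constants are taken
from the file; only the data is invented for illustration):

    from flink.plan.Environment import get_environment
    from flink.plan.Constants import INT, STRING

    env = get_environment()

    # Every UDF operator declares its output type explicitly.
    env.from_elements(1, 2, 3) \
        .map(lambda x: x + 1, INT) \
        .map(lambda x: "value: " + str(x), STRING) \
        .output()

    env.set_degree_of_parallelism(1)
    env.execute(local=True)
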
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_text.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_text.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_text.py
new file mode 100644
index 0000000..1f90587
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_text.py
@@ -0,0 +1,30 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+from flink.plan.Constants import WriteMode
+
+if __name__ == "__main__":
+ env = get_environment()
+
+ d1 = env.read_text("src/test/python/org/apache/flink/python/api/data_text")
+
+ d1.write_text("/tmp/flink/result", WriteMode.OVERWRITE)
+
+ env.set_degree_of_parallelism(1)
+
+ env.execute(local=True)
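
A note on the sink: WriteMode.OVERWRITE replaces an existing result
file. A sketch of the alternative behavior, assuming Constants also
exposes a NO_OVERWRITE constant (the Java side in OperationInfo.java,
further down in this commit, decodes both variants for SINK_TEXT):

    from flink.plan.Constants import WriteMode

    # NO_OVERWRITE (assumed constant name) keeps an existing
    # "/tmp/flink/result" instead of replacing it.
    d1.write_text("/tmp/flink/result", WriteMode.NO_OVERWRITE)
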
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py
new file mode 100644
index 0000000..a3b8d07
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py
@@ -0,0 +1,63 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+from flink.plan.Constants import _Fields
+from flink.plan.Constants import INT, STRING, BOOL, FLOAT
+import sys
+
+if __name__ == "__main__":
+ env = get_environment()
+
+ d1 = env.from_elements(("hello", 4, 3.2, True))
+
+ d2 = env.from_elements("world")
+
+ direct_from_source = d1.filter(lambda x: True)
+
+ if direct_from_source._info[_Fields.TYPES] != ("hello", 4, 3.2, True):
+ sys.exit("Error deducing type directly from source.")
+
+ from_common_udf = d1.map(lambda x: x[3], BOOL).filter(lambda x: True)
+
+ if from_common_udf._info[_Fields.TYPES] != BOOL:
+ sys.exit("Error deducing type from common UDF.")
+
+ through_projection = d1.project(3, 2).filter(lambda x: True)
+
+ if through_projection._info[_Fields.TYPES] != (True, 3.2):
+ sys.exit("Error deducing type through projection.")
+
+ through_default_op = d1.cross(d2).filter(lambda x: True)
+
+ if through_default_op._info[_Fields.TYPES] != (("hello", 4, 3.2, True), "world"):
+ sys.exit("Error deducing type through default J/C. " + str(through_default_op._info[_Fields.TYPES]))
+
+ through_prj_op = d1.cross(d2).project_first(1, 0).project_second().project_first(3, 2).filter(lambda x: True)
+
+ if through_prj_op._info[_Fields.TYPES] != (4, "hello", "world", True, 3.2):
+ sys.exit("Error deducing type through projection J/C. " + str(through_prj_op._info[_Fields.TYPES]))
+
+
+ env = get_environment()
+
+ msg = env.from_elements("Type deduction test successful.")
+
+ msg.output()
+
+ env.execute()
+
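
The checks above amount to three deduction rules. Restated as assertions
against the same internal field the test inspects:

    from flink.plan.Environment import get_environment
    from flink.plan.Constants import _Fields, BOOL

    env = get_environment()
    d = env.from_elements(("hello", 4, 3.2, True))

    # A source carries its example element as the type descriptor ...
    assert d.filter(lambda x: True)._info[_Fields.TYPES] == ("hello", 4, 3.2, True)
    # ... a UDF operator carries the type hint it was given ...
    assert d.map(lambda x: x[3], BOOL)._info[_Fields.TYPES] == BOOL
    # ... and a projection reorders the fields of the source descriptor.
    assert d.project(3, 2)._info[_Fields.TYPES] == (True, 3.2)
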
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_types.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_types.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_types.py
new file mode 100644
index 0000000..f5f3ee4
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_types.py
@@ -0,0 +1,70 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+from flink.functions.MapFunction import MapFunction
+from flink.functions.MapPartitionFunction import MapPartitionFunction
+from flink.plan.Constants import BOOL, INT, FLOAT, STRING, BYTES
+
+
+class Verify(MapPartitionFunction):
+ def __init__(self, expected, name):
+ super(Verify, self).__init__()
+ self.expected = expected
+ self.name = name
+
+ def map_partition(self, iterator, collector):
+ index = 0
+ for value in iterator:
+ if value != self.expected[index]:
+ print(self.name + " Test failed. Expected: " + str(self.expected[index]) + " Actual: " + str(value))
+ raise Exception(self.name + " failed!")
+ index += 1
+ collector.collect(self.name + " successful!")
+
+
+class Id(MapFunction):
+ def map(self, value):
+ return value
+
+
+if __name__ == "__main__":
+ env = get_environment()
+
+ d1 = env.from_elements(bytearray(b"hello"), bytearray(b"world"))
+
+ d1.map(Id(), BYTES).map_partition(Verify([bytearray(b"hello"), bytearray(b"world")], "Byte"), STRING).output()
+
+ d2 = env.from_elements(1, 2, 3, 4, 5)
+
+ d2.map(Id(), INT).map_partition(Verify([1, 2, 3, 4, 5], "Int"), STRING).output()
+
+ d3 = env.from_elements(True, True, False)
+
+ d3.map(Id(), BOOL).map_partition(Verify([True, True, False], "Bool"), STRING).output()
+
+ d4 = env.from_elements(1.4, 1.7, 12312.23)
+
+ d4.map(Id(), FLOAT).map_partition(Verify([1.4, 1.7, 12312.23], "Float"), STRING).output()
+
+ d5 = env.from_elements("hello", "world")
+
+ d5.map(Id(), STRING).map_partition(Verify(["hello", "world"], "String"), STRING).output()
+
+ env.set_degree_of_parallelism(1)
+
+ env.execute(local=True)
\ No newline at end of file
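
The pattern above is a per-type identity round trip: push values through
an Id map with an explicit type constant and verify they arrive
unchanged. Reusing the Id and Verify helpers defined in this file,
checking one more value of an already covered type would look like:

    env = get_environment()
    env.from_elements(bytearray(b"abc")) \
        .map(Id(), BYTES) \
        .map_partition(Verify([bytearray(b"abc")], "Bytes2"), STRING) \
        .output()
    env.set_degree_of_parallelism(1)
    env.execute(local=True)
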
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/pom.xml b/flink-libraries/pom.xml
index 065127b..e1dc2d9 100644
--- a/flink-libraries/pom.xml
+++ b/flink-libraries/pom.xml
@@ -36,5 +36,6 @@ under the License.
<modules>
<module>flink-gelly</module>
<module>flink-gelly-scala</module>
+ <module>flink-python</module>
</modules>
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-language-binding-generic/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/pom.xml b/flink-staging/flink-language-binding/flink-language-binding-generic/pom.xml
deleted file mode 100644
index 7a8f8af..0000000
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/pom.xml
+++ /dev/null
@@ -1,61 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-language-binding-parent</artifactId>
- <version>1.0-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-language-binding-generic</artifactId>
- <name>flink-language-binding-generic</name>
- <packaging>jar</packaging>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-core</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-optimizer</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients</artifactId>
- <version>${project.version}</version>
- </dependency>
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/OperationInfo.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/OperationInfo.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/OperationInfo.java
deleted file mode 100644
index bdb0444..0000000
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/OperationInfo.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.languagebinding.api.java.common;
-
-import java.io.IOException;
-import java.util.Arrays;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.tuple.Tuple;
-import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject;
-import org.apache.flink.core.fs.FileSystem.WriteMode;
-import org.apache.flink.languagebinding.api.java.common.PlanBinder.Operation;
-import static org.apache.flink.languagebinding.api.java.common.PlanBinder.normalizeKeys;
-import static org.apache.flink.languagebinding.api.java.common.PlanBinder.toIntArray;
-import org.apache.flink.languagebinding.api.java.common.streaming.Receiver;
-
-/**
- * Container for all generic information related to operations. This class contains the absolute minimum fields that are
- * required for all operations. This class should be extended to contain any additional fields required on a
- * per-language basis.
- */
-public class OperationInfo {
- public int parentID; //DataSet that an operation is applied on
- public int otherID; //secondary DataSet
- public int setID; //ID for new DataSet
- public String[] keys;
- public String[] keys1; //join/cogroup keys
- public String[] keys2; //join/cogroup keys
- public TypeInformation<?> types; //typeinformation about output type
- public AggregationEntry[] aggregates;
- public ProjectionEntry[] projections; //projectFirst/projectSecond
- public Object[] values;
- public int count;
- public int field;
- public int[] fields;
- public Order order;
- public String path;
- public String fieldDelimiter;
- public String lineDelimiter;
- public long from;
- public long to;
- public WriteMode writeMode;
- public boolean toError;
- public String name;
-
- public OperationInfo() {
- }
-
- public OperationInfo(Receiver receiver, Operation identifier) throws IOException {
- Object tmpType;
- switch (identifier) {
- case SOURCE_CSV:
- setID = (Integer) receiver.getRecord(true);
- path = (String) receiver.getRecord();
- fieldDelimiter = (String) receiver.getRecord();
- lineDelimiter = (String) receiver.getRecord();
- tmpType = (Tuple) receiver.getRecord();
- types = tmpType == null ? null : getForObject(tmpType);
- return;
- case SOURCE_TEXT:
- setID = (Integer) receiver.getRecord(true);
- path = (String) receiver.getRecord();
- return;
- case SOURCE_VALUE:
- setID = (Integer) receiver.getRecord(true);
- int valueCount = (Integer) receiver.getRecord(true);
- values = new Object[valueCount];
- for (int x = 0; x < valueCount; x++) {
- values[x] = receiver.getRecord();
- }
- return;
- case SOURCE_SEQ:
- setID = (Integer) receiver.getRecord(true);
- from = (Long) receiver.getRecord();
- to = (Long) receiver.getRecord();
- return;
- case SINK_CSV:
- parentID = (Integer) receiver.getRecord(true);
- path = (String) receiver.getRecord();
- fieldDelimiter = (String) receiver.getRecord();
- lineDelimiter = (String) receiver.getRecord();
- writeMode = ((Integer) receiver.getRecord(true)) == 1
- ? WriteMode.OVERWRITE
- : WriteMode.NO_OVERWRITE;
- return;
- case SINK_TEXT:
- parentID = (Integer) receiver.getRecord(true);
- path = (String) receiver.getRecord();
- writeMode = ((Integer) receiver.getRecord(true)) == 1
- ? WriteMode.OVERWRITE
- : WriteMode.NO_OVERWRITE;
- return;
- case SINK_PRINT:
- parentID = (Integer) receiver.getRecord(true);
- toError = (Boolean) receiver.getRecord();
- return;
- case BROADCAST:
- parentID = (Integer) receiver.getRecord(true);
- otherID = (Integer) receiver.getRecord(true);
- name = (String) receiver.getRecord();
- return;
- }
- setID = (Integer) receiver.getRecord(true);
- parentID = (Integer) receiver.getRecord(true);
- switch (identifier) {
- case AGGREGATE:
- count = (Integer) receiver.getRecord(true);
- aggregates = new AggregationEntry[count];
- for (int x = 0; x < count; x++) {
- int encodedAgg = (Integer) receiver.getRecord(true);
- int field = (Integer) receiver.getRecord(true);
- aggregates[x] = new AggregationEntry(encodedAgg, field);
- }
- return;
- case FIRST:
- count = (Integer) receiver.getRecord(true);
- return;
- case DISTINCT:
- case GROUPBY:
- case PARTITION_HASH:
- keys = normalizeKeys(receiver.getRecord(true));
- return;
- case PROJECTION:
- fields = toIntArray(receiver.getRecord(true));
- return;
- case REBALANCE:
- return;
- case SORT:
- field = (Integer) receiver.getRecord(true);
- int encodedOrder = (Integer) receiver.getRecord(true);
- switch (encodedOrder) {
- case 0:
- order = Order.NONE;
- break;
- case 1:
- order = Order.ASCENDING;
- break;
- case 2:
- order = Order.DESCENDING;
- break;
- case 3:
- order = Order.ANY;
- break;
- default:
- order = Order.NONE;
- break;
- }
- return;
- case UNION:
- otherID = (Integer) receiver.getRecord(true);
- return;
- }
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("SetID: ").append(setID).append("\n");
- sb.append("ParentID: ").append(parentID).append("\n");
- sb.append("OtherID: ").append(otherID).append("\n");
- sb.append("Name: ").append(name).append("\n");
- sb.append("Types: ").append(types).append("\n");
- sb.append("Keys1: ").append(Arrays.toString(keys1)).append("\n");
- sb.append("Keys2: ").append(Arrays.toString(keys2)).append("\n");
- sb.append("Keys: ").append(Arrays.toString(keys)).append("\n");
- sb.append("Aggregates: ").append(Arrays.toString(aggregates)).append("\n");
- sb.append("Projections: ").append(Arrays.toString(projections)).append("\n");
- sb.append("Count: ").append(count).append("\n");
- sb.append("Field: ").append(field).append("\n");
- sb.append("Order: ").append(order.toString()).append("\n");
- sb.append("Path: ").append(path).append("\n");
- sb.append("FieldDelimiter: ").append(fieldDelimiter).append("\n");
- sb.append("LineDelimiter: ").append(lineDelimiter).append("\n");
- sb.append("From: ").append(from).append("\n");
- sb.append("To: ").append(to).append("\n");
- sb.append("WriteMode: ").append(writeMode).append("\n");
- sb.append("toError: ").append(toError).append("\n");
- return sb.toString();
- }
-
- public static class AggregationEntry {
- public Aggregations agg;
- public int field;
-
- public AggregationEntry(int encodedAgg, int field) {
- switch (encodedAgg) {
- case 0:
- agg = Aggregations.MAX;
- break;
- case 1:
- agg = Aggregations.MIN;
- break;
- case 2:
- agg = Aggregations.SUM;
- break;
- }
- this.field = field;
- }
-
- @Override
- public String toString() {
- return agg.toString() + " - " + field;
- }
- }
-
- public static class ProjectionEntry {
- public ProjectionSide side;
- public int[] keys;
-
- public ProjectionEntry(ProjectionSide side, int[] keys) {
- this.side = side;
- this.keys = keys;
- }
-
- @Override
- public String toString() {
- return side + " - " + Arrays.toString(keys);
- }
- }
-
- public enum ProjectionSide {
- FIRST,
- SECOND
- }
-
- public enum DatasizeHint {
- NONE,
- TINY,
- HUGE
- }
-}
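
The SORT case above is the receiving end of the Order constants used in
test_main.py; the encodedOrder values decode as 0=NONE, 1=ASCENDING,
2=DESCENDING, 3=ANY. For reference, the sending side as it appears in
that test (d5, GroupReduce3, and the type constants are defined there):

    d5.group_by(2) \
        .sort_group(0, Order.DESCENDING) \
        .sort_group(1, Order.ASCENDING) \
        .reduce_group(GroupReduce3(), (FLOAT, FLOAT, INT), combinable=True)
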
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
deleted file mode 100644
index ca252f8..0000000
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
+++ /dev/null
@@ -1,582 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.languagebinding.api.java.common;
-
-import java.io.IOException;
-import java.util.HashMap;
-
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvInputFormat;
-import org.apache.flink.api.java.io.PrintingOutputFormat;
-import org.apache.flink.api.java.operators.AggregateOperator;
-import org.apache.flink.api.java.operators.CrossOperator.DefaultCross;
-import org.apache.flink.api.java.operators.CrossOperator.ProjectCross;
-import org.apache.flink.api.java.operators.Grouping;
-import org.apache.flink.api.java.operators.JoinOperator.DefaultJoin;
-import org.apache.flink.api.java.operators.JoinOperator.ProjectJoin;
-import org.apache.flink.api.java.operators.SortedGrouping;
-import org.apache.flink.api.java.operators.UdfOperator;
-import org.apache.flink.api.java.operators.UnsortedGrouping;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.languagebinding.api.java.common.OperationInfo.DatasizeHint;
-import static org.apache.flink.languagebinding.api.java.common.OperationInfo.DatasizeHint.HUGE;
-import static org.apache.flink.languagebinding.api.java.common.OperationInfo.DatasizeHint.NONE;
-import static org.apache.flink.languagebinding.api.java.common.OperationInfo.DatasizeHint.TINY;
-import org.apache.flink.languagebinding.api.java.common.OperationInfo.ProjectionEntry;
-import org.apache.flink.languagebinding.api.java.common.streaming.Receiver;
-
-/**
- * Generic class to construct a Flink plan based on external data.
- *
- * @param <INFO>
- */
-public abstract class PlanBinder<INFO extends OperationInfo> {
- public static final String PLANBINDER_CONFIG_BCVAR_COUNT = "PLANBINDER_BCVAR_COUNT";
- public static final String PLANBINDER_CONFIG_BCVAR_NAME_PREFIX = "PLANBINDER_BCVAR_";
-
- protected static String FLINK_HDFS_PATH = "hdfs:/tmp";
- public static final String FLINK_TMP_DATA_DIR = System.getProperty("java.io.tmpdir") + "/flink_data";
-
- public static boolean DEBUG = false;
-
- protected HashMap<Integer, Object> sets = new HashMap();
- public static ExecutionEnvironment env;
- protected Receiver receiver;
-
- public static final int MAPPED_FILE_SIZE = 1024 * 1024 * 64;
-
- //====Plan==========================================================================================================
- protected void receivePlan() throws IOException {
- receiveParameters();
- receiveOperations();
- }
-
- //====Environment===================================================================================================
- /**
- * This enum contains the identifiers for all supported environment parameters.
- */
- private enum Parameters {
- DOP,
- MODE,
- RETRY,
- DEBUG
- }
-
- private void receiveParameters() throws IOException {
- Integer parameterCount = (Integer) receiver.getRecord(true);
-
- for (int x = 0; x < parameterCount; x++) {
- Tuple value = (Tuple) receiver.getRecord(true);
- switch (Parameters.valueOf(((String) value.getField(0)).toUpperCase())) {
- case DOP:
- Integer dop = (Integer) value.getField(1);
- env.setParallelism(dop);
- break;
- case MODE:
- FLINK_HDFS_PATH = (Boolean) value.getField(1) ? "file:/tmp/flink" : "hdfs:/tmp/flink";
- break;
- case RETRY:
- int retry = (Integer) value.getField(1);
- env.setNumberOfExecutionRetries(retry);
- break;
- case DEBUG:
- DEBUG = (Boolean) value.getField(1);
- break;
- }
- }
- if (env.getParallelism() < 0) {
- env.setParallelism(1);
- }
- }
-
- //====Operations====================================================================================================
- /**
- * This enum contains the identifiers for all supported non-UDF DataSet operations.
- */
- protected enum Operation {
- SOURCE_CSV, SOURCE_TEXT, SOURCE_VALUE, SOURCE_SEQ, SINK_CSV, SINK_TEXT, SINK_PRINT,
- PROJECTION, SORT, UNION, FIRST, DISTINCT, GROUPBY, AGGREGATE,
- REBALANCE, PARTITION_HASH,
- BROADCAST
- }
-
- /**
- * This enum contains the identifiers for all supported UDF DataSet operations.
- */
- protected enum AbstractOperation {
- COGROUP, CROSS, CROSS_H, CROSS_T, FILTER, FLATMAP, GROUPREDUCE, JOIN, JOIN_H, JOIN_T, MAP, REDUCE, MAPPARTITION,
- }
-
- protected void receiveOperations() throws IOException {
- Integer operationCount = (Integer) receiver.getRecord(true);
- for (int x = 0; x < operationCount; x++) {
- String identifier = (String) receiver.getRecord();
- Operation op = null;
- AbstractOperation aop = null;
- try {
- op = Operation.valueOf(identifier.toUpperCase());
- } catch (IllegalArgumentException iae) {
- try {
- aop = AbstractOperation.valueOf(identifier.toUpperCase());
- } catch (IllegalArgumentException iae2) {
- throw new IllegalArgumentException("Invalid operation specified: " + identifier);
- }
- }
- if (op != null) {
- switch (op) {
- case SOURCE_CSV:
- createCsvSource(createOperationInfo(op));
- break;
- case SOURCE_TEXT:
- createTextSource(createOperationInfo(op));
- break;
- case SOURCE_VALUE:
- createValueSource(createOperationInfo(op));
- break;
- case SOURCE_SEQ:
- createSequenceSource(createOperationInfo(op));
- break;
- case SINK_CSV:
- createCsvSink(createOperationInfo(op));
- break;
- case SINK_TEXT:
- createTextSink(createOperationInfo(op));
- break;
- case SINK_PRINT:
- createPrintSink(createOperationInfo(op));
- break;
- case BROADCAST:
- createBroadcastVariable(createOperationInfo(op));
- break;
- case AGGREGATE:
- createAggregationOperation(createOperationInfo(op));
- break;
- case DISTINCT:
- createDistinctOperation(createOperationInfo(op));
- break;
- case FIRST:
- createFirstOperation(createOperationInfo(op));
- break;
- case PARTITION_HASH:
- createHashPartitionOperation(createOperationInfo(op));
- break;
- case PROJECTION:
- createProjectOperation(createOperationInfo(op));
- break;
- case REBALANCE:
- createRebalanceOperation(createOperationInfo(op));
- break;
- case GROUPBY:
- createGroupOperation(createOperationInfo(op));
- break;
- case SORT:
- createSortOperation(createOperationInfo(op));
- break;
- case UNION:
- createUnionOperation(createOperationInfo(op));
- break;
- }
- }
- if (aop != null) {
- switch (aop) {
- case COGROUP:
- createCoGroupOperation(createOperationInfo(aop));
- break;
- case CROSS:
- createCrossOperation(NONE, createOperationInfo(aop));
- break;
- case CROSS_H:
- createCrossOperation(HUGE, createOperationInfo(aop));
- break;
- case CROSS_T:
- createCrossOperation(TINY, createOperationInfo(aop));
- break;
- case FILTER:
- createFilterOperation(createOperationInfo(aop));
- break;
- case FLATMAP:
- createFlatMapOperation(createOperationInfo(aop));
- break;
- case GROUPREDUCE:
- createGroupReduceOperation(createOperationInfo(aop));
- break;
- case JOIN:
- createJoinOperation(NONE, createOperationInfo(aop));
- break;
- case JOIN_H:
- createJoinOperation(HUGE, createOperationInfo(aop));
- break;
- case JOIN_T:
- createJoinOperation(TINY, createOperationInfo(aop));
- break;
- case MAP:
- createMapOperation(createOperationInfo(aop));
- break;
- case MAPPARTITION:
- createMapPartitionOperation(createOperationInfo(aop));
- break;
- case REDUCE:
- createReduceOperation(createOperationInfo(aop));
- break;
- }
- }
- }
- }
-
- /**
- * This method creates an OperationInfo object based on the operation-identifier passed.
- *
- * @param operationIdentifier
- * @return
- * @throws IOException
- */
- protected OperationInfo createOperationInfo(Operation operationIdentifier) throws IOException {
- return new OperationInfo(receiver, operationIdentifier);
- }
-
- /**
- * This method creates an OperationInfo object based on the operation-identifier passed.
- *
- * @param operationIdentifier
- * @return
- * @throws IOException
- */
- protected abstract INFO createOperationInfo(AbstractOperation operationIdentifier) throws IOException;
-
- private void createCsvSource(OperationInfo info) throws IOException {
- if (!(info.types instanceof CompositeType)) {
- throw new RuntimeException("The output type of a csv source has to be a tuple or a " +
- "pojo type. The derived type is " + info);
- }
-
- sets.put(info.setID, env.createInput(new CsvInputFormat(new Path(info.path),
- info.lineDelimiter, info.fieldDelimiter, (CompositeType)info.types), info.types)
- .name("CsvSource"));
- }
-
- private void createTextSource(OperationInfo info) throws IOException {
- sets.put(info.setID, env.readTextFile(info.path).name("TextSource"));
- }
-
- private void createValueSource(OperationInfo info) throws IOException {
- sets.put(info.setID, env.fromElements(info.values).name("ValueSource"));
- }
-
- private void createSequenceSource(OperationInfo info) throws IOException {
- sets.put(info.setID, env.generateSequence(info.from, info.to).name("SequenceSource"));
- }
-
- private void createCsvSink(OperationInfo info) throws IOException {
- DataSet parent = (DataSet) sets.get(info.parentID);
- parent.writeAsCsv(info.path, info.lineDelimiter, info.fieldDelimiter, info.writeMode).name("CsvSink");
- }
-
- private void createTextSink(OperationInfo info) throws IOException {
- DataSet parent = (DataSet) sets.get(info.parentID);
- parent.writeAsText(info.path, info.writeMode).name("TextSink");
- }
-
- private void createPrintSink(OperationInfo info) throws IOException {
- DataSet parent = (DataSet) sets.get(info.parentID);
- parent.output(new PrintingOutputFormat(info.toError));
- }
-
- private void createBroadcastVariable(OperationInfo info) throws IOException {
- UdfOperator op1 = (UdfOperator) sets.get(info.parentID);
- DataSet op2 = (DataSet) sets.get(info.otherID);
-
- op1.withBroadcastSet(op2, info.name);
- Configuration c = ((UdfOperator) op1).getParameters();
-
- if (c == null) {
- c = new Configuration();
- }
-
- int count = c.getInteger(PLANBINDER_CONFIG_BCVAR_COUNT, 0);
- c.setInteger(PLANBINDER_CONFIG_BCVAR_COUNT, count + 1);
- c.setString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + count, info.name);
-
- op1.withParameters(c);
- }
-
- private void createAggregationOperation(OperationInfo info) throws IOException {
- DataSet op = (DataSet) sets.get(info.parentID);
- AggregateOperator ao = op.aggregate(info.aggregates[0].agg, info.aggregates[0].field);
-
- for (int x = 1; x < info.count; x++) {
- ao = ao.and(info.aggregates[x].agg, info.aggregates[x].field);
- }
-
- sets.put(info.setID, ao.name("Aggregation"));
- }
-
- private void createDistinctOperation(OperationInfo info) throws IOException {
- DataSet op = (DataSet) sets.get(info.parentID);
- sets.put(info.setID, info.keys.length == 0 ? op.distinct() : op.distinct(info.keys).name("Distinct"));
- }
-
- private void createFirstOperation(OperationInfo info) throws IOException {
- DataSet op = (DataSet) sets.get(info.parentID);
- sets.put(info.setID, op.first(info.count).name("First"));
- }
-
- private void createGroupOperation(OperationInfo info) throws IOException {
- DataSet op1 = (DataSet) sets.get(info.parentID);
- sets.put(info.setID, op1.groupBy(info.keys));
- }
-
- private void createHashPartitionOperation(OperationInfo info) throws IOException {
- DataSet op1 = (DataSet) sets.get(info.parentID);
- sets.put(info.setID, op1.partitionByHash(info.keys));
- }
-
- private void createProjectOperation(OperationInfo info) throws IOException {
- DataSet op1 = (DataSet) sets.get(info.parentID);
- sets.put(info.setID, op1.project(info.fields).name("Projection"));
- }
-
- private void createRebalanceOperation(OperationInfo info) throws IOException {
- DataSet op = (DataSet) sets.get(info.parentID);
- sets.put(info.setID, op.rebalance().name("Rebalance"));
- }
-
- private void createSortOperation(OperationInfo info) throws IOException {
- Grouping op1 = (Grouping) sets.get(info.parentID);
- if (op1 instanceof UnsortedGrouping) {
- sets.put(info.setID, ((UnsortedGrouping) op1).sortGroup(info.field, info.order));
- return;
- }
- if (op1 instanceof SortedGrouping) {
- sets.put(info.setID, ((SortedGrouping) op1).sortGroup(info.field, info.order));
- }
- }
-
- private void createUnionOperation(OperationInfo info) throws IOException {
- DataSet op1 = (DataSet) sets.get(info.parentID);
- DataSet op2 = (DataSet) sets.get(info.otherID);
- sets.put(info.setID, op1.union(op2).name("Union"));
- }
-
- private void createCoGroupOperation(INFO info) {
- DataSet op1 = (DataSet) sets.get(info.parentID);
- DataSet op2 = (DataSet) sets.get(info.otherID);
- sets.put(info.setID, applyCoGroupOperation(op1, op2, info.keys1, info.keys2, info));
- }
-
- private void createCrossOperation(DatasizeHint mode, INFO info) {
- DataSet op1 = (DataSet) sets.get(info.parentID);
- DataSet op2 = (DataSet) sets.get(info.otherID);
-
- if (info.types != null && (info.projections == null || info.projections.length == 0)) {
- sets.put(info.setID, applyCrossOperation(op1, op2, mode, info));
- } else {
- DefaultCross defaultResult;
- switch (mode) {
- case NONE:
- defaultResult = op1.cross(op2);
- break;
- case HUGE:
- defaultResult = op1.crossWithHuge(op2);
- break;
- case TINY:
- defaultResult = op1.crossWithTiny(op2);
- break;
- default:
- throw new IllegalArgumentException("Invalid Cross mode specified: " + mode);
- }
- if (info.projections.length == 0) {
- sets.put(info.setID, defaultResult.name("DefaultCross"));
- } else {
- ProjectCross project = null;
- for (ProjectionEntry pe : info.projections) {
- switch (pe.side) {
- case FIRST:
- project = project == null ? defaultResult.projectFirst(pe.keys) : project.projectFirst(pe.keys);
- break;
- case SECOND:
- project = project == null ? defaultResult.projectSecond(pe.keys) : project.projectSecond(pe.keys);
- break;
- }
- }
- sets.put(info.setID, project.name("ProjectCross"));
- }
- }
- }
-
- private void createFilterOperation(INFO info) {
- DataSet op1 = (DataSet) sets.get(info.parentID);
- sets.put(info.setID, applyFilterOperation(op1, info));
- }
-
- private void createFlatMapOperation(INFO info) {
- DataSet op1 = (DataSet) sets.get(info.parentID);
- sets.put(info.setID, applyFlatMapOperation(op1, info));
- }
-
- private void createGroupReduceOperation(INFO info) {
- Object op1 = sets.get(info.parentID);
- if (op1 instanceof DataSet) {
- sets.put(info.setID, applyGroupReduceOperation((DataSet) op1, info));
- return;
- }
- if (op1 instanceof UnsortedGrouping) {
- sets.put(info.setID, applyGroupReduceOperation((UnsortedGrouping) op1, info));
- return;
- }
- if (op1 instanceof SortedGrouping) {
- sets.put(info.setID, applyGroupReduceOperation((SortedGrouping) op1, info));
- }
- }
-
- private void createJoinOperation(DatasizeHint mode, INFO info) {
- DataSet op1 = (DataSet) sets.get(info.parentID);
- DataSet op2 = (DataSet) sets.get(info.otherID);
-
- if (info.types != null && (info.projections == null || info.projections.length == 0)) {
- sets.put(info.setID, applyJoinOperation(op1, op2, info.keys1, info.keys2, mode, info));
- } else {
- DefaultJoin defaultResult = createDefaultJoin(op1, op2, info.keys1, info.keys2, mode);
- if (info.projections.length == 0) {
- sets.put(info.setID, defaultResult.name("DefaultJoin"));
- } else {
- ProjectJoin project = null;
- for (ProjectionEntry pe : info.projections) {
- switch (pe.side) {
- case FIRST:
- project = project == null ? defaultResult.projectFirst(pe.keys) : project.projectFirst(pe.keys);
- break;
- case SECOND:
- project = project == null ? defaultResult.projectSecond(pe.keys) : project.projectSecond(pe.keys);
- break;
- }
- }
- sets.put(info.setID, project.name("ProjectJoin"));
- }
- }
- }
-
- protected DefaultJoin createDefaultJoin(DataSet op1, DataSet op2, String[] firstKeys, String[] secondKeys, DatasizeHint mode) {
- switch (mode) {
- case NONE:
- return op1.join(op2).where(firstKeys).equalTo(secondKeys);
- case HUGE:
- return op1.joinWithHuge(op2).where(firstKeys).equalTo(secondKeys);
- case TINY:
- return op1.joinWithTiny(op2).where(firstKeys).equalTo(secondKeys);
- default:
- throw new IllegalArgumentException("Invalid join mode specified.");
- }
- }
-
- private void createMapOperation(INFO info) {
- DataSet op1 = (DataSet) sets.get(info.parentID);
- sets.put(info.setID, applyMapOperation(op1, info));
- }
-
- private void createMapPartitionOperation(INFO info) {
- DataSet op1 = (DataSet) sets.get(info.parentID);
- sets.put(info.setID, applyMapPartitionOperation(op1, info));
- }
-
- private void createReduceOperation(INFO info) {
- Object op1 = sets.get(info.parentID);
- if (op1 instanceof DataSet) {
- sets.put(info.setID, applyReduceOperation((DataSet) op1, info));
- return;
- }
- if (op1 instanceof UnsortedGrouping) {
- sets.put(info.setID, applyReduceOperation((UnsortedGrouping) op1, info));
- }
- }
-
- protected abstract DataSet applyCoGroupOperation(DataSet op1, DataSet op2, String[] firstKeys, String[] secondKeys, INFO info);
-
- protected abstract DataSet applyCrossOperation(DataSet op1, DataSet op2, DatasizeHint mode, INFO info);
-
- protected abstract DataSet applyFilterOperation(DataSet op1, INFO info);
-
- protected abstract DataSet applyFlatMapOperation(DataSet op1, INFO info);
-
- protected abstract DataSet applyGroupReduceOperation(DataSet op1, INFO info);
-
- protected abstract DataSet applyGroupReduceOperation(UnsortedGrouping op1, INFO info);
-
- protected abstract DataSet applyGroupReduceOperation(SortedGrouping op1, INFO info);
-
- protected abstract DataSet applyJoinOperation(DataSet op1, DataSet op2, String[] firstKeys, String[] secondKeys, DatasizeHint mode, INFO info);
-
- protected abstract DataSet applyMapOperation(DataSet op1, INFO info);
-
- protected abstract DataSet applyMapPartitionOperation(DataSet op1, INFO info);
-
- protected abstract DataSet applyReduceOperation(DataSet op1, INFO info);
-
- protected abstract DataSet applyReduceOperation(UnsortedGrouping op1, INFO info);
-
- //====Utility=======================================================================================================
- protected static String[] normalizeKeys(Object keys) {
- if (keys instanceof Tuple) {
- Tuple tupleKeys = (Tuple) keys;
- if (tupleKeys.getArity() == 0) {
- return new String[0];
- }
- if (tupleKeys.getField(0) instanceof Integer) {
- String[] stringKeys = new String[tupleKeys.getArity()];
- for (int x = 0; x < stringKeys.length; x++) {
- stringKeys[x] = "f" + (Integer) tupleKeys.getField(x);
- }
- return stringKeys;
- }
- if (tupleKeys.getField(0) instanceof String) {
- return tupleToStringArray(tupleKeys);
- }
- throw new RuntimeException("Key argument contains field that is neither an int nor a String.");
- }
- if (keys instanceof int[]) {
- int[] intKeys = (int[]) keys;
- String[] stringKeys = new String[intKeys.length];
- for (int x = 0; x < stringKeys.length; x++) {
- stringKeys[x] = "f" + intKeys[x];
- }
- return stringKeys;
- }
- throw new RuntimeException("Key argument is neither an int[] nor a Tuple.");
- }
-
- protected static int[] toIntArray(Object key) {
- if (key instanceof Tuple) {
- Tuple tuple = (Tuple) key;
- int[] keys = new int[tuple.getArity()];
- for (int y = 0; y < tuple.getArity(); y++) {
- keys[y] = (Integer) tuple.getField(y);
- }
- return keys;
- }
- if (key instanceof int[]) {
- return (int[]) key;
- }
- throw new RuntimeException("Key argument is neither an int[] nor a Tuple.");
- }
-
- protected static String[] tupleToStringArray(Tuple tuple) {
- String[] keys = new String[tuple.getArity()];
- for (int y = 0; y < tuple.getArity(); y++) {
- keys[y] = (String) tuple.getField(y);
- }
- return keys;
- }
-}
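
The Parameters enum in receiveParameters() (DOP, MODE, RETRY, DEBUG) is
fed from the Python plan. The DOP mapping is visible directly in the
test plans above; the MODE mapping for execute(local=True) is an
assumption based on the file:/tmp vs hdfs:/tmp switch in that case:

    env.set_degree_of_parallelism(1)  # -> DOP: env.setParallelism(1)
    env.execute(local=True)           # -> MODE (assumed): selects "file:/tmp/flink"
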
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java
deleted file mode 100644
index 59ed20c..0000000
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java
+++ /dev/null
@@ -1,360 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.languagebinding.api.java.common.streaming;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.io.Serializable;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-//CHECKSTYLE.OFF: AvoidStarImport - tuple imports
-import org.apache.flink.api.java.tuple.*;
-import static org.apache.flink.languagebinding.api.java.common.streaming.Sender.*;
-//CHECKSTYLE.ON: AvoidStarImport
-import static org.apache.flink.languagebinding.api.java.common.PlanBinder.FLINK_TMP_DATA_DIR;
-import static org.apache.flink.languagebinding.api.java.common.PlanBinder.MAPPED_FILE_SIZE;
-import org.apache.flink.util.Collector;
-
-/**
- * General-purpose class to read data from memory-mapped files.
- */
-public class Receiver implements Serializable {
- private static final long serialVersionUID = -2474088929850009968L;
-
- private final AbstractRichFunction function;
-
- private File inputFile;
- private RandomAccessFile inputRAF;
- private FileChannel inputChannel;
- private MappedByteBuffer fileBuffer;
-
- private Deserializer<?> deserializer = null;
-
- public Receiver(AbstractRichFunction function) {
- this.function = function;
- }
-
- //=====Setup========================================================================================================
- public void open(String path) throws IOException {
- setupMappedFile(path);
- }
-
- private void setupMappedFile(String inputFilePath) throws FileNotFoundException, IOException {
- File x = new File(FLINK_TMP_DATA_DIR);
- x.mkdirs();
-
- inputFile = new File(inputFilePath);
- if (inputFile.exists()) {
- inputFile.delete();
- }
- inputFile.createNewFile();
- inputRAF = new RandomAccessFile(inputFilePath, "rw");
- inputRAF.setLength(MAPPED_FILE_SIZE);
- inputRAF.seek(MAPPED_FILE_SIZE - 1);
- inputRAF.writeByte(0);
- inputRAF.seek(0);
- inputChannel = inputRAF.getChannel();
- fileBuffer = inputChannel.map(FileChannel.MapMode.READ_WRITE, 0, MAPPED_FILE_SIZE);
- }
-
- public void close() throws IOException {
- closeMappedFile();
- }
-
- private void closeMappedFile() throws IOException {
- inputChannel.close();
- inputRAF.close();
- }
-
- //=====Record-API===================================================================================================
- /**
- * Loads a buffer from the memory-mapped file. The records contained within the buffer can be accessed using
- * collectRecord(). These records do not necessarily have to be of the same type. This method requires external
- * synchronization.
- *
- * @throws IOException
- */
- private void loadBuffer() throws IOException {
- int count = 0;
- while (fileBuffer.get(0) == 0 && count < 10) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ie) {
- }
- fileBuffer.load();
- count++;
- }
- if (fileBuffer.get(0) == 0) {
- throw new RuntimeException("External process not responding.");
- }
- fileBuffer.position(1);
- }
-
- /**
- * Returns a record from the buffer. Note: This method cannot be replaced with specific methods like readInt() or
- * similar. The PlanBinder requires a method that can return any kind of object.
- *
- * @return read record
- * @throws IOException
- */
- public Object getRecord() throws IOException {
- return getRecord(false);
- }
-
- /**
- * Returns a record from the buffer. Note: This method cannot be replaced with specific methods like readInt() or
- * similar. The PlanBinder requires a method that can return any kind of object.
- *
- * @param normalized flag indicating whether certain types should be normalized
- * @return read record
- * @throws IOException
- */
- public Object getRecord(boolean normalized) throws IOException {
- if (fileBuffer.position() == 0) {
- loadBuffer();
- }
- return receiveField(normalized);
- }
-
- /**
- * Reads a single primitive value or tuple from the buffer.
- *
- * @return primitive value or tuple
- * @throws IOException
- */
- private Object receiveField(boolean normalized) throws IOException {
- byte type = fileBuffer.get();
- switch (type) {
- case TYPE_TUPLE:
- int tupleSize = fileBuffer.get();
- Tuple tuple = createTuple(tupleSize);
- for (int x = 0; x < tupleSize; x++) {
- tuple.setField(receiveField(normalized), x);
- }
- return tuple;
- case TYPE_BOOLEAN:
- return fileBuffer.get() == 1;
- case TYPE_BYTE:
- return fileBuffer.get();
- case TYPE_SHORT:
- if (normalized) {
- return (int) fileBuffer.getShort();
- } else {
- return fileBuffer.getShort();
- }
- case TYPE_INTEGER:
- return fileBuffer.getInt();
- case TYPE_LONG:
- if (normalized) {
- return (int) fileBuffer.getLong();
- } else {
- return fileBuffer.getLong();
- }
- case TYPE_FLOAT:
- if (normalized) {
- return (double) fileBuffer.getFloat();
- } else {
- return fileBuffer.getFloat();
- }
- case TYPE_DOUBLE:
- return fileBuffer.getDouble();
- case TYPE_STRING:
- int stringSize = fileBuffer.getInt();
- byte[] buffer = new byte[stringSize];
- fileBuffer.get(buffer);
- return new String(buffer);
- case TYPE_BYTES:
- int bytesSize = fileBuffer.getInt();
- byte[] byteArray = new byte[bytesSize];
- fileBuffer.get(byteArray);
- return byteArray;
- case TYPE_NULL:
- return null;
- default:
- throw new IllegalArgumentException("Unknown TypeID encountered: " + type);
- }
- }
-
- //=====Buffered-API=================================================================================================
- /**
- * Reads a buffer of the given size from the memory-mapped file, and collects all records contained. This method
- * assumes that all values in the buffer are of the same type. This method does NOT take care of synchronization.
- * The caller must guarantee that the buffer was completely written before calling this method.
- *
- * @param c Collector to collect records
- * @param bufferSize size of the buffer
- * @throws IOException
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public void collectBuffer(Collector c, int bufferSize) throws IOException {
- fileBuffer.position(0);
-
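- // the first buffer begins with a type tag; the deserializer derived from it is cached for all later buffers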
- if (deserializer == null) {
- byte type = fileBuffer.get();
- deserializer = getDeserializer(type);
- }
- while (fileBuffer.position() < bufferSize) {
- c.collect(deserializer.deserialize());
- }
- }
-
- //=====Deserializer=================================================================================================
- private Deserializer<?> getDeserializer(byte type) {
- switch (type) {
- case TYPE_TUPLE:
- return new TupleDeserializer();
- case TYPE_BOOLEAN:
- return new BooleanDeserializer();
- case TYPE_BYTE:
- return new ByteDeserializer();
- case TYPE_BYTES:
- return new BytesDeserializer();
- case TYPE_SHORT:
- return new ShortDeserializer();
- case TYPE_INTEGER:
- return new IntDeserializer();
- case TYPE_LONG:
- return new LongDeserializer();
- case TYPE_STRING:
- return new StringDeserializer();
- case TYPE_FLOAT:
- return new FloatDeserializer();
- case TYPE_DOUBLE:
- return new DoubleDeserializer();
- case TYPE_NULL:
- return new NullDeserializer();
- default:
- throw new IllegalArgumentException("Unknown TypeID encountered: " + type);
- }
- }
-
- private interface Deserializer<T> {
- T deserialize();
- }
-
- private class BooleanDeserializer implements Deserializer<Boolean> {
- @Override
- public Boolean deserialize() {
- return fileBuffer.get() == 1;
- }
- }
-
- private class ByteDeserializer implements Deserializer<Byte> {
- @Override
- public Byte deserialize() {
- return fileBuffer.get();
- }
- }
-
- private class ShortDeserializer implements Deserializer<Short> {
- @Override
- public Short deserialize() {
- return fileBuffer.getShort();
- }
- }
-
- private class IntDeserializer implements Deserializer<Integer> {
- @Override
- public Integer deserialize() {
- return fileBuffer.getInt();
- }
- }
-
- private class LongDeserializer implements Deserializer<Long> {
- @Override
- public Long deserialize() {
- return fileBuffer.getLong();
- }
- }
-
- private class FloatDeserializer implements Deserializer<Float> {
- @Override
- public Float deserialize() {
- return fileBuffer.getFloat();
- }
- }
-
- private class DoubleDeserializer implements Deserializer<Double> {
- @Override
- public Double deserialize() {
- return fileBuffer.getDouble();
- }
- }
-
- private class StringDeserializer implements Deserializer<String> {
- private int size;
-
- @Override
- public String deserialize() {
- size = fileBuffer.getInt();
- byte[] buffer = new byte[size];
- fileBuffer.get(buffer);
- return new String(buffer);
- }
- }
-
- private class NullDeserializer implements Deserializer<Object> {
- @Override
- public Object deserialize() {
- return null;
- }
- }
-
- private class BytesDeserializer implements Deserializer<byte[]> {
- @Override
- public byte[] deserialize() {
- int length = fileBuffer.getInt();
- byte[] result = new byte[length];
- fileBuffer.get(result);
- return result;
- }
-
- }
-
- private class TupleDeserializer implements Deserializer<Tuple> {
- Deserializer<?>[] deserializer = null;
- Tuple reuse;
-
- public TupleDeserializer() {
- int size = fileBuffer.getInt();
- reuse = createTuple(size);
- deserializer = new Deserializer[size];
- for (int x = 0; x < deserializer.length; x++) {
- deserializer[x] = getDeserializer(fileBuffer.get());
- }
- }
-
- @Override
- public Tuple deserialize() {
- for (int x = 0; x < deserializer.length; x++) {
- reuse.setField(deserializer[x].deserialize(), x);
- }
- return reuse;
- }
- }
-
- public static Tuple createTuple(int size) {
- try {
- return Tuple.getTupleClass(size).newInstance();
- } catch (InstantiationException | IllegalAccessException e) {
- throw new RuntimeException(e);
- }
- }
-}
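Side note for readers tracing the wire format: receiveField() above decodes a simple tag-prefixed encoding, in which every value is preceded by one of the TYPE_* bytes declared in Sender.java further down. A minimal stand-alone sketch of that scheme, for illustration only (the class name and the main() driver are invented here, and only three of the tags are shown):

    import java.nio.ByteBuffer;

    public class TaggedDecoderSketch {
        // tag values copied from the Sender constants below
        static final byte TYPE_STRING = (byte) 2;
        static final byte TYPE_INTEGER = (byte) 7;
        static final byte TYPE_NULL = (byte) 0;

        // decode one tag-prefixed value, mirroring Receiver.receiveField()
        static Object readTagged(ByteBuffer buf) {
            byte tag = buf.get();
            switch (tag) {
                case TYPE_INTEGER:
                    return buf.getInt();
                case TYPE_STRING:
                    byte[] bytes = new byte[buf.getInt()]; // length-prefixed payload
                    buf.get(bytes);
                    return new String(bytes);
                case TYPE_NULL:
                    return null;
                default:
                    throw new IllegalArgumentException("Unknown TypeID encountered: " + tag);
            }
        }

        public static void main(String[] args) {
            ByteBuffer buf = ByteBuffer.allocate(64);
            buf.put(TYPE_INTEGER).putInt(42);
            byte[] payload = "flink".getBytes();
            buf.put(TYPE_STRING).putInt(payload.length).put(payload);
            buf.flip();
            System.out.println(readTagged(buf)); // prints 42
            System.out.println(readTagged(buf)); // prints flink
        }
    }
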
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java
deleted file mode 100644
index 3e0c317..0000000
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java
+++ /dev/null
@@ -1,411 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.languagebinding.api.java.common.streaming;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.java.tuple.Tuple;
-import static org.apache.flink.languagebinding.api.java.common.PlanBinder.FLINK_TMP_DATA_DIR;
-import static org.apache.flink.languagebinding.api.java.common.PlanBinder.MAPPED_FILE_SIZE;
-
-/**
- * General-purpose class to write data to memory-mapped files.
- */
-public class Sender implements Serializable {
- public static final byte TYPE_TUPLE = (byte) 11;
- public static final byte TYPE_BOOLEAN = (byte) 10;
- public static final byte TYPE_BYTE = (byte) 9;
- public static final byte TYPE_SHORT = (byte) 8;
- public static final byte TYPE_INTEGER = (byte) 7;
- public static final byte TYPE_LONG = (byte) 6;
- public static final byte TYPE_DOUBLE = (byte) 4;
- public static final byte TYPE_FLOAT = (byte) 5;
- public static final byte TYPE_CHAR = (byte) 3;
- public static final byte TYPE_STRING = (byte) 2;
- public static final byte TYPE_BYTES = (byte) 1;
- public static final byte TYPE_NULL = (byte) 0;
-
- private final AbstractRichFunction function;
-
- private File outputFile;
- private RandomAccessFile outputRAF;
- private FileChannel outputChannel;
- private MappedByteBuffer fileBuffer;
-
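- // per-group spill slot: holds a serialized record that did not fit into the current buffer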
- private final ByteBuffer[] saved = new ByteBuffer[2];
-
- private final Serializer[] serializer = new Serializer[2];
-
- public Sender(AbstractRichFunction function) {
- this.function = function;
- }
-
- //=====Setup========================================================================================================
- public void open(String path) throws IOException {
- setupMappedFile(path);
- }
-
- private void setupMappedFile(String outputFilePath) throws FileNotFoundException, IOException {
- File tmpDir = new File(FLINK_TMP_DATA_DIR);
- tmpDir.mkdirs();
-
- outputFile = new File(outputFilePath);
- if (outputFile.exists()) {
- outputFile.delete();
- }
- outputFile.createNewFile();
- outputRAF = new RandomAccessFile(outputFilePath, "rw");
- outputRAF.setLength(MAPPED_FILE_SIZE);
- outputRAF.seek(MAPPED_FILE_SIZE - 1);
- outputRAF.writeByte(0);
- outputRAF.seek(0);
- outputChannel = outputRAF.getChannel();
- fileBuffer = outputChannel.map(FileChannel.MapMode.READ_WRITE, 0, MAPPED_FILE_SIZE);
- }
-
- public void close() throws IOException {
- closeMappedFile();
- }
-
- private void closeMappedFile() throws IOException {
- outputChannel.close();
- outputRAF.close();
- }
-
- /**
- * Resets this object to the post-configuration state.
- */
- public void reset() {
- serializer[0] = null;
- serializer[1] = null;
- fileBuffer.clear();
- }
-
- //=====Serialization================================================================================================
- /**
- * Writes a single record to the memory-mapped file. This method does NOT take care of synchronization. The caller
- * must guarantee that the file may be written to before calling this method. This method essentially reserves the
- * whole buffer for one record. As such it imposes some performance restrictions and should only be used when
- * absolutely necessary.
- *
- * @param value record to send
- * @return size of the written buffer
- * @throws IOException
- */
- public int sendRecord(Object value) throws IOException {
- fileBuffer.clear();
- int group = 0;
-
- serializer[group] = getSerializer(value);
- ByteBuffer bb = serializer[group].serialize(value);
- if (bb.remaining() > MAPPED_FILE_SIZE) {
- throw new RuntimeException("Serialized object does not fit into a single buffer.");
- }
- fileBuffer.put(bb);
-
- int size = fileBuffer.position();
-
- reset();
- return size;
- }
-
- public boolean hasRemaining(int group) {
- return saved[group] != null;
- }
-
- /**
- * Extracts records from an iterator and writes them to the memory-mapped file. This method assumes that all values
- * in the iterator are of the same type. This method does NOT take care of synchronization. The caller must
- * guarantee that the file may be written to before calling this method.
- *
- * @param i iterator containing records
- * @param group group to which the iterator belongs; most notably used by CoGroup functions.
- * @return size of the written buffer
- * @throws IOException
- */
- public int sendBuffer(Iterator i, int group) throws IOException {
- fileBuffer.clear();
-
- Object value;
- ByteBuffer bb;
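- // first record of a group: getSerializer() also writes the type tag into the mapped buffer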
- if (serializer[group] == null) {
- value = i.next();
- serializer[group] = getSerializer(value);
- bb = serializer[group].serialize(value);
- if (bb.remaining() > MAPPED_FILE_SIZE) {
- throw new RuntimeException("Serialized object does not fit into a single buffer.");
- }
- fileBuffer.put(bb);
-
- }
- if (saved[group] != null) {
- fileBuffer.put(saved[group]);
- saved[group] = null;
- }
- while (i.hasNext() && saved[group] == null) {
- value = i.next();
- bb = serializer[group].serialize(value);
- if (bb.remaining() > MAPPED_FILE_SIZE) {
- throw new RuntimeException("Serialized object does not fit into a single buffer.");
- }
- if (bb.remaining() <= fileBuffer.remaining()) {
- fileBuffer.put(bb);
- } else {
- saved[group] = bb;
- }
- }
-
- return fileBuffer.position();
- }
-
- private enum SupportedTypes {
- TUPLE, BOOLEAN, BYTE, BYTES, CHARACTER, SHORT, INTEGER, LONG, FLOAT, DOUBLE, STRING, OTHER, NULL
- }
-
- //=====Serializer===================================================================================================
- private Serializer getSerializer(Object value) throws IOException {
- String className = value.getClass().getSimpleName().toUpperCase();
- if (className.startsWith("TUPLE")) {
- className = "TUPLE";
- }
- if (className.startsWith("BYTE[]")) {
- className = "BYTES";
- }
- SupportedTypes type = SupportedTypes.valueOf(className);
- switch (type) {
- case TUPLE:
- fileBuffer.put(TYPE_TUPLE);
- fileBuffer.putInt(((Tuple) value).getArity());
- return new TupleSerializer((Tuple) value);
- case BOOLEAN:
- fileBuffer.put(TYPE_BOOLEAN);
- return new BooleanSerializer();
- case BYTE:
- fileBuffer.put(TYPE_BYTE);
- return new ByteSerializer();
- case BYTES:
- fileBuffer.put(TYPE_BYTES);
- return new BytesSerializer();
- case CHARACTER:
- fileBuffer.put(TYPE_CHAR);
- return new CharSerializer();
- case SHORT:
- fileBuffer.put(TYPE_SHORT);
- return new ShortSerializer();
- case INTEGER:
- fileBuffer.put(TYPE_INTEGER);
- return new IntSerializer();
- case LONG:
- fileBuffer.put(TYPE_LONG);
- return new LongSerializer();
- case STRING:
- fileBuffer.put(TYPE_STRING);
- return new StringSerializer();
- case FLOAT:
- fileBuffer.put(TYPE_FLOAT);
- return new FloatSerializer();
- case DOUBLE:
- fileBuffer.put(TYPE_DOUBLE);
- return new DoubleSerializer();
- case NULL:
- fileBuffer.put(TYPE_NULL);
- return new NullSerializer();
- default:
- throw new IllegalArgumentException("Unknown Type encountered: " + type);
- }
- }
-
- private abstract class Serializer<T> {
- protected ByteBuffer buffer;
-
- public Serializer(int capacity) {
- buffer = ByteBuffer.allocate(capacity);
- }
-
- public ByteBuffer serialize(T value) {
- buffer.clear();
- serializeInternal(value);
- buffer.flip();
- return buffer;
- }
-
- public abstract void serializeInternal(T value);
- }
-
- private class ByteSerializer extends Serializer<Byte> {
- public ByteSerializer() {
- super(1);
- }
-
- @Override
- public void serializeInternal(Byte value) {
- buffer.put(value);
- }
- }
-
- private class BooleanSerializer extends Serializer<Boolean> {
- public BooleanSerializer() {
- super(1);
- }
-
- @Override
- public void serializeInternal(Boolean value) {
- buffer.put(value ? (byte) 1 : (byte) 0);
- }
- }
-
- private class CharSerializer extends Serializer<Character> {
- public CharSerializer() {
- super(4);
- }
-
- @Override
- public void serializeInternal(Character value) {
- buffer.put(String.valueOf(value).getBytes());
- }
- }
-
- private class ShortSerializer extends Serializer<Short> {
- public ShortSerializer() {
- super(2);
- }
-
- @Override
- public void serializeInternal(Short value) {
- buffer.putShort(value);
- }
- }
-
- private class IntSerializer extends Serializer<Integer> {
- public IntSerializer() {
- super(4);
- }
-
- @Override
- public void serializeInternal(Integer value) {
- buffer.putInt(value);
- }
- }
-
- private class LongSerializer extends Serializer<Long> {
- public LongSerializer() {
- super(8);
- }
-
- @Override
- public void serializeInternal(Long value) {
- buffer.putLong(value);
- }
- }
-
- private class StringSerializer extends Serializer<String> {
- public StringSerializer() {
- super(0);
- }
-
- @Override
- public void serializeInternal(String value) {
- byte[] bytes = value.getBytes();
- buffer = ByteBuffer.allocate(bytes.length + 4);
- buffer.putInt(bytes.length);
- buffer.put(bytes);
- }
- }
-
- private class FloatSerializer extends Serializer<Float> {
- public FloatSerializer() {
- super(4);
- }
-
- @Override
- public void serializeInternal(Float value) {
- buffer.putFloat(value);
- }
- }
-
- private class DoubleSerializer extends Serializer<Double> {
- public DoubleSerializer() {
- super(8);
- }
-
- @Override
- public void serializeInternal(Double value) {
- buffer.putDouble(value);
- }
- }
-
- private class NullSerializer extends Serializer<Object> {
- public NullSerializer() {
- super(0);
- }
-
- @Override
- public void serializeInternal(Object value) {
- }
- }
-
- private class BytesSerializer extends Serializer<byte[]> {
- public BytesSerializer() {
- super(0);
- }
-
- @Override
- public void serializeInternal(byte[] value) {
- buffer = ByteBuffer.allocate(4 + value.length);
- buffer.putInt(value.length);
- buffer.put(value);
- }
- }
-
- private class TupleSerializer extends Serializer<Tuple> {
- private final Serializer[] serializer;
- private final List<ByteBuffer> buffers;
-
- public TupleSerializer(Tuple value) throws IOException {
- super(0);
- serializer = new Serializer[value.getArity()];
- buffers = new ArrayList<ByteBuffer>();
- for (int x = 0; x < serializer.length; x++) {
- serializer[x] = getSerializer(value.getField(x));
- }
- }
-
- @Override
- public void serializeInternal(Tuple value) {
- int length = 0;
- for (int x = 0; x < serializer.length; x++) {
- serializer[x].buffer.clear();
- serializer[x].serializeInternal(value.getField(x));
- length += serializer[x].buffer.position();
- buffers.add(serializer[x].buffer);
- }
- buffer = ByteBuffer.allocate(length);
- for (ByteBuffer b : buffers) {
- b.flip();
- buffer.put(b);
- }
- buffers.clear();
- }
- }
-}
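For context: Sender and Receiver coordinate through the first byte of the mapped region, which Receiver.loadBuffer() polls until it becomes non-zero. A compressed single-process sketch of that convention (the file path and size are placeholders; the real code uses FLINK_TMP_DATA_DIR and MAPPED_FILE_SIZE, and reader and writer live in separate processes):

    import java.io.RandomAccessFile;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;

    public class MappedHandshakeSketch {
        public static void main(String[] args) throws Exception {
            final int size = 1024; // placeholder for MAPPED_FILE_SIZE
            try (RandomAccessFile raf = new RandomAccessFile("/tmp/handshake-demo", "rw")) {
                raf.setLength(size);
                MappedByteBuffer buf = raf.getChannel()
                        .map(FileChannel.MapMode.READ_WRITE, 0, size);

                // writer side: payload first, ready flag last
                buf.position(1);
                buf.putInt(42);
                buf.put(0, (byte) 1);

                // reader side: poll the flag, then consume from offset 1
                while (buf.get(0) == 0) {
                    Thread.sleep(10);
                }
                buf.position(1);
                System.out.println(buf.getInt()); // prints 42
            }
        }
    }
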
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/StreamPrinter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/StreamPrinter.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/StreamPrinter.java
deleted file mode 100644
index 1ad0606..0000000
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/StreamPrinter.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.languagebinding.api.java.common.streaming;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-/**
- * Simple utility class that prints the entire contents of an InputStream to stdout.
- */
-public class StreamPrinter extends Thread {
- private final BufferedReader reader;
- private final boolean wrapInException;
- private final StringBuilder msg;
-
- public StreamPrinter(InputStream stream) {
- this(stream, false, null);
- }
-
- public StreamPrinter(InputStream stream, boolean wrapInException, StringBuilder msg) {
- this.reader = new BufferedReader(new InputStreamReader(stream));
- this.wrapInException = wrapInException;
- this.msg = msg;
- }
-
- @Override
- public void run() {
- String line;
- try {
- if (wrapInException) {
- while ((line = reader.readLine()) != null) {
- msg.append('\n').append(line);
- }
- } else {
- while ((line = reader.readLine()) != null) {
- System.out.println(line);
- System.out.flush();
- }
- }
- } catch (IOException ex) {
- // stream closed by the peer; let the printer thread terminate quietly
- }
- }
-}
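Typical call site for this class, sketched for illustration (assumes StreamPrinter is on the classpath; the echo command and the error message are invented for the example):

    public class StreamPrinterUsageSketch {
        public static void main(String[] args) throws Exception {
            Process process = Runtime.getRuntime().exec(new String[]{"echo", "hello"});

            // drain stdout and stderr on separate threads so the child
            // cannot block on a full pipe; stderr is collected for reporting
            StringBuilder errorLog = new StringBuilder();
            Thread out = new StreamPrinter(process.getInputStream());
            Thread err = new StreamPrinter(process.getErrorStream(), true, errorLog);
            out.start();
            err.start();

            int exitCode = process.waitFor();
            out.join();
            err.join();
            if (exitCode != 0) {
                throw new RuntimeException("Child process failed:" + errorLog);
            }
        }
    }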