You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/06/01 14:10:14 UTC

flink git commit: [FLINK-3993] [py] Add Environment.generateSequence()

Repository: flink
Updated Branches:
  refs/heads/master bce068c49 -> b201f8664


[FLINK-3993] [py] Add Environment.generateSequence()

This closes #2055


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b201f866
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b201f866
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b201f866

Branch: refs/heads/master
Commit: b201f8664c9814b6e0a0f5149338effff49d4c88
Parents: bce068c
Author: omaralvarez <om...@udc.es>
Authored: Tue May 31 13:23:38 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Jun 1 16:09:43 2016 +0200

----------------------------------------------------------------------
 docs/apis/batch/python.md                         |  4 ++++
 .../flink/python/api/PythonOperationInfo.java     |  6 ++++--
 .../apache/flink/python/api/PythonPlanBinder.java |  2 +-
 .../flink/python/api/flink/plan/Constants.py      |  1 +
 .../flink/python/api/flink/plan/Environment.py    | 18 ++++++++++++++++++
 .../flink/python/api/flink/plan/OperationInfo.py  |  2 ++
 .../org/apache/flink/python/api/test_main.py      |  4 ++++
 7 files changed, 34 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b201f866/docs/apis/batch/python.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/python.md b/docs/apis/batch/python.md
index 8771cb5..0f55124 100644
--- a/docs/apis/batch/python.md
+++ b/docs/apis/batch/python.md
@@ -458,6 +458,7 @@ File-based:
 Collection-based:
 
 - `from_elements(*args)` - Creates a data set from a Seq. All elements
+- `generate_sequence(frm, to)` - Generates the sequence of numbers in the given interval (both bounds inclusive), in parallel.
 
 **Examples**
 
@@ -475,6 +476,9 @@ csvInput = env.read_csv("hdfs:///the/CSV/file", (INT, STRING, DOUBLE))
 
 \# create a set from some given elements
 values = env.from_elements("Foo", "bar", "foobar", "fubar")
+
+\# generate a number sequence
+numbers = env.generate_sequence(1, 10000000)
 {% endhighlight %}
 
 {% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/b201f866/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
index 7f7a993..89aad22 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
@@ -39,7 +39,7 @@ public class PythonOperationInfo {
 	public String path;
 	public String fieldDelimiter;
 	public String lineDelimiter;
-	public long from;
+	public long frm;
 	public long to;
 	public WriteMode writeMode;
 	public boolean toError;
@@ -83,6 +83,8 @@ public class PythonOperationInfo {
 			? WriteMode.OVERWRITE
 			: WriteMode.NO_OVERWRITE;
 		path = (String) streamer.getRecord();
+		frm = (Long) streamer.getRecord();
+		to = (Long) streamer.getRecord();
 		setID = (Integer) streamer.getRecord(true);
 		toError = (Boolean) streamer.getRecord();
 		count = (Integer) streamer.getRecord(true);
@@ -121,7 +123,7 @@ public class PythonOperationInfo {
 		sb.append("Path: ").append(path).append("\n");
 		sb.append("FieldDelimiter: ").append(fieldDelimiter).append("\n");
 		sb.append("LineDelimiter: ").append(lineDelimiter).append("\n");
-		sb.append("From: ").append(from).append("\n");
+		sb.append("From: ").append(frm).append("\n");
 		sb.append("To: ").append(to).append("\n");
 		sb.append("WriteMode: ").append(writeMode).append("\n");
 		sb.append("toError: ").append(toError).append("\n");

http://git-wip-us.apache.org/repos/asf/flink/blob/b201f866/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index f91237f..f43a4f9 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -410,7 +410,7 @@ public class PythonPlanBinder {
 	}
 
 	private void createSequenceSource(PythonOperationInfo info) throws IOException {
-		sets.put(info.setID, env.generateSequence(info.from, info.to).setParallelism(getParallelism(info)).name("SequenceSource")
+		sets.put(info.setID, env.generateSequence(info.frm, info.to).setParallelism(getParallelism(info)).name("SequenceSource")
 				.map(new SerializerMap<Long>()).setParallelism(getParallelism(info)).name("SequenceSourcePostStep"));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b201f866/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
index be9fc6d..a1dffaf 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
@@ -41,6 +41,7 @@ class _Identifier(object):
     SOURCE_CSV = "source_csv"
     SOURCE_TEXT = "source_text"
     SOURCE_VALUE = "source_value"
+    SOURCE_SEQ = "source_seq"
     SINK_CSV = "sink_csv"
     SINK_TEXT = "sink_text"
     SINK_PRINT = "sink_print"

http://git-wip-us.apache.org/repos/asf/flink/blob/b201f866/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index d0f28dc..3dbce45 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -118,6 +118,22 @@ class Environment(object):
         self._sources.append(child)
         return child_set
 
+    def generate_sequence(self, frm, to):
+        """
+        Creates a new data set that contains the sequence of numbers in the interval [frm, to].
+
+        :param frm: The first number of the sequence (inclusive).
+        :param to: The last number of the sequence (inclusive).
+        :return: A DataSet representing the given sequence of numbers.
+        """
+        child = OperationInfo()
+        child_set = DataSet(self, child)
+        child.identifier = _Identifier.SOURCE_SEQ
+        child.frm = frm
+        child.to = to
+        self._sources.append(child)
+        return child_set
+
     def set_parallelism(self, parallelism):
         """
         Sets the parallelism for operations executed through this environment.
@@ -270,6 +286,8 @@ class Environment(object):
         collect(set.delimiter_field)
         collect(set.write_mode)
         collect(set.path)
+        collect(set.frm)
+        collect(set.to)
         collect(set.id)
         collect(set.to_err)
         collect(set.count)

http://git-wip-us.apache.org/repos/asf/flink/blob/b201f866/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
index 5d83e33..fcda712 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
@@ -42,6 +42,8 @@ class OperationInfo():
             self.delimiter_field = ","
             self.write_mode = WriteMode.NO_OVERWRITE
             self.path = ""
+            self.frm = 0
+            self.to = 0
             self.count = 0
             self.values = []
             self.projections = []

http://git-wip-us.apache.org/repos/asf/flink/blob/b201f866/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
index 3e718d3..9b0f144 100644
--- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
@@ -78,6 +78,10 @@ if __name__ == "__main__":
 
     d6 = env.from_elements(1, 1, 12)
 
+    #Generate Sequence Source
+    d7 = env.generate_sequence(1, 5)\
+         .map(Id()).map_partition(Verify([1,2,3,4,5], "Sequence")).output()
+
     #CSV Source/Sink
     csv_data = env.read_csv("src/test/python/org/apache/flink/python/api/data_csv", (INT, INT, STRING))