You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/06/01 14:10:14 UTC
flink git commit: [FLINK-3993] [py] Add Environment.generateSequence()
Repository: flink
Updated Branches:
refs/heads/master bce068c49 -> b201f8664
[FLINK-3993] [py] Add Environment.generateSequence()
This closes #2055
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b201f866
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b201f866
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b201f866
Branch: refs/heads/master
Commit: b201f8664c9814b6e0a0f5149338effff49d4c88
Parents: bce068c
Author: omaralvarez <om...@udc.es>
Authored: Tue May 31 13:23:38 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Jun 1 16:09:43 2016 +0200
----------------------------------------------------------------------
docs/apis/batch/python.md | 4 ++++
.../flink/python/api/PythonOperationInfo.java | 6 ++++--
.../apache/flink/python/api/PythonPlanBinder.java | 2 +-
.../flink/python/api/flink/plan/Constants.py | 1 +
.../flink/python/api/flink/plan/Environment.py | 18 ++++++++++++++++++
.../flink/python/api/flink/plan/OperationInfo.py | 2 ++
.../org/apache/flink/python/api/test_main.py | 4 ++++
7 files changed, 34 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b201f866/docs/apis/batch/python.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/python.md b/docs/apis/batch/python.md
index 8771cb5..0f55124 100644
--- a/docs/apis/batch/python.md
+++ b/docs/apis/batch/python.md
@@ -458,6 +458,7 @@ File-based:
Collection-based:
- `from_elements(*args)` - Creates a data set from a Seq. All elements
+- `generate_sequence(frm, to)` - Generates the sequence of numbers in the given interval (both bounds inclusive), in parallel.
**Examples**
@@ -475,6 +476,9 @@ csvInput = env.read_csv("hdfs:///the/CSV/file", (INT, STRING, DOUBLE))
\# create a set from some given elements
values = env.from_elements("Foo", "bar", "foobar", "fubar")
+
+\# generate a number sequence
+numbers = env.generate_sequence(1, 10000000)
{% endhighlight %}
{% top %}
http://git-wip-us.apache.org/repos/asf/flink/blob/b201f866/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
index 7f7a993..89aad22 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
@@ -39,7 +39,7 @@ public class PythonOperationInfo {
public String path;
public String fieldDelimiter;
public String lineDelimiter;
- public long from;
+ public long frm;
public long to;
public WriteMode writeMode;
public boolean toError;
@@ -83,6 +83,8 @@ public class PythonOperationInfo {
? WriteMode.OVERWRITE
: WriteMode.NO_OVERWRITE;
path = (String) streamer.getRecord();
+ frm = (Long) streamer.getRecord();
+ to = (Long) streamer.getRecord();
setID = (Integer) streamer.getRecord(true);
toError = (Boolean) streamer.getRecord();
count = (Integer) streamer.getRecord(true);
@@ -121,7 +123,7 @@ public class PythonOperationInfo {
sb.append("Path: ").append(path).append("\n");
sb.append("FieldDelimiter: ").append(fieldDelimiter).append("\n");
sb.append("LineDelimiter: ").append(lineDelimiter).append("\n");
- sb.append("From: ").append(from).append("\n");
+ sb.append("From: ").append(frm).append("\n");
sb.append("To: ").append(to).append("\n");
sb.append("WriteMode: ").append(writeMode).append("\n");
sb.append("toError: ").append(toError).append("\n");
http://git-wip-us.apache.org/repos/asf/flink/blob/b201f866/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index f91237f..f43a4f9 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -410,7 +410,7 @@ public class PythonPlanBinder {
}
private void createSequenceSource(PythonOperationInfo info) throws IOException {
- sets.put(info.setID, env.generateSequence(info.from, info.to).setParallelism(getParallelism(info)).name("SequenceSource")
+ sets.put(info.setID, env.generateSequence(info.frm, info.to).setParallelism(getParallelism(info)).name("SequenceSource")
.map(new SerializerMap<Long>()).setParallelism(getParallelism(info)).name("SequenceSourcePostStep"));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b201f866/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
index be9fc6d..a1dffaf 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
@@ -41,6 +41,7 @@ class _Identifier(object):
SOURCE_CSV = "source_csv"
SOURCE_TEXT = "source_text"
SOURCE_VALUE = "source_value"
+ SOURCE_SEQ = "source_seq"
SINK_CSV = "sink_csv"
SINK_TEXT = "sink_text"
SINK_PRINT = "sink_print"
http://git-wip-us.apache.org/repos/asf/flink/blob/b201f866/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index d0f28dc..3dbce45 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -118,6 +118,22 @@ class Environment(object):
self._sources.append(child)
return child_set
+ def generate_sequence(self, frm, to):
+ """
+ Creates a new data set that contains the sequence of numbers in the given interval.
+
+ :param frm: The first number of the sequence (inclusive).
+ :param to: The last number of the sequence (inclusive).
+ :return: A DataSet representing the given sequence of numbers.
+ """
+ child = OperationInfo()
+ child_set = DataSet(self, child)
+ child.identifier = _Identifier.SOURCE_SEQ
+ child.frm = frm
+ child.to = to
+ self._sources.append(child)
+ return child_set
+
def set_parallelism(self, parallelism):
"""
Sets the parallelism for operations executed through this environment.
@@ -270,6 +286,8 @@ class Environment(object):
collect(set.delimiter_field)
collect(set.write_mode)
collect(set.path)
+ collect(set.frm)
+ collect(set.to)
collect(set.id)
collect(set.to_err)
collect(set.count)
http://git-wip-us.apache.org/repos/asf/flink/blob/b201f866/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
index 5d83e33..fcda712 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
@@ -42,6 +42,8 @@ class OperationInfo():
self.delimiter_field = ","
self.write_mode = WriteMode.NO_OVERWRITE
self.path = ""
+ self.frm = 0
+ self.to = 0
self.count = 0
self.values = []
self.projections = []
http://git-wip-us.apache.org/repos/asf/flink/blob/b201f866/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
index 3e718d3..9b0f144 100644
--- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
@@ -78,6 +78,10 @@ if __name__ == "__main__":
d6 = env.from_elements(1, 1, 12)
+ #Generate Sequence Source
+ d7 = env.generate_sequence(1, 5)\
+ .map(Id()).map_partition(Verify([1,2,3,4,5], "Sequence")).output()
+
#CSV Source/Sink
csv_data = env.read_csv("src/test/python/org/apache/flink/python/api/data_csv", (INT, INT, STRING))