You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/11/13 16:06:14 UTC
[1/5] incubator-flink git commit: [streaming] Removed obsolete Join example
Repository: incubator-flink
Updated Branches:
refs/heads/master 818ebda0f -> d332d6c31
[streaming] Removed obsolete Join example
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/537d6f6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/537d6f6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/537d6f6d
Branch: refs/heads/master
Commit: 537d6f6da529d534f9c523c0b208861b381e3905
Parents: 818ebda
Author: Marton Balassi <mb...@apache.org>
Authored: Mon Nov 10 14:15:05 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Thu Nov 13 15:24:03 2014 +0100
----------------------------------------------------------------------
.../streaming/examples/join/GradeSource.java | 45 -----------
.../streaming/examples/join/JoinLocal.java | 53 ------------
.../flink/streaming/examples/join/JoinTask.java | 84 --------------------
.../streaming/examples/join/SalarySource.java | 45 -----------
4 files changed, 227 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/537d6f6d/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/GradeSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/GradeSource.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/GradeSource.java
deleted file mode 100644
index 93bcdab..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/GradeSource.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.join;
-
-import java.util.Random;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
-
-public class GradeSource implements SourceFunction<Tuple2<String, Integer>> {
-
- private static final long serialVersionUID = -5897483980082089771L;
-
- private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
- "andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
- private Random rand = new Random();
- private Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>();
-
- @Override
- public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
- // Continuously emit tuples with random names and integers (grades).
- while (true) {
- outTuple.f0 = names[rand.nextInt(names.length)];
- outTuple.f1 = rand.nextInt(5) + 1;
-
- out.collect(outTuple);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/537d6f6d/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java
deleted file mode 100644
index 717ad8e..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.join;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-public class JoinLocal {
-
- private static final int PARALLELISM = 1;
- private static final int SOURCE_PARALLELISM = 1;
-
- // This example will join two streams. One which emits people's grades and
- // one which emits people's salaries.
-
- public static void main(String[] args) throws Exception {
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
- PARALLELISM).setBufferTimeout(100);
-
- DataStream<Tuple2<String, Integer>> grades = env.addSource(new GradeSource(),
- SOURCE_PARALLELISM);
-
- DataStream<Tuple2<String, Integer>> salaries = env.addSource(new SalarySource(),
- SOURCE_PARALLELISM);
-
- DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades.connect(salaries)
- .flatMap(new JoinTask());
-
- System.out.println("(NAME, GRADE, SALARY)");
- joinedStream.print();
-
- env.execute();
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/537d6f6d/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinTask.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinTask.java
deleted file mode 100644
index 35e18f4..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinTask.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.join;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.function.co.RichCoFlatMapFunction;
-import org.apache.flink.util.Collector;
-
-//Joins the input value with the already known values. If it is a grade
-// then with the salaries, if it is a salary then with the grades. Also
-// stores the new element.
-public class JoinTask extends
- RichCoFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> {
- private static final long serialVersionUID = 1L;
-
- private HashMap<String, ArrayList<Integer>> gradeHashmap;
- private HashMap<String, ArrayList<Integer>> salaryHashmap;
- private String name;
-
- public JoinTask() {
- gradeHashmap = new HashMap<String, ArrayList<Integer>>();
- salaryHashmap = new HashMap<String, ArrayList<Integer>>();
- name = new String();
- }
-
- Tuple3<String, Integer, Integer> outputTuple = new Tuple3<String, Integer, Integer>();
-
- // GRADES
- @Override
- public void flatMap1(Tuple2<String, Integer> value,
- Collector<Tuple3<String, Integer, Integer>> out) {
- name = value.f0;
- outputTuple.f0 = name;
- outputTuple.f1 = value.f1;
- if (salaryHashmap.containsKey(name)) {
- for (Integer salary : salaryHashmap.get(name)) {
- outputTuple.f2 = salary;
- out.collect(outputTuple);
- }
- }
- if (!gradeHashmap.containsKey(name)) {
- gradeHashmap.put(name, new ArrayList<Integer>());
- }
- gradeHashmap.get(name).add(value.f1);
- }
-
- // SALARIES
- @Override
- public void flatMap2(Tuple2<String, Integer> value,
- Collector<Tuple3<String, Integer, Integer>> out) {
- name = value.f0;
- outputTuple.f0 = name;
- outputTuple.f2 = value.f1;
- if (gradeHashmap.containsKey(name)) {
- for (Integer grade : gradeHashmap.get(name)) {
- outputTuple.f1 = grade;
- out.collect(outputTuple);
- }
- }
- if (!salaryHashmap.containsKey(name)) {
- salaryHashmap.put(name, new ArrayList<Integer>());
- }
- salaryHashmap.get(name).add(value.f1);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/537d6f6d/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/SalarySource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/SalarySource.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/SalarySource.java
deleted file mode 100644
index 988935f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/SalarySource.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.join;
-
-import java.util.Random;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
-
-public class SalarySource implements SourceFunction<Tuple2<String, Integer>> {
-
- private static final long serialVersionUID = 1L;
-
- private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
- "andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
- private Random rand = new Random();
- private Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>();
-
- @Override
- public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
- // Continuously emit tuples with random names and integers (salaries).
- while (true) {
-
- outTuple.f0 = names[rand.nextInt(names.length)];
- outTuple.f1 = rand.nextInt(10000);
- out.collect(outTuple);
- }
- }
-}
[3/5] incubator-flink git commit: [streaming] Example cleanup + windowJoin fix
Posted by mb...@apache.org.
[streaming] Example cleanup + windowJoin fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e7ce9567
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e7ce9567
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e7ce9567
Branch: refs/heads/master
Commit: e7ce9567c57ffc59e5a7588445b6b891bc58e5f8
Parents: 6867f9b
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Nov 12 12:32:32 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Thu Nov 13 15:24:04 2014 +0100
----------------------------------------------------------------------
.../api/datastream/ConnectedDataStream.java | 6 +
.../examples/basictopology/BasicTopology.java | 98 ++++++++-------
.../examples/cellinfo/CellInfoLocal.java | 123 ------------------
.../examples/cellinfo/IWorkerEngine.java | 23 ----
.../flink/streaming/examples/cellinfo/Util.java | 28 -----
.../examples/cellinfo/WorkerEngineBin.java | 97 --------------
.../examples/cellinfo/WorkerEngineExact.java | 76 -----------
.../examples/iteration/IterateExample.java | 80 ++++++++----
.../ml/IncrementalLearningSkeleton.java | 15 ++-
.../examples/twitter/TwitterStream.java | 5 +-
.../examples/window/join/GradeSource.java | 14 +--
.../examples/window/join/SalarySource.java | 14 +--
.../examples/window/join/WindowJoinLocal.java | 25 ++--
.../examples/window/join/WindowJoinTask.java | 126 -------------------
14 files changed, 148 insertions(+), 582 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 23c420c..6556550 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -580,6 +580,9 @@ public class ConnectedDataStream<IN1, IN2> {
SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> windowJoin(long windowSize, long slideInterval,
TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2, int fieldIn1, int fieldIn2) {
+ dataStream1 = dataStream1.groupBy(fieldIn1);
+ dataStream2 = dataStream2.groupBy(fieldIn2);
+
JoinWindowFunction<IN1, IN2> joinWindowFunction = new JoinWindowFunction<IN1, IN2>(
dataStream1.getOutputType(), dataStream2.getOutputType(), fieldIn1, fieldIn2);
@@ -590,6 +593,9 @@ public class ConnectedDataStream<IN1, IN2> {
SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> windowJoin(long windowSize, long slideInterval,
TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2, String fieldIn1, String fieldIn2) {
+ dataStream1 = dataStream1.groupBy(fieldIn1);
+ dataStream2 = dataStream2.groupBy(fieldIn2);
+
JoinWindowFunction<IN1, IN2> joinWindowFunction = new JoinWindowFunction<IN1, IN2>(
dataStream1.getOutputType(), dataStream2.getOutputType(), fieldIn1, fieldIn2);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
index 2b1ac4f..9379ff2 100755
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
@@ -1,49 +1,61 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.basictopology;
-
+ */
+
+package org.apache.flink.streaming.examples.basictopology;
+
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-public class BasicTopology {
-
- public static class IdentityMap implements MapFunction<String, String> {
- private static final long serialVersionUID = 1L;
- // map to the same value
- @Override
- public String map(String value) throws Exception {
- return value;
- }
-
- }
-
- private static final int PARALLELISM = 1;
-
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment
- .createLocalEnvironment(PARALLELISM);
-
- DataStream<String> stream = env.fromElements(WordCountData.WORDS).map(new IdentityMap());
-
- stream.print();
-
- env.execute();
- }
-}
+
+/**
+ *
+ * Very basic Flink streaming topology for local testing.
+ *
+ */
+public class BasicTopology {
+
+ private static final int PARALLELISM = 1;
+
+ public static void main(String[] args) throws Exception {
+
+ // We create a new Local environment. The program will be executed using
+ // a new mini-cluster that is set up at execution.
+ StreamExecutionEnvironment env = StreamExecutionEnvironment
+ .createLocalEnvironment(PARALLELISM);
+
+ // We create a new data stream from a collection of words then apply a
+ // simple map.
+ DataStream<String> stream = env.fromElements(WordCountData.WORDS).map(new IdentityMap());
+
+ // Print the results
+ stream.print();
+
+ env.execute();
+ }
+
+ public static class IdentityMap implements MapFunction<String, String> {
+ private static final long serialVersionUID = 1L;
+
+ // map to the same value
+ @Override
+ public String map(String value) throws Exception {
+ return value;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
deleted file mode 100644
index 1dcb5e6..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.cellinfo;
-
-import java.util.Random;
-
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
-
-public class CellInfoLocal {
-
- private final static int CELL_COUNT = 10;
- private final static int LAST_MILLIS = 1000;
- private final static int PARALLELISM = 1;
- private final static int SOURCE_PARALLELISM = 1;
- private final static int QUERY_SLEEP_TIME = 1000;
- private final static int QUERY_COUNT = 10;
- private final static int INFO_SLEEP_TIME = 100;
- private final static int INFO_COUNT = 100;
-
- public final static class InfoSource implements SourceFunction<Tuple3<Integer, Long, Integer>> {
- private static final long serialVersionUID = 1L;
-
- private static Random rand = new Random();
- private Tuple3<Integer, Long, Integer> tuple = new Tuple3<Integer, Long, Integer>(0, 0L, 0);
-
- @Override
- public void invoke(Collector<Tuple3<Integer, Long, Integer>> out) throws Exception {
- for (int i = 0; i < INFO_COUNT; i++) {
- Thread.sleep(INFO_SLEEP_TIME);
- tuple.f0 = rand.nextInt(CELL_COUNT);
- tuple.f1 = System.currentTimeMillis();
-
- out.collect(tuple);
- }
- }
- }
-
- private final static class QuerySource implements
- SourceFunction<Tuple3<Integer, Long, Integer>> {
- private static final long serialVersionUID = 1L;
-
- private static Random rand = new Random();
- Tuple3<Integer, Long, Integer> tuple = new Tuple3<Integer, Long, Integer>(0, 0L,
- LAST_MILLIS);
-
- @Override
- public void invoke(Collector<Tuple3<Integer, Long, Integer>> collector) throws Exception {
- for (int i = 0; i < QUERY_COUNT; i++) {
- Thread.sleep(QUERY_SLEEP_TIME);
- tuple.f0 = rand.nextInt(CELL_COUNT);
- tuple.f1 = System.currentTimeMillis();
- collector.collect(tuple);
- }
- }
- }
-
- private final static class CellTask extends
- RichCoMapFunction<Tuple3<Integer, Long, Integer>, Tuple3<Integer, Long, Integer>, String> {
- private static final long serialVersionUID = 1L;
-
- private WorkerEngineExact engine;
- private Integer cellID;
- private Long timeStamp;
- private Integer lastMillis;
-
- public CellTask() {
- engine = new WorkerEngineExact(CELL_COUNT, LAST_MILLIS, System.currentTimeMillis());
- }
-
- // INFO
- @Override
- public String map1(Tuple3<Integer, Long, Integer> value) {
- cellID = value.f0;
- timeStamp = value.f1;
- engine.put(cellID, timeStamp);
- return "INFO:\t" + cellID + " @ " + timeStamp;
- }
-
- // QUERY
- @Override
- public String map2(Tuple3<Integer, Long, Integer> value) {
- cellID = value.f0;
- timeStamp = value.f1;
- lastMillis = value.f2;
- return "QUERY:\t" + cellID + ": " + engine.get(timeStamp, lastMillis, cellID);
- }
- }
-
- // Example for connecting data streams
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
- PARALLELISM).setBufferTimeout(100);
-
- DataStream<Tuple3<Integer, Long, Integer>> querySource = env.addSource(new QuerySource(),
- SOURCE_PARALLELISM).partitionBy(0);
-
- DataStream<String> stream = env.addSource(new InfoSource(), SOURCE_PARALLELISM)
- .partitionBy(0).connect(querySource).map(new CellTask());
- stream.print();
-
- env.execute();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/IWorkerEngine.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/IWorkerEngine.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/IWorkerEngine.java
deleted file mode 100644
index b2b2264..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/IWorkerEngine.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.cellinfo;
-
-public interface IWorkerEngine {
- public int get(long timeStamp, long lastMillis, int cellId);
- public void put(int cellId, long timeStamp);
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/Util.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/Util.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/Util.java
deleted file mode 100644
index 7463c6c..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/Util.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.cellinfo;
-
-public class Util {
- public static int mod(int x, int y) {
- int result = x % y;
- if (result < 0) {
- result += y;
- }
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineBin.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineBin.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineBin.java
deleted file mode 100644
index b6097f5..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineBin.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.cellinfo;
-
-import org.apache.flink.streaming.examples.cellinfo.Util;
-
-public class WorkerEngineBin implements java.io.Serializable, IWorkerEngine {
-
- private static final long serialVersionUID = 1L;
- private long unitLength_;
- private long lastTimeUpdated_;
- private int[][] counters_;
- private int pointer_;
-
- public WorkerEngineBin(long unitLength, int numOfCells, int bufferInterval, long currentTime) {
- lastTimeUpdated_ = currentTime / unitLength * unitLength;
- unitLength_ = unitLength;
- counters_ = new int[(int) (bufferInterval / unitLength) + 1][numOfCells];
- }
-
- private int getCell(int interval, int cell) {
- return counters_[Util.mod((interval + pointer_), counters_.length)][cell];
- }
-
- private void incrCell(int interval, int cell) {
- ++counters_[Util.mod((interval + pointer_), counters_.length)][cell];
- }
-
- private void toZero(int interval) {
- for (int cell = 0; cell < counters_[0].length; ++cell) {
- counters_[Util.mod((interval + pointer_), counters_.length)][cell] = 0;
- }
- }
-
- private void shift(int shiftBy) {
- if (shiftBy > 0 && shiftBy < counters_.length) {
- pointer_ = Util.mod((pointer_ - shiftBy), counters_.length);
- for (int i = 0; i < shiftBy; ++i) {
- toZero(i);
- }
- } else if (shiftBy >= counters_.length) {
- pointer_ = 0;
- for (int i = 0; i < counters_.length; ++i) {
- toZero(i);
- }
- }
- }
-
- public int get(long timeStamp, long lastMillis, int cellId) {
- int shift = refresh(timeStamp);
- int numOfLastIntervals = (int) (lastMillis / unitLength_);
- if (shift >= counters_.length || numOfLastIntervals >= counters_.length) {
- return -1;
- }
- int sum = 0;
- for (int i = shift + 1; i < shift + numOfLastIntervals + 1; ++i) {
- sum += getCell(i, cellId);
- }
- return sum;
- }
-
- private int refresh(long timeStamp) {
- int shiftBy = (int) ((timeStamp - lastTimeUpdated_) / unitLength_);
- shift(shiftBy);
- int retVal;
- if (shiftBy > 0) {
- lastTimeUpdated_ = timeStamp / unitLength_ * unitLength_;
- retVal = 0;
- } else {
- retVal = -shiftBy;
- }
- return retVal;
- }
-
- public void put(int cellId, long timeStamp) {
- int shift = refresh(timeStamp);
- if (shift >= counters_.length) {
- return;
- }
- incrCell(shift, cellId);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineExact.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineExact.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineExact.java
deleted file mode 100644
index 8a9bcda..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineExact.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.cellinfo;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.TreeMap;
-
-public class WorkerEngineExact implements java.io.Serializable, IWorkerEngine {
- private static final long serialVersionUID = 1L;
- private long lastTimeUpdated_;
- private long bufferInterval_;
- private TreeMap<Long, Integer>[] counters_;
-
- @SuppressWarnings("unchecked")
- public WorkerEngineExact(int numOfCells, int bufferInterval, long currentTime) {
- lastTimeUpdated_ = currentTime;
- bufferInterval_ = bufferInterval;
- counters_ = new TreeMap[numOfCells];
- for (int i = 0; i < numOfCells; ++i) {
- counters_[i] = new TreeMap<Long, Integer>();
- }
- }
-
- public int get(long timeStamp, long lastMillis, int cellId) {
- refresh(timeStamp);
- Map<Long, Integer> subMap = counters_[cellId].subMap(timeStamp - lastMillis, true, timeStamp, false);
- int retVal = 0;
- for (Map.Entry<Long, Integer> entry : subMap.entrySet()) {
- retVal += entry.getValue();
- }
- return retVal;
- }
-
- public void put(int cellId, long timeStamp) {
- refresh(timeStamp);
- TreeMap<Long, Integer> map = counters_[cellId];
- if (map.containsKey(timeStamp)) {
- map.put(timeStamp, map.get(timeStamp) + 1);
- } else {
- map.put(timeStamp, 1);
- }
- }
-
- public void refresh(long timeStamp) {
- if (timeStamp - lastTimeUpdated_ > bufferInterval_) {
- for (int i = 0; i < counters_.length; ++i) {
- for (Iterator<Map.Entry<Long, Integer>> it = counters_[i].entrySet().iterator(); it.hasNext();) {
- Map.Entry<Long, Integer> entry = it.next();
- long time = entry.getKey();
- if (timeStamp - time > bufferInterval_) {
- it.remove();
- } else {
- break;
- }
- }
- }
- lastTimeUpdated_ = timeStamp;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index e2094fb..a011837 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -29,13 +29,63 @@ import org.apache.flink.streaming.api.datastream.IterativeDataStream;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+/**
+ * Example illustrating iterations in Flink streaming programs. The program will
+ * sum up random numbers and count how many additions it needs to reach a
+ * specific threshold in an iterative streaming fashion.
+ *
+ */
public class IterateExample {
+ public static void main(String[] args) throws Exception {
+
+ // Set up our input for the stream of (0,0)s
+ List<Tuple2<Double, Integer>> input = new ArrayList<Tuple2<Double, Integer>>();
+ for (int i = 0; i < 1000; i++) {
+ input.add(new Tuple2<Double, Integer>(0., 0));
+ }
+
+ // Obtain execution environment and use setBufferTimeout(0) to enable
+ // continuous flushing of the output buffers (lowest latency).
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
+ .setBufferTimeout(0);
+
+ // Create an iterative datastream from the input.
+ IterativeDataStream<Tuple2<Double, Integer>> it = env.fromCollection(input).shuffle()
+ .iterate();
+
+ // We make sure that the iteration terminates if no new data is received in
+ // the stream for 5 seconds
+ it.setMaxWaitTime(5000);
+
+ // We apply the step function to add a new random value to the tuple and
+ // increment the counter, then we split the output with our
+ // outputselector.
+ SplitDataStream<Tuple2<Double, Integer>> step = it.map(new Step()).shuffle()
+ .split(new MySelector());
+
+ // We close the iteration by selecting the tuples that were directed to
+ // 'iterate' in the outputselector
+ it.closeWith(step.select("iterate"));
+
+ // To produce the final output we select the tuples directed to 'output'
+ // then project them to the second field
+ step.select("output").project(1).types(Integer.class).print();
+
+ // Execute the streaming program
+ env.execute();
+ }
+
+ /**
+ * Iteration step function which takes an input (Double , Integer) and
+ * produces an output (Double+random, Integer +1)
+ *
+ */
public static class Step implements
MapFunction<Tuple2<Double, Integer>, Tuple2<Double, Integer>> {
private static final long serialVersionUID = 1L;
-
+
Random rnd;
public Step() {
@@ -50,13 +100,16 @@ public class IterateExample {
}
+ /**
+ * OutputSelector that tests which tuples need to be iterated again.
+ */
public static class MySelector extends OutputSelector<Tuple2<Double, Integer>> {
private static final long serialVersionUID = 1L;
@Override
public void select(Tuple2<Double, Integer> value, Collection<String> outputs) {
- if (value.f0 > 100) {
+ if (value.f0 > 200) {
outputs.add("output");
} else {
outputs.add("iterate");
@@ -64,27 +117,4 @@ public class IterateExample {
}
}
-
-
- public static void main(String[] args) throws Exception {
-
- List<Tuple2<Double, Integer>> input = new ArrayList<Tuple2<Double, Integer>>();
- for (int i = 0; i < 100; i++) {
- input.add(new Tuple2<Double, Integer>(0., 0));
- }
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
- .setBufferTimeout(1);
-
- IterativeDataStream<Tuple2<Double, Integer>> it = env.fromCollection(input).iterate()
- .setMaxWaitTime(3000);
-
- SplitDataStream<Tuple2<Double,Integer>> step = it.map(new Step()).shuffle().split(new MySelector());
-
- it.closeWith(step.select("iterate"));
-
- step.select("output").project(1).types(Integer.class).print();
-
- env.execute();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index 9fe1af4..d2f9e6e 100755
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -39,7 +39,7 @@ public class IncrementalLearningSkeleton {
// Method for pulling new data for prediction
private Integer getNewData() throws InterruptedException {
- Thread.sleep(1000);
+ Thread.sleep(100);
return 1;
}
}
@@ -59,7 +59,7 @@ public class IncrementalLearningSkeleton {
// Method for pulling new training data
private Integer getTrainingData() throws InterruptedException {
- Thread.sleep(1000);
+ Thread.sleep(100);
return 1;
}
@@ -114,13 +114,13 @@ public class IncrementalLearningSkeleton {
}
- private static final int PARALLELISM = 1;
- private static final int SOURCE_PARALLELISM = 1;
+ private static int SOURCE_PARALLELISM = 1;
public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
- PARALLELISM).setBufferTimeout(1000);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ IncrementalLearningSkeleton.SOURCE_PARALLELISM = env.getDegreeOfParallelism();
// Build new model on every second of new data
DataStream<Double[]> model = env.addSource(new TrainingDataSource(), SOURCE_PARALLELISM)
@@ -130,11 +130,10 @@ public class IncrementalLearningSkeleton {
DataStream<Integer> prediction = env.addSource(new NewDataSource(), SOURCE_PARALLELISM)
.connect(model).map(new Predictor());
+ // We print the output
prediction.print();
env.execute();
}
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
index 5d8022a..ef1d84b 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
@@ -32,7 +32,7 @@ import org.apache.sling.commons.json.JSONException;
/**
* Implements the "TwitterStream" program that computes a most used word occurrence
- * histogram over JSON files in a streaming fashion.
+ * over JSON files in a streaming fashion.
*
* <p>
* The input is a JSON text file with lines separated by newline characters.
@@ -52,7 +52,6 @@ import org.apache.sling.commons.json.JSONException;
*
*/
public class TwitterStream {
- private static final int PARALLELISM = 1;
// *************************************************************************
// PROGRAM
@@ -65,7 +64,7 @@ public class TwitterStream {
// set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment
- .createLocalEnvironment(PARALLELISM);
+ .getExecutionEnvironment();
env.setBufferTimeout(1000);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
index c895c5b..d877f91 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
@@ -19,29 +19,27 @@ package org.apache.flink.streaming.examples.window.join;
import java.util.Random;
-import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.util.Collector;
-public class GradeSource implements SourceFunction<Tuple3<String, Integer, Long>> {
+public class GradeSource implements SourceFunction<Tuple2<String, Integer>> {
private static final long serialVersionUID = -5897483980082089771L;
private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
- "andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
+ "andrew", "jean", "richard", "smith", "gorge", "black", "peter" ,"mark", "eric"};
private Random rand = new Random();
- private Tuple3<String, Integer, Long> outTuple = new Tuple3<String, Integer, Long>();
- private Long progress = 0L;
+ private Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>();
@Override
- public void invoke(Collector<Tuple3<String, Integer, Long>> out) throws Exception {
+ public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
// Continuously emit tuples with random names and integers (grades).
while (true) {
outTuple.f0 = names[rand.nextInt(names.length)];
outTuple.f1 = rand.nextInt(5) + 1;
- outTuple.f2 = progress;
out.collect(outTuple);
- progress += 1;
+ Thread.sleep(rand.nextInt(10) + 1);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
index 641a738..0da48e5 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
@@ -19,29 +19,27 @@ package org.apache.flink.streaming.examples.window.join;
import java.util.Random;
-import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.util.Collector;
-public class SalarySource implements SourceFunction<Tuple3<String, Integer, Long>> {
+public class SalarySource implements SourceFunction<Tuple2<String, Integer>> {
private static final long serialVersionUID = 6670933703432267728L;
private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
- "andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
+ "andrew", "jean", "richard", "smith", "gorge", "black", "peter", "mark", "eric" };
private Random rand = new Random();
- private Tuple3<String, Integer, Long> outTuple = new Tuple3<String, Integer, Long>();
- private Long progress = 0L;
+ private Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>();
@Override
- public void invoke(Collector<Tuple3<String, Integer, Long>> out) throws Exception {
+ public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
// Continuously emit tuples with random names and integers (salaries).
while (true) {
outTuple.f0 = names[rand.nextInt(names.length)];
outTuple.f1 = rand.nextInt(10000);
- outTuple.f2 = progress;
out.collect(outTuple);
- progress += 1;
+ Thread.sleep(rand.nextInt(10) + 1);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
index 211daf6..695cf5d 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
@@ -17,33 +17,30 @@
package org.apache.flink.streaming.examples.window.join;
-import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class WindowJoinLocal {
- private static final int PARALLELISM = 4;
- private static final int SOURCE_PARALLELISM = 2;
-
// This example will join two streams with a sliding window. One which emits
// people's grades and one which emits people's salaries.
public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
- PARALLELISM).setBufferTimeout(100);
-
- DataStream<Tuple3<String, Integer, Long>> grades = env.addSource(new GradeSource(),
- SOURCE_PARALLELISM);
+ // Obtain execution environment
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStream<Tuple3<String, Integer, Long>> salaries = env.addSource(new SalarySource(),
- SOURCE_PARALLELISM);
+ // Connect to the data sources for grades and salaries
+ DataStream<Tuple2<String, Integer>> grades = env.addSource(new GradeSource());
+ DataStream<Tuple2<String, Integer>> salaries = env.addSource(new SalarySource());
- DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades.connect(salaries)
- .flatMap(new WindowJoinTask());
+ // Apply a temporal join over the two streams based on the names in
+ // one-second windows
+ DataStream<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> joinedStream = grades
+ .windowJoin(salaries, 1000, 1000, 0, 0);
- System.out.println("(NAME, GRADE, SALARY)");
+ // Print the results
joinedStream.print();
env.execute();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java
deleted file mode 100644
index 524ffbd..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.window.join;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.function.co.RichCoFlatMapFunction;
-import org.apache.flink.util.Collector;
-
-public class WindowJoinTask extends
- RichCoFlatMapFunction<Tuple3<String, Integer, Long>, Tuple3<String, Integer, Long>, Tuple3<String, Integer, Integer>> {
-
- class SalaryProgress {
- public SalaryProgress(Integer salary, Long progress) {
- this.salary = salary;
- this.progress = progress;
- }
-
- Integer salary;
- Long progress;
- }
-
- class GradeProgress {
- public GradeProgress(Integer grade, Long progress) {
- this.grade = grade;
- this.progress = progress;
- }
-
- Integer grade;
- Long progress;
- }
-
- private static final long serialVersionUID = 749913336259789039L;
- private int windowSize = 100;
- private HashMap<String, LinkedList<GradeProgress>> gradeHashmap;
- private HashMap<String, LinkedList<SalaryProgress>> salaryHashmap;
- private String name;
- private Long progress;
-
- public WindowJoinTask() {
- gradeHashmap = new HashMap<String, LinkedList<GradeProgress>>();
- salaryHashmap = new HashMap<String, LinkedList<SalaryProgress>>();
- name = new String();
- progress = 0L;
- }
-
- Tuple3<String, Integer, Integer> outputTuple = new Tuple3<String, Integer, Integer>();
-
- // Joins the input value (grade) with the already known values (salaries) on
- // a given interval.
- // Also stores the new element.
- @Override
- public void flatMap1(Tuple3<String, Integer, Long> value,
- Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
- name = value.f0;
- progress = value.f2;
-
- outputTuple.f0 = name;
- outputTuple.f1 = value.f1;
-
- if (salaryHashmap.containsKey(name)) {
- Iterator<SalaryProgress> iterator = salaryHashmap.get(name).iterator();
- while (iterator.hasNext()) {
- SalaryProgress entry = iterator.next();
- if (progress - entry.progress > windowSize) {
- iterator.remove();
- } else {
- outputTuple.f2 = entry.salary;
- out.collect(outputTuple);
- }
- }
- }
- if (!gradeHashmap.containsKey(name)) {
- gradeHashmap.put(name, new LinkedList<GradeProgress>());
- }
- gradeHashmap.get(name).add(new GradeProgress(value.f1, progress));
- }
-
- // Joins the input value (salary) with the already known values (grades) on
- // a given interval.
- // Also stores the new element.
- @Override
- public void flatMap2(Tuple3<String, Integer, Long> value,
- Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
- name = value.f0;
- progress = value.f2;
-
- outputTuple.f0 = name;
- outputTuple.f2 = value.f1;
-
- if (gradeHashmap.containsKey(name)) {
- Iterator<GradeProgress> iterator = gradeHashmap.get(name).iterator();
- while (iterator.hasNext()) {
- GradeProgress entry = iterator.next();
- if (progress - entry.progress > windowSize) {
- iterator.remove();
- } else {
- outputTuple.f1 = entry.grade;
- out.collect(outputTuple);
- }
- }
- }
- if (!salaryHashmap.containsKey(name)) {
- salaryHashmap.put(name, new LinkedList<SalaryProgress>());
- }
- salaryHashmap.get(name).add(new SalaryProgress(value.f1, progress));
- }
-}
[2/5] incubator-flink git commit: [streaming]
StreamExecutionEnvironment rework + user class loader fix for cluster
deployment
Posted by mb...@apache.org.
[streaming] StreamExecutionEnvironment rework + user class loader fix for cluster deployment
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6867f9b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6867f9b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6867f9b9
Branch: refs/heads/master
Commit: 6867f9b93ec1ad9a627450c4fbd0b5ff98ef6148
Parents: c6dd9b1
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Nov 12 01:11:36 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Thu Nov 13 15:24:04 2014 +0100
----------------------------------------------------------------------
.../flink/streaming/api/JobGraphBuilder.java | 18 -----
.../flink/streaming/api/StreamConfig.java | 73 +++++++++++--------
.../api/environment/LocalStreamEnvironment.java | 17 ++---
.../environment/RemoteStreamEnvironment.java | 40 ++++++-----
.../environment/StreamContextEnvironment.java | 75 ++++++++++++++++++++
.../environment/StreamExecutionEnvironment.java | 69 +++++-------------
.../api/streamvertex/CoStreamVertex.java | 9 ++-
.../api/streamvertex/InputHandler.java | 7 +-
.../api/streamvertex/OutputHandler.java | 8 ++-
.../api/streamvertex/StreamVertex.java | 9 ++-
.../flink/streaming/util/ClusterUtil.java | 40 +----------
.../api/streamvertex/StreamVertexTest.java | 5 --
.../examples/iteration/IterateExample.java | 4 +-
.../client/program/ContextEnvironment.java | 4 ++
.../flink/client/program/JobWithJars.java | 2 +-
15 files changed, 193 insertions(+), 187 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 5a8fd22..df59be1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -80,8 +80,6 @@ public class JobGraphBuilder {
private Map<String, Long> iterationWaitTime;
private Map<String, Map<String, OperatorState<?>>> operatorStates;
- private int degreeOfParallelism;
- private int executionParallelism;
/**
* Creates an new {@link JobGraph} with the given name. A JobGraph is a DAG
@@ -120,22 +118,6 @@ public class JobGraphBuilder {
}
}
- public int getDefaultParallelism() {
- return degreeOfParallelism;
- }
-
- public void setDefaultParallelism(int defaultParallelism) {
- this.degreeOfParallelism = defaultParallelism;
- }
-
- public int getExecutionParallelism() {
- return executionParallelism;
- }
-
- public void setExecutionParallelism(int executionParallelism) {
- this.executionParallelism = executionParallelism;
- }
-
/**
* Adds a vertex to the streaming JobGraph with the given parameters
*
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 7da6265..3dba376 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.apache.flink.streaming.state.OperatorState;
import org.apache.flink.streaming.util.serialization.TypeWrapper;
+import org.apache.flink.util.InstantiationUtil;
public class StreamConfig {
private static final String INPUT_TYPE = "inputType_";
@@ -98,20 +99,20 @@ public class StreamConfig {
setTypeWrapper(TYPE_WRAPPER_OUT_2, typeWrapper);
}
- public <T> TypeInformation<T> getTypeInfoIn1() {
- return getTypeInfo(TYPE_WRAPPER_IN_1);
+ public <T> TypeInformation<T> getTypeInfoIn1(ClassLoader cl) {
+ return getTypeInfo(TYPE_WRAPPER_IN_1, cl);
}
- public <T> TypeInformation<T> getTypeInfoIn2() {
- return getTypeInfo(TYPE_WRAPPER_IN_2);
+ public <T> TypeInformation<T> getTypeInfoIn2(ClassLoader cl) {
+ return getTypeInfo(TYPE_WRAPPER_IN_2, cl);
}
- public <T> TypeInformation<T> getTypeInfoOut1() {
- return getTypeInfo(TYPE_WRAPPER_OUT_1);
+ public <T> TypeInformation<T> getTypeInfoOut1(ClassLoader cl) {
+ return getTypeInfo(TYPE_WRAPPER_OUT_1, cl);
}
- public <T> TypeInformation<T> getTypeInfoOut2() {
- return getTypeInfo(TYPE_WRAPPER_OUT_2);
+ public <T> TypeInformation<T> getTypeInfoOut2(ClassLoader cl) {
+ return getTypeInfo(TYPE_WRAPPER_OUT_2, cl);
}
private void setTypeWrapper(String key, TypeWrapper<?> typeWrapper) {
@@ -119,18 +120,17 @@ public class StreamConfig {
}
@SuppressWarnings("unchecked")
- private <T> TypeInformation<T> getTypeInfo(String key) {
- byte[] serializedWrapper = config.getBytes(key, null);
+ private <T> TypeInformation<T> getTypeInfo(String key, ClassLoader cl) {
- if (serializedWrapper == null) {
- throw new RuntimeException("TypeSerializationWrapper must be set");
+ TypeWrapper<T> typeWrapper;
+ try {
+ typeWrapper = (TypeWrapper<T>) InstantiationUtil.readObjectFromConfig(this.config, key,
+ cl);
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot load typeinfo");
}
-
- TypeWrapper<T> typeWrapper = (TypeWrapper<T>) SerializationUtils
- .deserialize(serializedWrapper);
if (typeWrapper != null) {
return typeWrapper.getTypeInfo();
-
} else {
return null;
}
@@ -166,9 +166,10 @@ public class StreamConfig {
}
}
- public <T> T getUserInvokable() {
+ @SuppressWarnings({ "unchecked" })
+ public <T> T getUserInvokable(ClassLoader cl) {
try {
- return SerializationUtils.deserialize(config.getBytes(SERIALIZEDUDF, null));
+ return (T) InstantiationUtil.readObjectFromConfig(this.config, SERIALIZEDUDF, cl);
} catch (Exception e) {
throw new StreamVertexException("Cannot instantiate user function", e);
}
@@ -189,10 +190,10 @@ public class StreamConfig {
}
}
- public Object getFunction() {
+ public Object getFunction(ClassLoader cl) {
try {
- return SerializationUtils.deserialize(config.getBytes(FUNCTION, null));
- } catch (SerializationException e) {
+ return InstantiationUtil.readObjectFromConfig(this.config, FUNCTION, cl);
+ } catch (Exception e) {
throw new RuntimeException("Cannot deserialize invokable object", e);
}
}
@@ -216,9 +217,11 @@ public class StreamConfig {
}
}
- public <T> OutputSelector<T> getOutputSelector() {
+ @SuppressWarnings("unchecked")
+ public <T> OutputSelector<T> getOutputSelector(ClassLoader cl) {
try {
- return SerializationUtils.deserialize(config.getBytes(OUTPUT_SELECTOR, null));
+ return (OutputSelector<T>) InstantiationUtil.readObjectFromConfig(this.config,
+ OUTPUT_SELECTOR, cl);
} catch (Exception e) {
throw new StreamVertexException("Cannot deserialize and instantiate OutputSelector", e);
}
@@ -254,10 +257,16 @@ public class StreamConfig {
SerializationUtils.serialize(partitionerObject));
}
- public <T> StreamPartitioner<T> getPartitioner(int outputIndex) throws ClassNotFoundException,
- IOException {
- return SerializationUtils.deserialize(config.getBytes(PARTITIONER_OBJECT + outputIndex,
- SerializationUtils.serialize(new ShufflePartitioner<T>())));
+ public <T> StreamPartitioner<T> getPartitioner(ClassLoader cl, int outputIndex)
+ throws ClassNotFoundException, IOException {
+ @SuppressWarnings("unchecked")
+ StreamPartitioner<T> partitioner = (StreamPartitioner<T>) InstantiationUtil
+ .readObjectFromConfig(this.config, PARTITIONER_OBJECT + outputIndex, cl);
+ if (partitioner != null) {
+ return partitioner;
+ } else {
+ return new ShufflePartitioner<T>();
+ }
}
public void setSelectAll(int outputIndex, Boolean selectAll) {
@@ -323,8 +332,14 @@ public class StreamConfig {
config.setBytes(OPERATOR_STATES, SerializationUtils.serialize((Serializable) states));
}
- public Map<String, OperatorState<?>> getOperatorStates() {
- return SerializationUtils.deserialize(config.getBytes(OPERATOR_STATES, null));
+ @SuppressWarnings("unchecked")
+ public Map<String, OperatorState<?>> getOperatorStates(ClassLoader cl) {
+ try {
+ return (Map<String, OperatorState<?>>) InstantiationUtil.readObjectFromConfig(
+ this.config, OPERATOR_STATES, cl);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not load operator state");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 5c0f555..505f0e7 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -21,13 +21,15 @@ import org.apache.flink.streaming.util.ClusterUtil;
public class LocalStreamEnvironment extends StreamExecutionEnvironment {
+ protected static ClassLoader userClassLoader;
+
/**
* Executes the JobGraph of the on a mini cluster of CLusterUtil with a
* default name.
*/
@Override
public void execute() throws Exception {
- ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getExecutionParallelism());
+ ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getDegreeOfParallelism());
}
/**
@@ -39,19 +41,12 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
*/
@Override
public void execute(String jobName) throws Exception {
- if (localExecutionIsAllowed()) {
- ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(jobName),
- getExecutionParallelism());
- } else {
- ClusterUtil.runOnLocalCluster(this.jobGraphBuilder.getJobGraph(jobName),
- getExecutionParallelism());
- }
-
+ ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(jobName),
+ getDegreeOfParallelism());
}
public void executeTest(long memorySize) throws Exception {
- ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getExecutionParallelism(),
+ ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getDegreeOfParallelism(),
memorySize);
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 864e18d..d833c8e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -20,9 +20,8 @@ package org.apache.flink.streaming.api.environment;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.JobWithJars;
@@ -30,13 +29,15 @@ import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamEnvironment.class);
private String host;
private int port;
- private String[] jarFiles;
+ private List<File> jarFiles;
/**
* Creates a new RemoteStreamEnvironment that points to the master
@@ -65,19 +66,28 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
this.host = host;
this.port = port;
- this.jarFiles = jarFiles;
+ this.jarFiles = new ArrayList<File>();
+ for (int i = 0; i < jarFiles.length; i++) {
+ File file = new File(jarFiles[i]);
+ try {
+ JobWithJars.checkJarFile(file);
+ } catch (IOException e) {
+ throw new RuntimeException("Problem with jar file " + file.getAbsolutePath(), e);
+ }
+ this.jarFiles.add(file);
+ }
}
@Override
public void execute() {
-
+
JobGraph jobGraph = jobGraphBuilder.getJobGraph();
executeRemotely(jobGraph);
}
-
+
@Override
public void execute(String jobName) {
-
+
JobGraph jobGraph = jobGraphBuilder.getJobGraph(jobName);
executeRemotely(jobGraph);
}
@@ -85,25 +95,21 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
/**
* Executes the remote job.
*
- * @param jobGraph jobGraph to execute
+ * @param jobGraph
+ * jobGraph to execute
*/
private void executeRemotely(JobGraph jobGraph) {
if (LOG.isInfoEnabled()) {
LOG.info("Running remotely at {}:{}", host, port);
}
- for (int i = 0; i < jarFiles.length; i++) {
- File file = new File(jarFiles[i]);
- try {
- JobWithJars.checkJarFile(file);
- } catch (IOException e) {
- throw new RuntimeException("Problem with jar file " + file.getAbsolutePath(), e);
- }
+ for (File file : jarFiles) {
jobGraph.addJar(new Path(file.getAbsolutePath()));
}
Configuration configuration = jobGraph.getJobConfiguration();
- Client client = new Client(new InetSocketAddress(host, port), configuration, getClass().getClassLoader());
+ Client client = new Client(new InetSocketAddress(host, port), configuration,
+ JobWithJars.buildUserCodeClassLoader(jarFiles, JobWithJars.class.getClassLoader()));
try {
client.run(jobGraph, true);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
new file mode 100644
index 0000000..c157435
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import java.io.File;
+import java.util.List;
+
+import org.apache.flink.client.program.Client;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+public class StreamContextEnvironment extends StreamExecutionEnvironment {
+
+ protected static ClassLoader userClassLoader;
+ protected List<File> jars;
+ protected Client client;
+
+ protected StreamContextEnvironment(Client client, List<File> jars, int dop) {
+ this.client = client;
+ this.jars = jars;
+ if (dop > 0) {
+ setDegreeOfParallelism(dop);
+ } else {
+ setDegreeOfParallelism(GlobalConfiguration.getInteger(
+ ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
+ ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE));
+ }
+ }
+
+ @Override
+ public void execute() throws Exception {
+ execute(null);
+ }
+
+ @Override
+ public void execute(String jobName) throws Exception {
+
+ JobGraph jobGraph;
+ if (jobName == null) {
+ jobGraph = this.jobGraphBuilder.getJobGraph();
+ } else {
+ jobGraph = this.jobGraphBuilder.getJobGraph(jobName);
+ }
+
+ for (File file : jars) {
+ jobGraph.addJar(new Path(file.getAbsolutePath()));
+ }
+
+ try {
+ client.run(jobGraph, true);
+
+ } catch (Exception e) {
+ throw e;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 0d00db3..600a87a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -20,10 +20,13 @@ package org.apache.flink.streaming.api.environment;
import java.io.File;
import java.io.Serializable;
import java.util.Collection;
+import java.util.List;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -44,21 +47,10 @@ import org.apache.flink.streaming.util.serialization.TypeWrapper;
*/
public abstract class StreamExecutionEnvironment {
- /**
- * The environment of the context (local by default, cluster if invoked
- * through command line)
- */
- private static StreamExecutionEnvironment contextEnvironment;
-
- /** flag to disable local executor when using the ContextEnvironment */
- private static boolean allowLocalExecution = true;
-
private static int defaultLocalDop = Runtime.getRuntime().availableProcessors();
private int degreeOfParallelism = 1;
- private int executionParallelism = -1;
-
private long bufferTimeout = 100;
protected JobGraphBuilder jobGraphBuilder;
@@ -74,10 +66,6 @@ public abstract class StreamExecutionEnvironment {
jobGraphBuilder = new JobGraphBuilder();
}
- public int getExecutionParallelism() {
- return executionParallelism == -1 ? degreeOfParallelism : executionParallelism;
- }
-
/**
* Gets the degree of parallelism with which operation are executed by
* default. Operations can individually override this value to use a
@@ -143,21 +131,6 @@ public abstract class StreamExecutionEnvironment {
return this.bufferTimeout;
}
- /**
- * Sets the number of hardware contexts (CPU cores / threads) used when
- * executed in {@link LocalStreamEnvironment}.
- *
- * @param degreeOfParallelism
- * The degree of parallelism in local environment
- */
- public void setExecutionParallelism(int degreeOfParallelism) {
- if (degreeOfParallelism < 1) {
- throw new IllegalArgumentException("Degree of parallelism must be at least one.");
- }
-
- this.executionParallelism = degreeOfParallelism;
- }
-
// --------------------------------------------------------------------------------------------
// Data stream creations
// --------------------------------------------------------------------------------------------
@@ -351,8 +324,19 @@ public abstract class StreamExecutionEnvironment {
* executed.
*/
public static StreamExecutionEnvironment getExecutionEnvironment() {
- allowLocalExecution = ExecutionEnvironment.localExecutionIsAllowed();
- return createLocalEnvironment();
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ if (env instanceof ContextEnvironment) {
+ ContextEnvironment ctx = (ContextEnvironment) env;
+ return createContextEnvironment(ctx.getClient(), ctx.getJars(),
+ ctx.getDegreeOfParallelism());
+ } else {
+ return createLocalEnvironment();
+ }
+ }
+
+ private static StreamExecutionEnvironment createContextEnvironment(Client client,
+ List<File> jars, int dop) {
+ return new StreamContextEnvironment(client, jars, dop);
}
/**
@@ -440,27 +424,6 @@ public abstract class StreamExecutionEnvironment {
return rec;
}
- // --------------------------------------------------------------------------------------------
- // Methods to control the context and local environments for execution from
- // packaged programs
- // --------------------------------------------------------------------------------------------
-
- protected static void initializeContextEnvironment(StreamExecutionEnvironment ctx) {
- contextEnvironment = ctx;
- }
-
- protected static boolean isContextEnvironmentSet() {
- return contextEnvironment != null;
- }
-
- protected static void disableLocalExecution() {
- allowLocalExecution = false;
- }
-
- public static boolean localExecutionIsAllowed() {
- return allowLocalExecution;
- }
-
/**
* Triggers the program execution. The environment will execute all parts of
* the program that have resulted in a "sink" operation. Sink operations are
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
index 5a6519d..2464ff2 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
@@ -29,8 +29,7 @@ import org.apache.flink.streaming.io.CoReaderIterator;
import org.apache.flink.streaming.io.CoRecordReader;
import org.apache.flink.util.MutableObjectIterator;
-public class CoStreamVertex<IN1, IN2, OUT> extends
- StreamVertex<IN1,OUT> {
+public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
private OutputHandler<OUT> outputHandler;
@@ -53,10 +52,10 @@ public class CoStreamVertex<IN1, IN2, OUT> extends
}
private void setDeserializers() {
- TypeInformation<IN1> inputTypeInfo1 = configuration.getTypeInfoIn1();
+ TypeInformation<IN1> inputTypeInfo1 = configuration.getTypeInfoIn1(userClassLoader);
inputDeserializer1 = new StreamRecordSerializer<IN1>(inputTypeInfo1);
- TypeInformation<IN2> inputTypeInfo2 = configuration.getTypeInfoIn2();
+ TypeInformation<IN2> inputTypeInfo2 = configuration.getTypeInfoIn2(userClassLoader);
inputDeserializer2 = new StreamRecordSerializer<IN2>(inputTypeInfo2);
}
@@ -72,7 +71,7 @@ public class CoStreamVertex<IN1, IN2, OUT> extends
@Override
protected void setInvokable() {
- userInvokable = configuration.getUserInvokable();
+ userInvokable = configuration.getUserInvokable(userClassLoader);
userInvokable.initialize(outputHandler.getCollector(), coIter, inputDeserializer1,
inputDeserializer2, isMutable);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
index 17d2ae5..9d65a21 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
@@ -35,10 +35,10 @@ public class InputHandler<IN> {
private MutableObjectIterator<StreamRecord<IN>> inputIter;
private MutableReader<IOReadableWritable> inputs;
- private StreamVertex<IN,?> streamVertex;
+ private StreamVertex<IN, ?> streamVertex;
private StreamConfig configuration;
- public InputHandler(StreamVertex<IN,?> streamComponent) {
+ public InputHandler(StreamVertex<IN, ?> streamComponent) {
this.streamVertex = streamComponent;
this.configuration = new StreamConfig(streamComponent.getTaskConfiguration());
try {
@@ -75,7 +75,8 @@ public class InputHandler<IN> {
}
private void setDeserializer() {
- TypeInformation<IN> inTupleTypeInfo = configuration.getTypeInfoIn1();
+ TypeInformation<IN> inTupleTypeInfo = configuration
+ .getTypeInfoIn1(streamVertex.userClassLoader);
if (inTupleTypeInfo != null) {
inputSerializer = new StreamRecordSerializer<IN>(inTupleTypeInfo);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index 8b72195..d8eb146 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -81,7 +81,8 @@ public class OutputHandler<OUT> {
private StreamCollector<OUT> setCollector() {
if (streamVertex.configuration.getDirectedEmit()) {
- OutputSelector<OUT> outputSelector = streamVertex.configuration.getOutputSelector();
+ OutputSelector<OUT> outputSelector = streamVertex.configuration
+ .getOutputSelector(streamVertex.userClassLoader);
collector = new DirectedStreamCollector<OUT>(streamVertex.getInstanceID(),
outSerializationDelegate, outputSelector);
@@ -97,7 +98,7 @@ public class OutputHandler<OUT> {
}
void setSerializers() {
- outTypeInfo = configuration.getTypeInfoOut1();
+ outTypeInfo = configuration.getTypeInfoOut1(streamVertex.userClassLoader);
if (outTypeInfo != null) {
outSerializer = new StreamRecordSerializer<OUT>(outTypeInfo);
outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(outSerializer);
@@ -110,7 +111,8 @@ public class OutputHandler<OUT> {
StreamPartitioner<OUT> outputPartitioner = null;
try {
- outputPartitioner = configuration.getPartitioner(outputNumber);
+ outputPartitioner = configuration.getPartitioner(streamVertex.userClassLoader,
+ outputNumber);
} catch (Exception e) {
throw new StreamVertexException("Cannot deserialize partitioner for "
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index a8ec98f..2db0d8b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -44,6 +44,8 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
private StreamingRuntimeContext context;
private Map<String, OperatorState<?>> states;
+ protected ClassLoader userClassLoader;
+
public StreamVertex() {
userInvokable = null;
numTasks = newVertex();
@@ -63,12 +65,13 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
}
protected void initialize() {
+ this.userClassLoader = getUserCodeClassLoader();
this.configuration = new StreamConfig(getTaskConfiguration());
this.name = configuration.getVertexName();
this.isMutable = configuration.getMutability();
this.functionName = configuration.getFunctionName();
- this.function = configuration.getFunction();
- this.states = configuration.getOperatorStates();
+ this.function = configuration.getFunction(userClassLoader);
+ this.states = configuration.getOperatorStates(userClassLoader);
this.context = createRuntimeContext(name, this.states);
}
@@ -85,7 +88,7 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
}
protected void setInvokable() {
- userInvokable = configuration.getUserInvokable();
+ userInvokable = configuration.getUserInvokable(userClassLoader);
userInvokable.initialize(outputHandler.getCollector(), inputHandler.getInputIter(),
inputHandler.getInputSerializer(), isMutable);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index 278cb5a..ebe383d 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -19,10 +19,8 @@ package org.apache.flink.streaming.util;
import java.net.InetSocketAddress;
-import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.minicluster.NepheleMiniCluster;
import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -72,44 +70,12 @@ public class ClusterUtil {
} catch (Exception e) {
throw e;
} finally {
- try {
- exec.stop();
- } catch (Throwable t) {
- }
+ exec.stop();
}
}
- public static void runOnLocalCluster(JobGraph jobGraph, int degreeOfPrallelism)
- throws Exception {
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Running on mini cluster");
- }
-
- try {
-
- Client client = ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment())
- .getClient();
-
- client.run(jobGraph, true);
- } catch (ProgramInvocationException e) {
- if (e.getMessage().contains("GraphConversionException")) {
- throw new Exception(CANNOT_EXECUTE_EMPTY_JOB, e);
- } else {
- throw e;
- }
- } catch (Exception e) {
- throw e;
- } finally {
- try {
- } catch (Throwable t) {
- }
- }
- }
-
- public static void runOnMiniCluster(JobGraph jobGraph, int numberOfTaskTrackers)
- throws Exception {
- runOnMiniCluster(jobGraph, numberOfTaskTrackers, -1);
+ public static void runOnMiniCluster(JobGraph jobGraph, int numOfSlots) throws Exception {
+ runOnMiniCluster(jobGraph, numOfSlots, -1);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
index e01809d..765de9c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
@@ -112,11 +112,6 @@ public class StreamVertexTest {
}
try {
- env.setExecutionParallelism(-10);
- fail();
- } catch (IllegalArgumentException e) {
- }
- try {
env.generateSequence(1, 10).project(2);
fail();
} catch (RuntimeException e) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index 82d81f4..e2094fb 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -73,13 +73,13 @@ public class IterateExample {
input.add(new Tuple2<Double, Integer>(0., 0));
}
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2)
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
.setBufferTimeout(1);
IterativeDataStream<Tuple2<Double, Integer>> it = env.fromCollection(input).iterate()
.setMaxWaitTime(3000);
- SplitDataStream<Tuple2<Double,Integer>> step = it.map(new Step()).shuffle().setParallelism(2).split(new MySelector());
+ SplitDataStream<Tuple2<Double,Integer>> step = it.map(new Step()).shuffle().split(new MySelector());
it.closeWith(step.select("iterate"));
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 4f91514..89b301a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -83,4 +83,8 @@ public class ContextEnvironment extends ExecutionEnvironment {
public Client getClient() {
return this.client;
}
+
+ public List<File> getJars(){
+ return jarFilesToAttach;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
index b8151da..b86487f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
@@ -102,7 +102,7 @@ public class JobWithJars {
// TODO: Check if proper JAR file
}
- static ClassLoader buildUserCodeClassLoader(List<File> jars, ClassLoader parent) {
+ public static ClassLoader buildUserCodeClassLoader(List<File> jars, ClassLoader parent) {
URL[] urls = new URL[jars.size()];
try {
[5/5] incubator-flink git commit: [FLINK-1204] [streaming] Individual,
self-contained packaging for streaming examples
Posted by mb...@apache.org.
[FLINK-1204] [streaming] Individual, self-contained packaging for streaming examples
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/c6dd9b10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/c6dd9b10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/c6dd9b10
Branch: refs/heads/master
Commit: c6dd9b104dad2fdf35d3bbca27779060792dc877
Parents: 537d6f6
Author: Marton Balassi <mb...@apache.org>
Authored: Mon Nov 10 14:56:45 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Thu Nov 13 15:24:04 2014 +0100
----------------------------------------------------------------------
.../flink-streaming-examples/pom.xml | 210 ++++++++++++++++++-
1 file changed, 209 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c6dd9b10/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
index 43a7430..f1480de 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
@@ -56,14 +56,222 @@ under the License.
<build>
<plugins>
+
+ <!-- get default data from flink-java-examples package -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.9</version>
+ <executions>
+ <execution>
+ <id>unpack</id>
+ <phase>package</phase>
+ <goals>
+ <goal>unpack</goal>
+ </goals>
+ <configuration>
+ <artifactItems>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java-examples</artifactId>
+ <version>${project.version}</version>
+ <type>jar</type>
+ <overWrite>false</overWrite>
+ <outputDirectory>${project.build.directory}/classes</outputDirectory>
+ <includes>org/apache/flink/examples/java/wordcount/util/WordCountData.class</includes>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- self-contained jars for each example -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
+
<executions>
+ <!-- Basic -->
+ <execution>
+ <id>Basic</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <classifier>Basic</classifier>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.streaming.examples.basictopology.BasicTopology</program-class>
+ </manifestEntries>
+ </archive>
+
+ <includes>
+ <include>org/apache/flink/streaming/examples/basictopology/BasicTopology.class</include>
+ <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
+ </includes>
+ </configuration>
+ </execution>
+
+ <!-- CellInfo -->
<execution>
+ <id>CellInfo</id>
+ <phase>package</phase>
<goals>
- <goal>test-jar</goal>
+ <goal>jar</goal>
</goals>
+ <configuration>
+ <classifier>CellInfo</classifier>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.streaming.examples.cellinfo.CellInfoLocal</program-class>
+ </manifestEntries>
+ </archive>
+
+ <includes>
+ <include>org/apache/flink/streaming/examples/cellinfo/*.class</include>
+ </includes>
+ </configuration>
+ </execution>
+
+ <!-- Iteration -->
+ <execution>
+ <id>Iteration</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <classifier>Iteration</classifier>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.streaming.examples.iteration.IterateExample</program-class>
+ </manifestEntries>
+ </archive>
+
+ <includes>
+ <include>org/apache/flink/streaming/examples/iteration/*.class</include>
+ </includes>
+ </configuration>
+ </execution>
+
+ <!-- IncrementalLearning -->
+ <execution>
+ <id>IncrementalLearning</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <classifier>IncrementalLearning</classifier>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.streaming.examples.ml.IncrementalLearningSkeleton</program-class>
+ </manifestEntries>
+ </archive>
+
+ <includes>
+ <include>org/apache/flink/streaming/examples/ml/*.class</include>
+ </includes>
+ </configuration>
+ </execution>
+
+ <!-- Twitter -->
+ <execution>
+ <id>Twitter</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <classifier>Twitter</classifier>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.streaming.examples.twitter.TwitterStream</program-class>
+ </manifestEntries>
+ </archive>
+
+ <includes>
+ <include>org/apache/flink/streaming/examples/twitter/*.class</include>
+ <include>org/apache/flink/streaming/examples/twitter/util/*.class</include>
+ </includes>
+ </configuration>
+ </execution>
+
+ <!-- WindowJoin -->
+ <execution>
+ <id>WindowJoin</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <classifier>WindowJoin</classifier>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.streaming.examples.window.join.WindowJoinLocal</program-class>
+ </manifestEntries>
+ </archive>
+
+ <includes>
+ <include>org/apache/flink/streaming/examples/window/join/*.class</include>
+ </includes>
+ </configuration>
+ </execution>
+
+ <!-- WordCountPOJO -->
+ <execution>
+ <id>WordCountPOJO</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <classifier>WordCountPOJO</classifier>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.streaming.examples.wordcount.PojoWordCount</program-class>
+ </manifestEntries>
+ </archive>
+
+ <includes>
+ <include>org/apache/flink/streaming/examples/wordcount/PojoWordCount.class</include>
+ <include>org/apache/flink/streaming/examples/wordcount/PojoWordCount$*.class</include>
+ <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
+ </includes>
+ </configuration>
+ </execution>
+
+ <!-- WordCount -->
+ <execution>
+ <id>WordCount</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <classifier>WordCount</classifier>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.streaming.examples.wordcount.WordCount</program-class>
+ </manifestEntries>
+ </archive>
+
+ <includes>
+ <include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include>
+ <include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include>
+ <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
+ </includes>
+ </configuration>
</execution>
</executions>
</plugin>
[4/5] incubator-flink git commit: [streaming] Examples refactor and
packaging update
Posted by mb...@apache.org.
[streaming] Examples refactor and packaging update
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d332d6c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d332d6c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d332d6c3
Branch: refs/heads/master
Commit: d332d6c310a9be12b61065083218d7e4fdec1ab1
Parents: e7ce956
Author: mbalassi <mb...@apache.org>
Authored: Wed Nov 12 23:23:15 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Thu Nov 13 15:24:04 2014 +0100
----------------------------------------------------------------------
.../flink-streaming-examples/pom.xml | 54 +-----
.../examples/basictopology/BasicTopology.java | 61 ------
.../examples/iteration/IterateExample.java | 106 ++++++++---
.../ml/IncrementalLearningSkeleton.java | 132 +++++++++----
.../examples/twitter/TwitterStream.java | 98 +++++-----
.../examples/window/join/GradeSource.java | 45 -----
.../examples/window/join/SalarySource.java | 45 -----
.../examples/window/join/WindowJoin.java | 162 ++++++++++++++++
.../examples/window/join/WindowJoinLocal.java | 49 -----
.../examples/wordcount/PojoExample.java | 185 +++++++++++++++++++
.../examples/wordcount/PojoWordCount.java | 169 -----------------
.../streaming/examples/wordcount/WordCount.java | 13 +-
12 files changed, 586 insertions(+), 533 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
index f1480de..20e1500 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
@@ -65,7 +65,7 @@ under the License.
<executions>
<execution>
<id>unpack</id>
- <phase>package</phase>
+ <phase>prepare-package</phase>
<goals>
<goal>unpack</goal>
</goals>
@@ -92,50 +92,6 @@ under the License.
<artifactId>maven-jar-plugin</artifactId>
<executions>
- <!-- Basic -->
- <execution>
- <id>Basic</id>
- <phase>package</phase>
- <goals>
- <goal>jar</goal>
- </goals>
- <configuration>
- <classifier>Basic</classifier>
-
- <archive>
- <manifestEntries>
- <program-class>org.apache.flink.streaming.examples.basictopology.BasicTopology</program-class>
- </manifestEntries>
- </archive>
-
- <includes>
- <include>org/apache/flink/streaming/examples/basictopology/BasicTopology.class</include>
- <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
- </includes>
- </configuration>
- </execution>
-
- <!-- CellInfo -->
- <execution>
- <id>CellInfo</id>
- <phase>package</phase>
- <goals>
- <goal>jar</goal>
- </goals>
- <configuration>
- <classifier>CellInfo</classifier>
-
- <archive>
- <manifestEntries>
- <program-class>org.apache.flink.streaming.examples.cellinfo.CellInfoLocal</program-class>
- </manifestEntries>
- </archive>
-
- <includes>
- <include>org/apache/flink/streaming/examples/cellinfo/*.class</include>
- </includes>
- </configuration>
- </execution>
<!-- Iteration -->
<execution>
@@ -216,7 +172,7 @@ under the License.
<archive>
<manifestEntries>
- <program-class>org.apache.flink.streaming.examples.window.join.WindowJoinLocal</program-class>
+ <program-class>org.apache.flink.streaming.examples.window.join.WindowJoin</program-class>
</manifestEntries>
</archive>
@@ -238,13 +194,13 @@ under the License.
<archive>
<manifestEntries>
- <program-class>org.apache.flink.streaming.examples.wordcount.PojoWordCount</program-class>
+ <program-class>org.apache.flink.streaming.examples.wordcount.PojoExample</program-class>
</manifestEntries>
</archive>
<includes>
- <include>org/apache/flink/streaming/examples/wordcount/PojoWordCount.class</include>
- <include>org/apache/flink/streaming/examples/wordcount/PojoWordCount$*.class</include>
+ <include>org/apache/flink/streaming/examples/wordcount/PojoExample.class</include>
+ <include>org/apache/flink/streaming/examples/wordcount/PojoExample$*.class</include>
<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
</includes>
</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
deleted file mode 100755
index 9379ff2..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.basictopology;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-/**
- *
- * Very basic Flink streaming topology for local testing.
- *
- */
-public class BasicTopology {
-
- private static final int PARALLELISM = 1;
-
- public static void main(String[] args) throws Exception {
-
- // We create a new Local environment. The program will be executed using
- // a new mini-cluster that is set up at execution.
- StreamExecutionEnvironment env = StreamExecutionEnvironment
- .createLocalEnvironment(PARALLELISM);
-
- // We create a new data stream from a collection of words then apply a
- // simple map.
- DataStream<String> stream = env.fromElements(WordCountData.WORDS).map(new IdentityMap());
-
- // Print the results
- stream.print();
-
- env.execute();
- }
-
- public static class IdentityMap implements MapFunction<String, String> {
- private static final long serialVersionUID = 1L;
-
- // map to the same value
- @Override
- public String map(String value) throws Exception {
- return value;
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index a011837..f574718 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -23,70 +23,97 @@ import java.util.List;
import java.util.Random;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeDataStream;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
- * Example illustrating iterations in Flink streaming programs. The program will
- * sum up random numbers and counts how many additions it needs to reach a
- * specific threshold in an iterative streaming fashion.
- *
+ * Example illustrating iterations in Flink streaming.
+ *
+ * <p>
+ * The program sums up random numbers and counts additions it performs to reach
+ * a specific threshold in an iterative streaming fashion.
+ * </p>
+ *
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li>streaming iterations,
+ * <li>buffer timeout to enhance latency,
+ * <li>directed outputs.
+ * </ul>
*/
public class IterateExample {
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
public static void main(String[] args) throws Exception {
- // Set up our input for the stream of (0,0)s
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ // set up input for the stream of (0,0) pairs
List<Tuple2<Double, Integer>> input = new ArrayList<Tuple2<Double, Integer>>();
for (int i = 0; i < 1000; i++) {
input.add(new Tuple2<Double, Integer>(0., 0));
}
- // Obtain execution environment and use setBufferTimeout(0) to enable
- // continuous flushing of the output buffers (lowest latency).
+ // obtain execution environment and call setBufferTimeout(0) to enable
+ // continuous flushing of the output buffers (lowest latency)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
.setBufferTimeout(0);
- // Create an iterative datastream from the input.
+ // create an iterative data stream from the input
IterativeDataStream<Tuple2<Double, Integer>> it = env.fromCollection(input).shuffle()
.iterate();
- // We make sure that the iteration terminates if no new data received in
- // the stream for 5 seconds
+ // trigger iteration termination if no new data received for 5 seconds
it.setMaxWaitTime(5000);
- // We apply the stepfunction to add a new random value to the tuple and
- // increment the counter, then we split the output with our
- // outputselector.
+ // apply the step function to add a new random value to the tuple and to
+ // increment the counter and split the output with the output selector
SplitDataStream<Tuple2<Double, Integer>> step = it.map(new Step()).shuffle()
.split(new MySelector());
- // We close the iteration be selecting the tuples that was directed to
- // 'iterate' in the outputselector
+ // close the iteration by selecting the tuples that were directed to the
+ // 'iterate' channel in the output selector
it.closeWith(step.select("iterate"));
- // To produce the final output we select he tuples directed to 'output'
- // than project it to the second field
- step.select("output").project(1).types(Integer.class).print();
+ // to produce the final output, select the tuples directed to the
+ // 'output' channel, then project them to the desired second field
+
+ DataStream<Tuple1<Integer>> numbers = step.select("output").project(1).types(Integer.class);
- // Execute the streaming program
- env.execute();
+ // emit result
+ if (fileOutput) {
+ numbers.writeAsText(outputPath, 1);
+ } else {
+ numbers.print();
+ }
+
+ // execute the program
+ env.execute("Streaming Iteration Example");
}
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
/**
* Iteration step function which takes an input (Double , Integer) and
- * produces an output (Double+random, Integer +1)
- *
+ * produces an output (Double + random, Integer + 1).
*/
public static class Step implements
MapFunction<Tuple2<Double, Integer>, Tuple2<Double, Integer>> {
-
private static final long serialVersionUID = 1L;
-
- Random rnd;
+ private Random rnd;
public Step() {
rnd = new Random();
@@ -101,10 +128,9 @@ public class IterateExample {
}
/**
- * s OutputSelector test which tuple needed to be iterated again.
+ * OutputSelector testing which tuple needs to be iterated again.
*/
public static class MySelector extends OutputSelector<Tuple2<Double, Integer>> {
-
private static final long serialVersionUID = 1L;
@Override
@@ -117,4 +143,30 @@ public class IterateExample {
}
}
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String outputPath;
+
+ private static boolean parseParameters(String[] args) {
+
+ if (args.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+ if (args.length == 1) {
+ outputPath = args[0];
+ } else {
+ System.err.println("Usage: IterateExample <result path>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing IterateExample with generated data.");
+ System.out.println(" Provide parameter to write to file.");
+ System.out.println(" Usage: IterateExample <result path>");
+ }
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index d2f9e6e..2def142 100755
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -24,11 +24,69 @@ import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.util.Collector;
+/**
+ * Skeleton for incremental machine learning algorithm consisting of a
+ * pre-computed model, which gets updated for the new inputs and new input data
+ * for which the job provides predictions.
+ *
+ * <p>
+ * This may serve as a base of a number of algorithms, e.g. updating an
+ * incremental Alternating Least Squares model while also providing the
+ * predictions.
+ * </p>
+ *
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li>Connected streams
+ * <li>CoFunctions
+ * <li>Tuple data types
+ * </ul>
+ */
public class IncrementalLearningSkeleton {
- // Source for feeding new data for prediction
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(String[] args) throws Exception {
+
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // build new model on every 5 seconds of new data
+ DataStream<Double[]> model = env.addSource(new TrainingDataSource()).window(5000)
+ .reduceGroup(new PartialModelBuilder());
+
+ // use partial model for prediction
+ DataStream<Integer> prediction = env.addSource(new NewDataSource()).connect(model)
+ .map(new Predictor());
+
+ // emit result
+ if (fileOutput) {
+ prediction.writeAsText(outputPath, 1);
+ } else {
+ prediction.print();
+ }
+
+ // execute program
+ env.execute("Streaming Incremental Learning");
+ }
+
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ /**
+ * Feeds new data for prediction. By default it is implemented as constantly
+ * emitting the Integer 1 in a loop.
+ */
public static class NewDataSource implements SourceFunction<Integer> {
private static final long serialVersionUID = 1L;
+ private static final int NEW_DATA_SLEEP_TIME = 1000;
@Override
public void invoke(Collector<Integer> collector) throws Exception {
@@ -37,39 +95,41 @@ public class IncrementalLearningSkeleton {
}
}
- // Method for pulling new data for prediction
private Integer getNewData() throws InterruptedException {
- Thread.sleep(100);
+ Thread.sleep(NEW_DATA_SLEEP_TIME);
return 1;
}
}
- // Source for feeding new training data for partial model building
+ /**
+ * Feeds new training data for the partial model builder. By default it is
+ * implemented as constantly emitting the Integer 1 in a loop.
+ */
public static class TrainingDataSource implements SourceFunction<Integer> {
private static final long serialVersionUID = 1L;
+ private static final int TRAINING_DATA_SLEEP_TIME = 10;
@Override
public void invoke(Collector<Integer> collector) throws Exception {
-
while (true) {
collector.collect(getTrainingData());
}
}
- // Method for pulling new training data
private Integer getTrainingData() throws InterruptedException {
- Thread.sleep(100);
+ Thread.sleep(TRAINING_DATA_SLEEP_TIME);
return 1;
}
}
- // Task for building up-to-date partial models on new training data
+ /**
+ * Builds up-to-date partial models on new training data.
+ */
public static class PartialModelBuilder implements GroupReduceFunction<Integer, Double[]> {
private static final long serialVersionUID = 1L;
- // Method for building partial model on the grouped training data
protected Double[] buildPartialModel(Iterable<Integer> values) {
return new Double[] { 1. };
}
@@ -80,8 +140,15 @@ public class IncrementalLearningSkeleton {
}
}
- // Task for performing prediction using the model produced in
- // batch-processing and the up-to-date partial model
+ /**
+ * Creates prediction using the model produced in batch-processing and the
+ * up-to-date partial model.
+ *
+ * <p>
+ * By default emits the Integer 0 for every prediction and the Integer 1
+ * for every model update.
+ * </p>
+ */
public static class Predictor implements CoMapFunction<Integer, Double[], Integer> {
private static final long serialVersionUID = 1L;
@@ -102,38 +169,41 @@ public class IncrementalLearningSkeleton {
return 1;
}
- // Pulls model built with batch-job on the old training data
+ // pulls model built with batch-job on the old training data
protected Double[] getBatchModel() {
return new Double[] { 0. };
}
- // Performs prediction using the two models
+ // performs prediction using the two models
protected Integer predict(Integer inTuple) {
return 0;
}
}
- private static int SOURCE_PARALLELISM = 1;
-
- public static void main(String[] args) throws Exception {
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- IncrementalLearningSkeleton.SOURCE_PARALLELISM = env.getDegreeOfParallelism();
-
- // Build new model on every second of new data
- DataStream<Double[]> model = env.addSource(new TrainingDataSource(), SOURCE_PARALLELISM)
- .window(5000).reduceGroup(new PartialModelBuilder());
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
- // Use partial model for prediction
- DataStream<Integer> prediction = env.addSource(new NewDataSource(), SOURCE_PARALLELISM)
- .connect(model).map(new Predictor());
+ private static boolean fileOutput = false;
+ private static String outputPath;
- // We pring the output
- prediction.print();
+ private static boolean parseParameters(String[] args) {
- env.execute();
+ if (args.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+ if (args.length == 1) {
+ outputPath = args[0];
+ } else {
+ System.err.println("Usage: IncrementalLearningSkeleton <result path>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing IncrementalLearningSkeleton with generated data.");
+ System.out.println(" Provide parameter to write to file.");
+ System.out.println(" Usage: IncrementalLearningSkeleton <result path>");
+ }
+ return true;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
index ef1d84b..08aa5cb 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
@@ -25,14 +25,13 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
-import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import org.apache.flink.streaming.examples.twitter.util.TwitterStreamData;
import org.apache.flink.util.Collector;
import org.apache.sling.commons.json.JSONException;
/**
- * Implements the "TwitterStream" program that computes a most used word occurrence
- * over JSON files in a streaming fashion.
+ * Implements the "TwitterStream" program that computes a most used word
+ * occurrence over JSON files in a streaming fashion.
*
* <p>
* The input is a JSON text file with lines separated by newline characters.
@@ -45,91 +44,89 @@ import org.apache.sling.commons.json.JSONException;
* <p>
* This example shows how to:
* <ul>
- * <li>write a simple Flink Streaming program.
- * <li>use Tuple data types.
- * <li>write and use user-defined functions.
+ * <li>acquire external data,
+ * <li>use in-line defined functions,
+ * <li>handle flattened stream inputs.
* </ul>
*
*/
public class TwitterStream {
-
+
// *************************************************************************
// PROGRAM
// *************************************************************************
-
+
public static void main(String[] args) throws Exception {
if (!parseParameters(args)) {
return;
}
// set up the execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment
- .getExecutionEnvironment();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setBufferTimeout(1000);
-
+
// get input data
DataStream<String> streamSource = getTextDataStream(env);
- DataStream<Tuple2<String, Integer>> dataStream = streamSource
- // selecting english tweets and split to words
- .flatMap(new SelectEnglishAndTokenizeFlatMap())
- .partitionBy(0)
+ DataStream<Tuple2<String, Integer>> tweets = streamSource
+ // selecting English tweets and splitting to words
+ .flatMap(new SelectEnglishAndTokenizeFlatMap()).partitionBy(0)
// returning (word, 1)
.map(new MapFunction<String, Tuple2<String, Integer>>() {
private static final long serialVersionUID = 1L;
@Override
- public Tuple2<String, Integer> map(String value)
- throws Exception {
+ public Tuple2<String, Integer> map(String value) throws Exception {
return new Tuple2<String, Integer>(value, 1);
}
})
// group by words and sum their occurence
- .groupBy(0)
- .sum(1)
- // select maximum occurenced word
+ .groupBy(0).sum(1)
+ // select word with maximum occurrence
.flatMap(new SelectMaxOccurence());
// emit result
- dataStream.print();
+ if (fileOutput) {
+ tweets.writeAsText(outputPath, 1);
+ } else {
+ tweets.print();
+ }
// execute program
- env.execute();
+ env.execute("Twitter Streaming Example");
}
-
+
// *************************************************************************
// USER FUNCTIONS
// *************************************************************************
/**
- * Make sentence from english tweets.
+ * Makes sentences from English tweets.
*
- * Implements the string tokenizer that splits sentences into words as a
+ * <p>
+ * Implements a string tokenizer that splits sentences into words as a
* user-defined FlatMapFunction. The function takes a line (String) and
* splits it into multiple pairs in the form of "(word,1)" (Tuple2<String,
* Integer>).
+ * </p>
*/
- public static class SelectEnglishAndTokenizeFlatMap extends
- JSONParseFlatMap<String, String> {
+ public static class SelectEnglishAndTokenizeFlatMap extends JSONParseFlatMap<String, String> {
private static final long serialVersionUID = 1L;
/**
* Select the language from the incoming JSON text
*/
@Override
- public void flatMap(String value, Collector<String> out)
- throws Exception {
+ public void flatMap(String value, Collector<String> out) throws Exception {
try {
if (getString(value, "lang").equals("en")) {
// message of tweet
- StringTokenizer tokenizer = new StringTokenizer(getString(
- value, "text"));
+ StringTokenizer tokenizer = new StringTokenizer(getString(value, "text"));
// split the message
while (tokenizer.hasMoreTokens()) {
- String result = tokenizer.nextToken().replaceAll(
- "\\s*", "");
+ String result = tokenizer.nextToken().replaceAll("\\s*", "");
if (result != null && !result.equals("")) {
out.collect(result);
@@ -143,10 +140,9 @@ public class TwitterStream {
}
/**
- *
- * Implements a user-defined FlatMapFunction that check if the word's current occurence
- * is higher than the maximum occurence. If it is, return with the word and change the maximum.
- *
+ * Implements a user-defined FlatMapFunction that checks if the current
+ * occurrence is higher than the maximum occurrence. If so, returns the word
+ * and changes the maximum.
*/
public static class SelectMaxOccurence implements
FlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
@@ -158,29 +154,32 @@ public class TwitterStream {
}
@Override
- public void flatMap(Tuple2<String, Integer> value,
- Collector<Tuple2<String, Integer>> out) throws Exception {
+ public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out)
+ throws Exception {
if ((Integer) value.getField(1) >= maximum) {
out.collect(value);
maximum = (Integer) value.getField(1);
}
}
}
-
+
// *************************************************************************
// UTIL METHODS
// *************************************************************************
- private static boolean fromFile = false;
- private static String path;
+ private static boolean fileOutput = false;
+ private static String textPath;
+ private static String outputPath;
private static boolean parseParameters(String[] args) {
if (args.length > 0) {
- if (args.length == 1) {
- fromFile = true;
- path = args[0];
+ // parse input arguments
+ fileOutput = true;
+ if (args.length == 2) {
+ textPath = args[0];
+ outputPath = args[1];
} else {
- System.err.println("USAGE:\nTwitterStream <pathToPropertiesFile>");
+ System.err.println("USAGE:\nTwitterStream <pathToPropertiesFile> <result path>");
return false;
}
} else {
@@ -191,11 +190,10 @@ public class TwitterStream {
return true;
}
- private static DataStream<String> getTextDataStream(
- StreamExecutionEnvironment env) {
- if (fromFile) {
+ private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
+ if (fileOutput) {
// read the text file from given input path
- return env.addSource(new TwitterSource(path));
+ return env.readTextFile(textPath);
} else {
// get default test text data
return env.fromElements(TwitterStreamData.TEXTS);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
deleted file mode 100644
index d877f91..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.window.join;
-
-import java.util.Random;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
-
-public class GradeSource implements SourceFunction<Tuple2<String, Integer>> {
-
- private static final long serialVersionUID = -5897483980082089771L;
-
- private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
- "andrew", "jean", "richard", "smith", "gorge", "black", "peter" ,"mark", "eric"};
- private Random rand = new Random();
- private Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>();
-
- @Override
- public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
- // Continuously emit tuples with random names and integers (grades).
- while (true) {
- outTuple.f0 = names[rand.nextInt(names.length)];
- outTuple.f1 = rand.nextInt(5) + 1;
- out.collect(outTuple);
- Thread.sleep(rand.nextInt(10) + 1);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
deleted file mode 100644
index 0da48e5..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.window.join;
-
-import java.util.Random;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
-
-public class SalarySource implements SourceFunction<Tuple2<String, Integer>> {
-
- private static final long serialVersionUID = 6670933703432267728L;
-
- private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
- "andrew", "jean", "richard", "smith", "gorge", "black", "peter", "mark", "eric" };
- private Random rand = new Random();
- private Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>();
-
- @Override
- public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
- // Continuously emit tuples with random names and integers (salaries).
- while (true) {
- outTuple.f0 = names[rand.nextInt(names.length)];
- outTuple.f1 = rand.nextInt(10000);
- out.collect(outTuple);
- Thread.sleep(rand.nextInt(10) + 1);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoin.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoin.java
new file mode 100644
index 0000000..6d8656c
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoin.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.window.join;
+
+import java.util.Random;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * Example illustrating join over sliding windows of streams in Flink.
+ *
+ * <p>
+ * This example joins two streams over a sliding window: one stream emits
+ * grades and the other emits salaries of people.
+ * </p>
+ *
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>do windowed joins,
+ * <li>use tuple data types,
+ * <li>write a simple streaming program.
+ */
+public class WindowJoin {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(String[] args) throws Exception {
+
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ // obtain execution environment
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // connect to the data sources for grades and salaries
+ DataStream<Tuple2<String, Integer>> grades = env.addSource(new GradeSource());
+ DataStream<Tuple2<String, Integer>> salaries = env.addSource(new SalarySource());
+
+ // apply a temporal join over the two streams based on the names, over
+ // one-second windows
+ DataStream<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> joinedStream = grades
+ .windowJoin(salaries, 1000, 1000, 0, 0);
+
+ // emit result
+ if (fileOutput) {
+ joinedStream.writeAsText(outputPath, 1);
+ } else {
+ joinedStream.print();
+ }
+
+ // execute program
+ env.execute("Windowed Join Example");
+ }
+
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ private final static String[] names = { "tom", "jerry", "alice", "bob", "john", "grace" };
+ private final static int GRADE_COUNT = 5;
+ private final static int SALARY_MAX = 10000;
+ private final static int SLEEP_TIME = 10;
+
+ /**
+ * Continuously emit tuples with random names and integers (grades).
+ */
+ public static class GradeSource implements SourceFunction<Tuple2<String, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ private Random rand;
+ private Tuple2<String, Integer> outTuple;
+
+ public GradeSource() {
+ rand = new Random();
+ outTuple = new Tuple2<String, Integer>();
+ }
+
+ @Override
+ public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
+ while (true) {
+ outTuple.f0 = names[rand.nextInt(names.length)];
+ outTuple.f1 = rand.nextInt(GRADE_COUNT) + 1;
+ out.collect(outTuple);
+ Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
+ }
+ }
+ }
+
+ /**
+ * Continuously emit tuples with random names and integers (salaries).
+ */
+ public static class SalarySource implements SourceFunction<Tuple2<String, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ private Random rand;
+ private Tuple2<String, Integer> outTuple;
+
+ public SalarySource() {
+ rand = new Random();
+ outTuple = new Tuple2<String, Integer>();
+ }
+
+ @Override
+ public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
+ while (true) {
+ outTuple.f0 = names[rand.nextInt(names.length)];
+ outTuple.f1 = rand.nextInt(SALARY_MAX) + 1;
+ out.collect(outTuple);
+ Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
+ }
+ }
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String outputPath;
+
+ private static boolean parseParameters(String[] args) {
+
+ if (args.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+ if (args.length == 1) {
+ outputPath = args[0];
+ } else {
+ System.err.println("Usage: WindowJoin <result path>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing WindowJoin with generated data.");
+ System.out.println(" Provide parameter to write to file.");
+ System.out.println(" Usage: WindowJoin <result path>");
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
deleted file mode 100644
index 695cf5d..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.window.join;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-public class WindowJoinLocal {
-
- // This example will join two streams with a sliding window. One which emits
- // people's grades and one which emits people's salaries.
-
- public static void main(String[] args) throws Exception {
-
- // Obtain execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // Connect to the data sources for grades and salaries
- DataStream<Tuple2<String, Integer>> grades = env.addSource(new GradeSource());
- DataStream<Tuple2<String, Integer>> salaries = env.addSource(new SalarySource());
-
- // Apply a temporal join over the two stream based on the names in one
- // second windows
- DataStream<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> joinedStream = grades
- .windowJoin(salaries, 1000, 1000, 0, 0);
-
- // Print the results
- joinedStream.print();
-
- env.execute();
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
new file mode 100644
index 0000000..f377863
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.examples.wordcount;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+/**
+ * This example shows an implementation of WordCount without using the Tuple2
+ * type, but a custom class.
+ *
+ * <p>
+ * Usage: <code>PojoExample &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link WordCountData}.
+ *
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>use POJO data types,
+ * <li>write a simple Flink program,
+ * <li>write and use user-defined functions.
+ * </ul>
+ */
+public class PojoExample {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(String[] args) throws Exception {
+
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ // set up the execution environment
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // get input data
+ DataStream<String> text = getTextDataStream(env);
+
+ DataStream<Word> counts =
+ // split up the lines into Word objects
+ text.flatMap(new Tokenizer())
+ // group by the field word and sum up the frequency
+ .groupBy("word").sum("frequency");
+
+ if (fileOutput) {
+ counts.writeAsText(outputPath);
+ } else {
+ counts.print();
+ }
+
+ // execute program
+ env.execute("WordCount Pojo Example");
+ }
+
+ // *************************************************************************
+ // DATA TYPES
+ // *************************************************************************
+
+ /**
+ * This is the POJO (Plain Old Java Object) that is being used for all the
+ * operations. As long as all fields are public or have a getter/setter, the
+ * system can handle them.
+ */
+ public static class Word {
+
+ private String word;
+ private Integer frequency;
+
+ public Word() {
+ }
+
+ public Word(String word, int i) {
+ this.word = word;
+ this.frequency = i;
+ }
+
+ public String getWord() {
+ return word;
+ }
+
+ public void setWord(String word) {
+ this.word = word;
+ }
+
+ public Integer getFrequency() {
+ return frequency;
+ }
+
+ public void setFrequency(Integer frequency) {
+ this.frequency = frequency;
+ }
+
+ @Override
+ public String toString() {
+ return "(" + word + ", " + frequency + ")";
+ }
+ }
+
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ /**
+ * Implements the string tokenizer that splits sentences into words as a
+ * user-defined FlatMapFunction. The function takes a line (String) and
+ * splits it into multiple {@link Word} objects, each carrying the word and
+ * an initial frequency of 1.
+ */
+ public static final class Tokenizer implements FlatMapFunction<String, Word> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void flatMap(String value, Collector<Word> out) {
+ // normalize and split the line
+ String[] tokens = value.toLowerCase().split("\\W+");
+
+ // emit the pairs
+ for (String token : tokens) {
+ if (token.length() > 0) {
+ out.collect(new Word(token, 1));
+ }
+ }
+ }
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String textPath;
+ private static String outputPath;
+
+ private static boolean parseParameters(String[] args) {
+
+ if (args.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+ if (args.length == 2) {
+ textPath = args[0];
+ outputPath = args[1];
+ } else {
+ System.err.println("Usage: PojoExample <text path> <result path>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing PojoExample example with built-in default data.");
+ System.out.println(" Provide parameters to read input data from a file.");
+ System.out.println(" Usage: PojoExample <text path> <result path>");
+ }
+ return true;
+ }
+
+ private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
+ if (fileOutput) {
+ // read the text file from given input path
+ return env.readTextFile(textPath);
+ } else {
+ // get default test text data
+ return env.fromElements(WordCountData.WORDS);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoWordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoWordCount.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoWordCount.java
deleted file mode 100644
index e95f042..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoWordCount.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.examples.wordcount;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.util.Collector;
-
-/**
- * This example shows an implementation of Wordcount without using the Tuple2
- * type, but a custom class.
- *
- */
-public class PojoWordCount {
-
- /**
- * This is the POJO (Plain Old Java Object) that is being used for all the
- * operations. As long as all fields are public or have a getter/setter, the
- * system can handle them
- */
- public static class Word {
- // fields
- private String word;
- private Integer frequency;
-
- // constructors
- public Word() {
- }
-
- public Word(String word, int i) {
- this.word = word;
- this.frequency = i;
- }
-
- // getters setters
- public String getWord() {
- return word;
- }
-
- public void setWord(String word) {
- this.word = word;
- }
-
- public Integer getFrequency() {
- return frequency;
- }
-
- public void setFrequency(Integer frequency) {
- this.frequency = frequency;
- }
-
- // to String
- @Override
- public String toString() {
- return "Word=" + word + " freq=" + frequency;
- }
- }
-
- public static void main(String[] args) throws Exception {
-
- if (!parseParameters(args)) {
- return;
- }
-
- // set up the execution environment
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // get input data
- DataStream<String> text = getTextDataStream(env);
-
- DataStream<Word> counts =
- // split up the lines into Word objects
- text.flatMap(new Tokenizer())
- // group by the field word and sum up the frequency
- .groupBy("word")
- .sum("frequency");
-
- if (fileOutput) {
- counts.writeAsText(outputPath);
- } else {
- counts.print();
- }
-
- // execute program
- env.execute("WordCount-Pojo Example");
- }
-
- // *************************************************************************
- // USER FUNCTIONS
- // *************************************************************************
-
- /**
- * Implements the string tokenizer that splits sentences into words as a
- * user-defined FlatMapFunction. The function takes a line (String) and
- * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String,
- * Integer>).
- */
- public static final class Tokenizer implements FlatMapFunction<String, Word> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap(String value, Collector<Word> out) {
- // normalize and split the line
- String[] tokens = value.toLowerCase().split("\\W+");
-
- // emit the pairs
- for (String token : tokens) {
- if (token.length() > 0) {
- out.collect(new Word(token, 1));
- }
- }
- }
- }
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private static boolean fileOutput = false;
- private static String textPath;
- private static String outputPath;
-
- private static boolean parseParameters(String[] args) {
-
- if (args.length > 0) {
- // parse input arguments
- fileOutput = true;
- if (args.length == 2) {
- textPath = args[0];
- outputPath = args[1];
- } else {
- System.err.println("Usage: WordCount <text path> <result path>");
- return false;
- }
- } else {
- System.out.println("Executing WordCount example with built-in default data.");
- System.out.println(" Provide parameters to read input data from a file.");
- System.out.println(" Usage: WordCount <text path> <result path>");
- }
- return true;
- }
-
- private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
- if (fileOutput) {
- // read the text file from given input path
- return env.readTextFile(textPath);
- } else {
- // get default test text data
- return env.fromElements(WordCountData.WORDS);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
index e07dfe5..085fe5f 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
@@ -41,8 +41,8 @@ import org.apache.flink.util.Collector;
* <p>
* This example shows how to:
* <ul>
- * <li>write a simple Flink Streaming program.
- * <li>use Tuple data types.
+ * <li>write a simple Flink Streaming program,
+ * <li>use tuple data types,
* <li>write and use user-defined functions.
* </ul>
*
@@ -66,11 +66,10 @@ public class WordCount {
DataStream<String> text = getTextDataStream(env);
DataStream<Tuple2<String, Integer>> counts =
- // split up the lines in pairs (2-tuples) containing: (word,1)
- text.flatMap(new Tokenizer())
- // group by the tuple field "0" and sum up tuple field "1"
- .groupBy(0)
- .sum(1);
+ // split up the lines in pairs (2-tuples) containing: (word,1)
+ text.flatMap(new Tokenizer())
+ // group by the tuple field "0" and sum up tuple field "1"
+ .groupBy(0).sum(1);
// emit result
if (fileOutput) {