You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/11/13 16:06:16 UTC
[3/5] incubator-flink git commit: [streaming] Example cleanup +
windowJoin fix
[streaming] Example cleanup + windowJoin fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e7ce9567
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e7ce9567
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e7ce9567
Branch: refs/heads/master
Commit: e7ce9567c57ffc59e5a7588445b6b891bc58e5f8
Parents: 6867f9b
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Nov 12 12:32:32 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Thu Nov 13 15:24:04 2014 +0100
----------------------------------------------------------------------
.../api/datastream/ConnectedDataStream.java | 6 +
.../examples/basictopology/BasicTopology.java | 98 ++++++++-------
.../examples/cellinfo/CellInfoLocal.java | 123 ------------------
.../examples/cellinfo/IWorkerEngine.java | 23 ----
.../flink/streaming/examples/cellinfo/Util.java | 28 -----
.../examples/cellinfo/WorkerEngineBin.java | 97 --------------
.../examples/cellinfo/WorkerEngineExact.java | 76 -----------
.../examples/iteration/IterateExample.java | 80 ++++++++----
.../ml/IncrementalLearningSkeleton.java | 15 ++-
.../examples/twitter/TwitterStream.java | 5 +-
.../examples/window/join/GradeSource.java | 14 +--
.../examples/window/join/SalarySource.java | 14 +--
.../examples/window/join/WindowJoinLocal.java | 25 ++--
.../examples/window/join/WindowJoinTask.java | 126 -------------------
14 files changed, 148 insertions(+), 582 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 23c420c..6556550 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -580,6 +580,9 @@ public class ConnectedDataStream<IN1, IN2> {
SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> windowJoin(long windowSize, long slideInterval,
TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2, int fieldIn1, int fieldIn2) {
+ dataStream1 = dataStream1.groupBy(fieldIn1);
+ dataStream2 = dataStream2.groupBy(fieldIn2);
+
JoinWindowFunction<IN1, IN2> joinWindowFunction = new JoinWindowFunction<IN1, IN2>(
dataStream1.getOutputType(), dataStream2.getOutputType(), fieldIn1, fieldIn2);
@@ -590,6 +593,9 @@ public class ConnectedDataStream<IN1, IN2> {
SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> windowJoin(long windowSize, long slideInterval,
TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2, String fieldIn1, String fieldIn2) {
+ dataStream1 = dataStream1.groupBy(fieldIn1);
+ dataStream2 = dataStream2.groupBy(fieldIn2);
+
JoinWindowFunction<IN1, IN2> joinWindowFunction = new JoinWindowFunction<IN1, IN2>(
dataStream1.getOutputType(), dataStream2.getOutputType(), fieldIn1, fieldIn2);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
index 2b1ac4f..9379ff2 100755
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
@@ -1,49 +1,61 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.basictopology;
-
+ */
+
+package org.apache.flink.streaming.examples.basictopology;
+
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-public class BasicTopology {
-
- public static class IdentityMap implements MapFunction<String, String> {
- private static final long serialVersionUID = 1L;
- // map to the same value
- @Override
- public String map(String value) throws Exception {
- return value;
- }
-
- }
-
- private static final int PARALLELISM = 1;
-
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment
- .createLocalEnvironment(PARALLELISM);
-
- DataStream<String> stream = env.fromElements(WordCountData.WORDS).map(new IdentityMap());
-
- stream.print();
-
- env.execute();
- }
-}
+
+/**
+ *
+ * Very basic Flink streaming topology for local testing.
+ *
+ */
+public class BasicTopology {
+
+ private static final int PARALLELISM = 1;
+
+ public static void main(String[] args) throws Exception {
+
+ // We create a new Local environment. The program will be executed using
+ // a new mini-cluster that is set up at execution.
+ StreamExecutionEnvironment env = StreamExecutionEnvironment
+ .createLocalEnvironment(PARALLELISM);
+
+ // We create a new data stream from a collection of words then apply a
+ // simple map.
+ DataStream<String> stream = env.fromElements(WordCountData.WORDS).map(new IdentityMap());
+
+ // Print the results
+ stream.print();
+
+ env.execute();
+ }
+
+ public static class IdentityMap implements MapFunction<String, String> {
+ private static final long serialVersionUID = 1L;
+
+ // map to the same value
+ @Override
+ public String map(String value) throws Exception {
+ return value;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
deleted file mode 100644
index 1dcb5e6..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.cellinfo;
-
-import java.util.Random;
-
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
-
-public class CellInfoLocal {
-
- private final static int CELL_COUNT = 10;
- private final static int LAST_MILLIS = 1000;
- private final static int PARALLELISM = 1;
- private final static int SOURCE_PARALLELISM = 1;
- private final static int QUERY_SLEEP_TIME = 1000;
- private final static int QUERY_COUNT = 10;
- private final static int INFO_SLEEP_TIME = 100;
- private final static int INFO_COUNT = 100;
-
- public final static class InfoSource implements SourceFunction<Tuple3<Integer, Long, Integer>> {
- private static final long serialVersionUID = 1L;
-
- private static Random rand = new Random();
- private Tuple3<Integer, Long, Integer> tuple = new Tuple3<Integer, Long, Integer>(0, 0L, 0);
-
- @Override
- public void invoke(Collector<Tuple3<Integer, Long, Integer>> out) throws Exception {
- for (int i = 0; i < INFO_COUNT; i++) {
- Thread.sleep(INFO_SLEEP_TIME);
- tuple.f0 = rand.nextInt(CELL_COUNT);
- tuple.f1 = System.currentTimeMillis();
-
- out.collect(tuple);
- }
- }
- }
-
- private final static class QuerySource implements
- SourceFunction<Tuple3<Integer, Long, Integer>> {
- private static final long serialVersionUID = 1L;
-
- private static Random rand = new Random();
- Tuple3<Integer, Long, Integer> tuple = new Tuple3<Integer, Long, Integer>(0, 0L,
- LAST_MILLIS);
-
- @Override
- public void invoke(Collector<Tuple3<Integer, Long, Integer>> collector) throws Exception {
- for (int i = 0; i < QUERY_COUNT; i++) {
- Thread.sleep(QUERY_SLEEP_TIME);
- tuple.f0 = rand.nextInt(CELL_COUNT);
- tuple.f1 = System.currentTimeMillis();
- collector.collect(tuple);
- }
- }
- }
-
- private final static class CellTask extends
- RichCoMapFunction<Tuple3<Integer, Long, Integer>, Tuple3<Integer, Long, Integer>, String> {
- private static final long serialVersionUID = 1L;
-
- private WorkerEngineExact engine;
- private Integer cellID;
- private Long timeStamp;
- private Integer lastMillis;
-
- public CellTask() {
- engine = new WorkerEngineExact(CELL_COUNT, LAST_MILLIS, System.currentTimeMillis());
- }
-
- // INFO
- @Override
- public String map1(Tuple3<Integer, Long, Integer> value) {
- cellID = value.f0;
- timeStamp = value.f1;
- engine.put(cellID, timeStamp);
- return "INFO:\t" + cellID + " @ " + timeStamp;
- }
-
- // QUERY
- @Override
- public String map2(Tuple3<Integer, Long, Integer> value) {
- cellID = value.f0;
- timeStamp = value.f1;
- lastMillis = value.f2;
- return "QUERY:\t" + cellID + ": " + engine.get(timeStamp, lastMillis, cellID);
- }
- }
-
- // Example for connecting data streams
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
- PARALLELISM).setBufferTimeout(100);
-
- DataStream<Tuple3<Integer, Long, Integer>> querySource = env.addSource(new QuerySource(),
- SOURCE_PARALLELISM).partitionBy(0);
-
- DataStream<String> stream = env.addSource(new InfoSource(), SOURCE_PARALLELISM)
- .partitionBy(0).connect(querySource).map(new CellTask());
- stream.print();
-
- env.execute();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/IWorkerEngine.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/IWorkerEngine.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/IWorkerEngine.java
deleted file mode 100644
index b2b2264..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/IWorkerEngine.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.cellinfo;
-
-public interface IWorkerEngine {
- public int get(long timeStamp, long lastMillis, int cellId);
- public void put(int cellId, long timeStamp);
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/Util.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/Util.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/Util.java
deleted file mode 100644
index 7463c6c..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/Util.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.cellinfo;
-
-public class Util {
- public static int mod(int x, int y) {
- int result = x % y;
- if (result < 0) {
- result += y;
- }
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineBin.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineBin.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineBin.java
deleted file mode 100644
index b6097f5..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineBin.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.cellinfo;
-
-import org.apache.flink.streaming.examples.cellinfo.Util;
-
-public class WorkerEngineBin implements java.io.Serializable, IWorkerEngine {
-
- private static final long serialVersionUID = 1L;
- private long unitLength_;
- private long lastTimeUpdated_;
- private int[][] counters_;
- private int pointer_;
-
- public WorkerEngineBin(long unitLength, int numOfCells, int bufferInterval, long currentTime) {
- lastTimeUpdated_ = currentTime / unitLength * unitLength;
- unitLength_ = unitLength;
- counters_ = new int[(int) (bufferInterval / unitLength) + 1][numOfCells];
- }
-
- private int getCell(int interval, int cell) {
- return counters_[Util.mod((interval + pointer_), counters_.length)][cell];
- }
-
- private void incrCell(int interval, int cell) {
- ++counters_[Util.mod((interval + pointer_), counters_.length)][cell];
- }
-
- private void toZero(int interval) {
- for (int cell = 0; cell < counters_[0].length; ++cell) {
- counters_[Util.mod((interval + pointer_), counters_.length)][cell] = 0;
- }
- }
-
- private void shift(int shiftBy) {
- if (shiftBy > 0 && shiftBy < counters_.length) {
- pointer_ = Util.mod((pointer_ - shiftBy), counters_.length);
- for (int i = 0; i < shiftBy; ++i) {
- toZero(i);
- }
- } else if (shiftBy >= counters_.length) {
- pointer_ = 0;
- for (int i = 0; i < counters_.length; ++i) {
- toZero(i);
- }
- }
- }
-
- public int get(long timeStamp, long lastMillis, int cellId) {
- int shift = refresh(timeStamp);
- int numOfLastIntervals = (int) (lastMillis / unitLength_);
- if (shift >= counters_.length || numOfLastIntervals >= counters_.length) {
- return -1;
- }
- int sum = 0;
- for (int i = shift + 1; i < shift + numOfLastIntervals + 1; ++i) {
- sum += getCell(i, cellId);
- }
- return sum;
- }
-
- private int refresh(long timeStamp) {
- int shiftBy = (int) ((timeStamp - lastTimeUpdated_) / unitLength_);
- shift(shiftBy);
- int retVal;
- if (shiftBy > 0) {
- lastTimeUpdated_ = timeStamp / unitLength_ * unitLength_;
- retVal = 0;
- } else {
- retVal = -shiftBy;
- }
- return retVal;
- }
-
- public void put(int cellId, long timeStamp) {
- int shift = refresh(timeStamp);
- if (shift >= counters_.length) {
- return;
- }
- incrCell(shift, cellId);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineExact.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineExact.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineExact.java
deleted file mode 100644
index 8a9bcda..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineExact.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.cellinfo;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.TreeMap;
-
-public class WorkerEngineExact implements java.io.Serializable, IWorkerEngine {
- private static final long serialVersionUID = 1L;
- private long lastTimeUpdated_;
- private long bufferInterval_;
- private TreeMap<Long, Integer>[] counters_;
-
- @SuppressWarnings("unchecked")
- public WorkerEngineExact(int numOfCells, int bufferInterval, long currentTime) {
- lastTimeUpdated_ = currentTime;
- bufferInterval_ = bufferInterval;
- counters_ = new TreeMap[numOfCells];
- for (int i = 0; i < numOfCells; ++i) {
- counters_[i] = new TreeMap<Long, Integer>();
- }
- }
-
- public int get(long timeStamp, long lastMillis, int cellId) {
- refresh(timeStamp);
- Map<Long, Integer> subMap = counters_[cellId].subMap(timeStamp - lastMillis, true, timeStamp, false);
- int retVal = 0;
- for (Map.Entry<Long, Integer> entry : subMap.entrySet()) {
- retVal += entry.getValue();
- }
- return retVal;
- }
-
- public void put(int cellId, long timeStamp) {
- refresh(timeStamp);
- TreeMap<Long, Integer> map = counters_[cellId];
- if (map.containsKey(timeStamp)) {
- map.put(timeStamp, map.get(timeStamp) + 1);
- } else {
- map.put(timeStamp, 1);
- }
- }
-
- public void refresh(long timeStamp) {
- if (timeStamp - lastTimeUpdated_ > bufferInterval_) {
- for (int i = 0; i < counters_.length; ++i) {
- for (Iterator<Map.Entry<Long, Integer>> it = counters_[i].entrySet().iterator(); it.hasNext();) {
- Map.Entry<Long, Integer> entry = it.next();
- long time = entry.getKey();
- if (timeStamp - time > bufferInterval_) {
- it.remove();
- } else {
- break;
- }
- }
- }
- lastTimeUpdated_ = timeStamp;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index e2094fb..a011837 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -29,13 +29,63 @@ import org.apache.flink.streaming.api.datastream.IterativeDataStream;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+/**
+ * Example illustrating iterations in Flink streaming programs. The program
+ * sums up random numbers and counts how many additions are needed to reach a
+ * specific threshold in an iterative streaming fashion.
+ *
+ */
public class IterateExample {
+ public static void main(String[] args) throws Exception {
+
+ // Set up our input for the stream of (0,0)s
+ List<Tuple2<Double, Integer>> input = new ArrayList<Tuple2<Double, Integer>>();
+ for (int i = 0; i < 1000; i++) {
+ input.add(new Tuple2<Double, Integer>(0., 0));
+ }
+
+ // Obtain execution environment and use setBufferTimeout(0) to enable
+ // continuous flushing of the output buffers (lowest latency).
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
+ .setBufferTimeout(0);
+
+ // Create an iterative datastream from the input.
+ IterativeDataStream<Tuple2<Double, Integer>> it = env.fromCollection(input).shuffle()
+ .iterate();
+
+ // We make sure that the iteration terminates if no new data received in
+ // the stream for 5 seconds
+ it.setMaxWaitTime(5000);
+
+ // We apply the stepfunction to add a new random value to the tuple and
+ // increment the counter, then we split the output with our
+ // outputselector.
+ SplitDataStream<Tuple2<Double, Integer>> step = it.map(new Step()).shuffle()
+ .split(new MySelector());
+
+ // We close the iteration by selecting the tuples that were directed to
+ // 'iterate' in the outputselector
+ it.closeWith(step.select("iterate"));
+
+ // To produce the final output we select the tuples directed to 'output'
+ // then project them to the second field
+ step.select("output").project(1).types(Integer.class).print();
+
+ // Execute the streaming program
+ env.execute();
+ }
+
+ /**
+ * Iteration step function which takes an input (Double , Integer) and
+ * produces an output (Double+random, Integer +1)
+ *
+ */
public static class Step implements
MapFunction<Tuple2<Double, Integer>, Tuple2<Double, Integer>> {
private static final long serialVersionUID = 1L;
-
+
Random rnd;
public Step() {
@@ -50,13 +100,16 @@ public class IterateExample {
}
+ /**
+ * OutputSelector that decides which tuples need to be iterated again.
+ */
public static class MySelector extends OutputSelector<Tuple2<Double, Integer>> {
private static final long serialVersionUID = 1L;
@Override
public void select(Tuple2<Double, Integer> value, Collection<String> outputs) {
- if (value.f0 > 100) {
+ if (value.f0 > 200) {
outputs.add("output");
} else {
outputs.add("iterate");
@@ -64,27 +117,4 @@ public class IterateExample {
}
}
-
-
- public static void main(String[] args) throws Exception {
-
- List<Tuple2<Double, Integer>> input = new ArrayList<Tuple2<Double, Integer>>();
- for (int i = 0; i < 100; i++) {
- input.add(new Tuple2<Double, Integer>(0., 0));
- }
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
- .setBufferTimeout(1);
-
- IterativeDataStream<Tuple2<Double, Integer>> it = env.fromCollection(input).iterate()
- .setMaxWaitTime(3000);
-
- SplitDataStream<Tuple2<Double,Integer>> step = it.map(new Step()).shuffle().split(new MySelector());
-
- it.closeWith(step.select("iterate"));
-
- step.select("output").project(1).types(Integer.class).print();
-
- env.execute();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index 9fe1af4..d2f9e6e 100755
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -39,7 +39,7 @@ public class IncrementalLearningSkeleton {
// Method for pulling new data for prediction
private Integer getNewData() throws InterruptedException {
- Thread.sleep(1000);
+ Thread.sleep(100);
return 1;
}
}
@@ -59,7 +59,7 @@ public class IncrementalLearningSkeleton {
// Method for pulling new training data
private Integer getTrainingData() throws InterruptedException {
- Thread.sleep(1000);
+ Thread.sleep(100);
return 1;
}
@@ -114,13 +114,13 @@ public class IncrementalLearningSkeleton {
}
- private static final int PARALLELISM = 1;
- private static final int SOURCE_PARALLELISM = 1;
+ private static int SOURCE_PARALLELISM = 1;
public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
- PARALLELISM).setBufferTimeout(1000);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ IncrementalLearningSkeleton.SOURCE_PARALLELISM = env.getDegreeOfParallelism();
// Build new model on every second of new data
DataStream<Double[]> model = env.addSource(new TrainingDataSource(), SOURCE_PARALLELISM)
@@ -130,11 +130,10 @@ public class IncrementalLearningSkeleton {
DataStream<Integer> prediction = env.addSource(new NewDataSource(), SOURCE_PARALLELISM)
.connect(model).map(new Predictor());
+ // We print the output
prediction.print();
env.execute();
}
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
index 5d8022a..ef1d84b 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
@@ -32,7 +32,7 @@ import org.apache.sling.commons.json.JSONException;
/**
* Implements the "TwitterStream" program that computes a most used word occurrence
- * histogram over JSON files in a streaming fashion.
+ * over JSON files in a streaming fashion.
*
* <p>
* The input is a JSON text file with lines separated by newline characters.
@@ -52,7 +52,6 @@ import org.apache.sling.commons.json.JSONException;
*
*/
public class TwitterStream {
- private static final int PARALLELISM = 1;
// *************************************************************************
// PROGRAM
@@ -65,7 +64,7 @@ public class TwitterStream {
// set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment
- .createLocalEnvironment(PARALLELISM);
+ .getExecutionEnvironment();
env.setBufferTimeout(1000);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
index c895c5b..d877f91 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
@@ -19,29 +19,27 @@ package org.apache.flink.streaming.examples.window.join;
import java.util.Random;
-import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.util.Collector;
-public class GradeSource implements SourceFunction<Tuple3<String, Integer, Long>> {
+public class GradeSource implements SourceFunction<Tuple2<String, Integer>> {
private static final long serialVersionUID = -5897483980082089771L;
private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
- "andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
+ "andrew", "jean", "richard", "smith", "gorge", "black", "peter" ,"mark", "eric"};
private Random rand = new Random();
- private Tuple3<String, Integer, Long> outTuple = new Tuple3<String, Integer, Long>();
- private Long progress = 0L;
+ private Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>();
@Override
- public void invoke(Collector<Tuple3<String, Integer, Long>> out) throws Exception {
+ public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
// Continuously emit tuples with random names and integers (grades).
while (true) {
outTuple.f0 = names[rand.nextInt(names.length)];
outTuple.f1 = rand.nextInt(5) + 1;
- outTuple.f2 = progress;
out.collect(outTuple);
- progress += 1;
+ Thread.sleep(rand.nextInt(10) + 1);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
index 641a738..0da48e5 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
@@ -19,29 +19,27 @@ package org.apache.flink.streaming.examples.window.join;
import java.util.Random;
-import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.util.Collector;
-public class SalarySource implements SourceFunction<Tuple3<String, Integer, Long>> {
+public class SalarySource implements SourceFunction<Tuple2<String, Integer>> {
private static final long serialVersionUID = 6670933703432267728L;
private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
- "andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
+ "andrew", "jean", "richard", "smith", "gorge", "black", "peter", "mark", "eric" };
private Random rand = new Random();
- private Tuple3<String, Integer, Long> outTuple = new Tuple3<String, Integer, Long>();
- private Long progress = 0L;
+ private Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>();
@Override
- public void invoke(Collector<Tuple3<String, Integer, Long>> out) throws Exception {
+ public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
// Continuously emit tuples with random names and integers (salaries).
while (true) {
outTuple.f0 = names[rand.nextInt(names.length)];
outTuple.f1 = rand.nextInt(10000);
- outTuple.f2 = progress;
out.collect(outTuple);
- progress += 1;
+ Thread.sleep(rand.nextInt(10) + 1);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
index 211daf6..695cf5d 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
@@ -17,33 +17,30 @@
package org.apache.flink.streaming.examples.window.join;
-import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class WindowJoinLocal {
- private static final int PARALLELISM = 4;
- private static final int SOURCE_PARALLELISM = 2;
-
// This example will join two streams with a sliding window. One which emits
// people's grades and one which emits people's salaries.
public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
- PARALLELISM).setBufferTimeout(100);
-
- DataStream<Tuple3<String, Integer, Long>> grades = env.addSource(new GradeSource(),
- SOURCE_PARALLELISM);
+ // Obtain execution environment
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStream<Tuple3<String, Integer, Long>> salaries = env.addSource(new SalarySource(),
- SOURCE_PARALLELISM);
+ // Connect to the data sources for grades and salaries
+ DataStream<Tuple2<String, Integer>> grades = env.addSource(new GradeSource());
+ DataStream<Tuple2<String, Integer>> salaries = env.addSource(new SalarySource());
- DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades.connect(salaries)
- .flatMap(new WindowJoinTask());
+ // Apply a temporal join over the two streams based on the names in
+ // one-second windows
+ DataStream<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> joinedStream = grades
+ .windowJoin(salaries, 1000, 1000, 0, 0);
- System.out.println("(NAME, GRADE, SALARY)");
+ // Print the results
joinedStream.print();
env.execute();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java
deleted file mode 100644
index 524ffbd..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.window.join;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.function.co.RichCoFlatMapFunction;
-import org.apache.flink.util.Collector;
-
-public class WindowJoinTask extends
- RichCoFlatMapFunction<Tuple3<String, Integer, Long>, Tuple3<String, Integer, Long>, Tuple3<String, Integer, Integer>> {
-
- class SalaryProgress {
- public SalaryProgress(Integer salary, Long progress) {
- this.salary = salary;
- this.progress = progress;
- }
-
- Integer salary;
- Long progress;
- }
-
- class GradeProgress {
- public GradeProgress(Integer grade, Long progress) {
- this.grade = grade;
- this.progress = progress;
- }
-
- Integer grade;
- Long progress;
- }
-
- private static final long serialVersionUID = 749913336259789039L;
- private int windowSize = 100;
- private HashMap<String, LinkedList<GradeProgress>> gradeHashmap;
- private HashMap<String, LinkedList<SalaryProgress>> salaryHashmap;
- private String name;
- private Long progress;
-
- public WindowJoinTask() {
- gradeHashmap = new HashMap<String, LinkedList<GradeProgress>>();
- salaryHashmap = new HashMap<String, LinkedList<SalaryProgress>>();
- name = new String();
- progress = 0L;
- }
-
- Tuple3<String, Integer, Integer> outputTuple = new Tuple3<String, Integer, Integer>();
-
- // Joins the input value (grade) with the already known values (salaries) on
- // a given interval.
- // Also stores the new element.
- @Override
- public void flatMap1(Tuple3<String, Integer, Long> value,
- Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
- name = value.f0;
- progress = value.f2;
-
- outputTuple.f0 = name;
- outputTuple.f1 = value.f1;
-
- if (salaryHashmap.containsKey(name)) {
- Iterator<SalaryProgress> iterator = salaryHashmap.get(name).iterator();
- while (iterator.hasNext()) {
- SalaryProgress entry = iterator.next();
- if (progress - entry.progress > windowSize) {
- iterator.remove();
- } else {
- outputTuple.f2 = entry.salary;
- out.collect(outputTuple);
- }
- }
- }
- if (!gradeHashmap.containsKey(name)) {
- gradeHashmap.put(name, new LinkedList<GradeProgress>());
- }
- gradeHashmap.get(name).add(new GradeProgress(value.f1, progress));
- }
-
- // Joins the input value (salary) with the already known values (grades) on
- // a given interval.
- // Also stores the new element.
- @Override
- public void flatMap2(Tuple3<String, Integer, Long> value,
- Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
- name = value.f0;
- progress = value.f2;
-
- outputTuple.f0 = name;
- outputTuple.f2 = value.f1;
-
- if (gradeHashmap.containsKey(name)) {
- Iterator<GradeProgress> iterator = gradeHashmap.get(name).iterator();
- while (iterator.hasNext()) {
- GradeProgress entry = iterator.next();
- if (progress - entry.progress > windowSize) {
- iterator.remove();
- } else {
- outputTuple.f1 = entry.grade;
- out.collect(outputTuple);
- }
- }
- }
- if (!salaryHashmap.containsKey(name)) {
- salaryHashmap.put(name, new LinkedList<SalaryProgress>());
- }
- salaryHashmap.get(name).add(new SalaryProgress(value.f1, progress));
- }
-}