You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:25:50 UTC
[13/51] [abbrv] git commit: [streaming] Automerge error + License fix
[streaming] Automerge error + License fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f186f3d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f186f3d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f186f3d5
Branch: refs/heads/master
Commit: f186f3d5144cc52f11562a069ee275df96fbf0e0
Parents: 2c4e195
Author: gyfora <gy...@gmail.com>
Authored: Tue Jul 22 19:46:55 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:19:17 2014 +0200
----------------------------------------------------------------------
.../flink/streaming/api/JobGraphBuilder.java | 26 ++++++++++--------
.../api/StreamExecutionEnvironment.java | 2 +-
.../api/invokable/operator/BatchIterator.java | 19 +++++++++++++
.../api/invokable/operator/CoMapTest.java | 29 ++++++++++++--------
4 files changed, 52 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f186f3d5/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index bf9057b..986172b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -574,12 +574,12 @@ public class JobGraphBuilder {
setEdge(upStreamComponentName, downStreamComponentName, new ForwardPartitioner<T>(),
typeNumber);
}
-
+
/**
* Connects two components with the given names by distribute partitioning.
* <p>
- * Distribute partitioning: sends the output tuples evenly distributed
- * along the selected channels
+ * Distribute partitioning: sends the output tuples evenly distributed along
+ * the selected channels
*
* @param inputStream
* The DataStream object of the input
@@ -587,10 +587,13 @@ public class JobGraphBuilder {
* Name of the upstream component, that will emit the tuples
* @param downStreamComponentName
* Name of the downstream component, that will receive the tuples
+ * @param typeNumber
+ * Number of the type (used at co-functions)
*/
public <T extends Tuple> void distributeConnect(DataStream<T> inputStream,
- String upStreamComponentName, String downStreamComponentName) {
- setEdge(upStreamComponentName, downStreamComponentName, new DistributePartitioner<T>());
+ String upStreamComponentName, String downStreamComponentName, int typeNumber) {
+ setEdge(upStreamComponentName, downStreamComponentName, new DistributePartitioner<T>(),
+ typeNumber);
}
/**
@@ -773,20 +776,21 @@ public class JobGraphBuilder {
for (String componentName : outEdgeList.keySet()) {
createVertex(componentName);
}
-
+
for (String upStreamComponentName : outEdgeList.keySet()) {
int i = 0;
-
+
ArrayList<Integer> outEdgeTypeList = outEdgeType.get(upStreamComponentName);
for (String downStreamComponentName : outEdgeList.get(upStreamComponentName)) {
Configuration downStreamComponentConfig = components.get(downStreamComponentName)
.getConfiguration();
-
- int inputNumber = downStreamComponentConfig.getInteger("numberOfInputs", 0);
- downStreamComponentConfig.setInteger("inputType_" + inputNumber++, outEdgeTypeList.get(i));
+
+ int inputNumber = downStreamComponentConfig.getInteger("numberOfInputs", 0);
+ downStreamComponentConfig.setInteger("inputType_" + inputNumber++,
+ outEdgeTypeList.get(i));
downStreamComponentConfig.setInteger("numberOfInputs", inputNumber);
-
+
connect(upStreamComponentName, downStreamComponentName,
connectionTypes.get(upStreamComponentName).get(i));
i++;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f186f3d5/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
index b69c457..4b5bc98 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
@@ -572,7 +572,7 @@ public abstract class StreamExecutionEnvironment {
jobGraphBuilder.forwardConnect(inputStream, input, outputID, typeNumber);
break;
case DISTRIBUTE:
- jobGraphBuilder.distributeConnect(inputStream, input, outputID);
+ jobGraphBuilder.distributeConnect(inputStream, input, outputID, typeNumber);
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f186f3d5/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java
index 1432749..7b971b9 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java
@@ -1,3 +1,22 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
package org.apache.flink.streaming.api.invokable.operator;
import java.util.Iterator;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f186f3d5/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
index b2fd3cf..ecd81bd 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
@@ -40,6 +40,16 @@ public class CoMapTest implements Serializable {
private static Set<String> result;
private static Set<String> expected = new HashSet<String>();
+ public CoMapTest() {
+ expected.add("a");
+ expected.add("b");
+ expected.add("c");
+ expected.add("1");
+ expected.add("2");
+ expected.add("3");
+ expected.add("4");
+ }
+
private final static class EmptySink extends SinkFunction<Tuple1<Boolean>> {
private static final long serialVersionUID = 1L;
@@ -68,16 +78,8 @@ public class CoMapTest implements Serializable {
@Test
public void test() {
LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
-
+
result = new HashSet<String>();
-
- expected.add("a");
- expected.add("b");
- expected.add("c");
- expected.add("1");
- expected.add("2");
- expected.add("3");
- expected.add("4");
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
@@ -93,6 +95,8 @@ public class CoMapTest implements Serializable {
@Test
public void multipleInputTest() {
+ LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
+
result = new HashSet<String>();
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
@@ -102,11 +106,12 @@ public class CoMapTest implements Serializable {
DataStream<Tuple1<Integer>> ds2 = env.fromElements(2, 4).connectWith(ds1);
DataStream<Tuple1<String>> ds3 = env.fromElements("a", "b");
-
+
@SuppressWarnings({ "unused", "unchecked" })
- DataStream<Tuple1<Boolean>> ds4 = env.fromElements("c").connectWith(ds3).coMapWith(new MyCoMap(),
+ DataStream<Tuple1<Boolean>> ds4 = env.fromElements("c").connectWith(ds3)
+ .coMapWith(new MyCoMap(),
- ds2).addSink(new EmptySink());
+ ds2).addSink(new EmptySink());
env.executeTest(32);
Assert.assertArrayEquals(expected.toArray(), result.toArray());