You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:25:50 UTC

[13/51] [abbrv] git commit: [streaming] Automerge error + License fix

[streaming] Automerge error + License fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f186f3d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f186f3d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f186f3d5

Branch: refs/heads/master
Commit: f186f3d5144cc52f11562a069ee275df96fbf0e0
Parents: 2c4e195
Author: gyfora <gy...@gmail.com>
Authored: Tue Jul 22 19:46:55 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:19:17 2014 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/JobGraphBuilder.java    | 26 ++++++++++--------
 .../api/StreamExecutionEnvironment.java         |  2 +-
 .../api/invokable/operator/BatchIterator.java   | 19 +++++++++++++
 .../api/invokable/operator/CoMapTest.java       | 29 ++++++++++++--------
 4 files changed, 52 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f186f3d5/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index bf9057b..986172b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -574,12 +574,12 @@ public class JobGraphBuilder {
 		setEdge(upStreamComponentName, downStreamComponentName, new ForwardPartitioner<T>(),
 				typeNumber);
 	}
-	
+
 	/**
 	 * Connects two components with the given names by distribute partitioning.
 	 * <p>
-	 * Distribute partitioning: sends the output tuples evenly distributed
-	 * along the selected channels
+	 * Distribute partitioning: sends the output tuples evenly distributed along
+	 * the selected channels
 	 * 
 	 * @param inputStream
 	 *            The DataStream object of the input
@@ -587,10 +587,13 @@ public class JobGraphBuilder {
 	 *            Name of the upstream component, that will emit the tuples
 	 * @param downStreamComponentName
 	 *            Name of the downstream component, that will receive the tuples
+	 * @param typeNumber
+	 *            Number of the type (used at co-functions)
 	 */
 	public <T extends Tuple> void distributeConnect(DataStream<T> inputStream,
-			String upStreamComponentName, String downStreamComponentName) {
-		setEdge(upStreamComponentName, downStreamComponentName, new DistributePartitioner<T>());
+			String upStreamComponentName, String downStreamComponentName, int typeNumber) {
+		setEdge(upStreamComponentName, downStreamComponentName, new DistributePartitioner<T>(),
+				typeNumber);
 	}
 
 	/**
@@ -773,20 +776,21 @@ public class JobGraphBuilder {
 		for (String componentName : outEdgeList.keySet()) {
 			createVertex(componentName);
 		}
-		
+
 		for (String upStreamComponentName : outEdgeList.keySet()) {
 			int i = 0;
-			
+
 			ArrayList<Integer> outEdgeTypeList = outEdgeType.get(upStreamComponentName);
 
 			for (String downStreamComponentName : outEdgeList.get(upStreamComponentName)) {
 				Configuration downStreamComponentConfig = components.get(downStreamComponentName)
 						.getConfiguration();
-				
-				int inputNumber = downStreamComponentConfig.getInteger("numberOfInputs", 0);				
-				downStreamComponentConfig.setInteger("inputType_" + inputNumber++, outEdgeTypeList.get(i));
+
+				int inputNumber = downStreamComponentConfig.getInteger("numberOfInputs", 0);
+				downStreamComponentConfig.setInteger("inputType_" + inputNumber++,
+						outEdgeTypeList.get(i));
 				downStreamComponentConfig.setInteger("numberOfInputs", inputNumber);
-				
+
 				connect(upStreamComponentName, downStreamComponentName,
 						connectionTypes.get(upStreamComponentName).get(i));
 				i++;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f186f3d5/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
index b69c457..4b5bc98 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
@@ -572,7 +572,7 @@ public abstract class StreamExecutionEnvironment {
 				jobGraphBuilder.forwardConnect(inputStream, input, outputID, typeNumber);
 				break;
 			case DISTRIBUTE:
-				jobGraphBuilder.distributeConnect(inputStream, input, outputID);
+				jobGraphBuilder.distributeConnect(inputStream, input, outputID, typeNumber);
 				break;
 			}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f186f3d5/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java
index 1432749..7b971b9 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java
@@ -1,3 +1,22 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
 package org.apache.flink.streaming.api.invokable.operator;
 
 import java.util.Iterator;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f186f3d5/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
index b2fd3cf..ecd81bd 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
@@ -40,6 +40,16 @@ public class CoMapTest implements Serializable {
 	private static Set<String> result;
 	private static Set<String> expected = new HashSet<String>();
 
+	public CoMapTest() {
+		expected.add("a");
+		expected.add("b");
+		expected.add("c");
+		expected.add("1");
+		expected.add("2");
+		expected.add("3");
+		expected.add("4");
+	}
+
 	private final static class EmptySink extends SinkFunction<Tuple1<Boolean>> {
 		private static final long serialVersionUID = 1L;
 
@@ -68,16 +78,8 @@ public class CoMapTest implements Serializable {
 	@Test
 	public void test() {
 		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
-		
+
 		result = new HashSet<String>();
-		
-		expected.add("a");
-		expected.add("b");
-		expected.add("c");
-		expected.add("1");
-		expected.add("2");
-		expected.add("3");
-		expected.add("4");
 
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
@@ -93,6 +95,8 @@ public class CoMapTest implements Serializable {
 
 	@Test
 	public void multipleInputTest() {
+		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
+
 		result = new HashSet<String>();
 
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
@@ -102,11 +106,12 @@ public class CoMapTest implements Serializable {
 		DataStream<Tuple1<Integer>> ds2 = env.fromElements(2, 4).connectWith(ds1);
 
 		DataStream<Tuple1<String>> ds3 = env.fromElements("a", "b");
-		
+
 		@SuppressWarnings({ "unused", "unchecked" })
-		DataStream<Tuple1<Boolean>> ds4 = env.fromElements("c").connectWith(ds3).coMapWith(new MyCoMap(),
+		DataStream<Tuple1<Boolean>> ds4 = env.fromElements("c").connectWith(ds3)
+				.coMapWith(new MyCoMap(),
 
-		ds2).addSink(new EmptySink());
+				ds2).addSink(new EmptySink());
 
 		env.executeTest(32);
 		Assert.assertArrayEquals(expected.toArray(), result.toArray());