You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/01 18:47:42 UTC

git commit: [FLINK-1070] Change return type of "getBroadcastVariable()" to List.

Repository: incubator-flink
Updated Branches:
  refs/heads/master eba8df082 -> 0b100517f


[FLINK-1070] Change return type of "getBroadcastVariable()" to List.

This closes #105


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/0b100517
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/0b100517
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/0b100517

Branch: refs/heads/master
Commit: 0b100517f1a4f2a74cab06d2fbf00f96a45513f8
Parents: eba8df0
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Aug 29 19:32:40 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 1 18:38:12 2014 +0200

----------------------------------------------------------------------
 .../flink/api/common/functions/MapFunction.java       |  2 +-
 .../flink/api/common/functions/RuntimeContext.java    |  4 ++--
 .../java/operators/translation/WrappingFunction.java  | 14 ++++----------
 .../runtime/operators/udf/RuntimeUDFContext.java      | 12 ++++++------
 4 files changed, 13 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0b100517/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
index dccc980..b62e333 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
@@ -44,7 +44,7 @@ public interface MapFunction<T, O> extends Function, Serializable {
 	 * it into exactly one element.
 	 *
 	 * @param value The input value.
-	 * @returns  The transformed value
+	 * @return The transformed value
 	 *
 	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
 	 *                   to fail and may trigger recovery.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0b100517/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index e18858b..7e2464b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.api.common.functions;
 
-import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.DoubleCounter;
@@ -112,7 +112,7 @@ public interface RuntimeContext {
 	 * Returns the result bound to the broadcast variable identified by the 
 	 * given {@code name}.
 	 */
-	<RT> Collection<RT> getBroadcastVariable(String name);
+	<RT> List<RT> getBroadcastVariable(String name);
 
 	/**
 	 * Returns the distributed cache to get the local tmp file.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0b100517/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
index 748e3f3..b3ce5c0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
@@ -19,8 +19,8 @@
 package org.apache.flink.api.java.operators.translation;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.DoubleCounter;
@@ -137,15 +137,9 @@ public abstract class WrappingFunction<T extends Function> extends AbstractRichF
 		}
 
 		@Override
-		public <RT> Collection<RT> getBroadcastVariable(String name) {
-			Collection<RT> refColl = context.getBroadcastVariable(name);
-			
-			ArrayList<RT> list = new ArrayList<RT>(refColl.size());
-			for (RT e : refColl) {
-				list.add(e);
-			}
-			
-			return list;
+		public <RT> List<RT> getBroadcastVariable(String name) {
+			List<RT> refColl = context.getBroadcastVariable(name);
+			return new ArrayList<RT>(refColl);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0b100517/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RuntimeUDFContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RuntimeUDFContext.java
index 27e0df2..6ff48f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RuntimeUDFContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RuntimeUDFContext.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.operators.udf;
 
-import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.FutureTask;
 
@@ -34,7 +34,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.core.fs.Path;
 
 /**
- *
+ * Implementation of the {@link RuntimeContext}, created by runtime UDF operators.
  */
 public class RuntimeUDFContext implements RuntimeContext {
 
@@ -48,7 +48,7 @@ public class RuntimeUDFContext implements RuntimeContext {
 
 	private HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
 
-	private HashMap<String, Collection<?>> broadcastVars = new HashMap<String, Collection<?>>();
+	private HashMap<String, List<?>> broadcastVars = new HashMap<String, List<?>>();
 
 	public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex) {
 		this.name = name;
@@ -139,19 +139,19 @@ public class RuntimeUDFContext implements RuntimeContext {
 		return this.accumulators;
 	}
 
-	public void setBroadcastVariable(String name, Collection<?> value) {
+	public void setBroadcastVariable(String name, List<?> value) {
 		this.broadcastVars.put(name, value);
 	}
 
 
 	@Override
 	@SuppressWarnings("unchecked")
-	public <RT> Collection<RT> getBroadcastVariable(String name) {
+	public <RT> List<RT> getBroadcastVariable(String name) {
 		if (!this.broadcastVars.containsKey(name)) {
 			throw new IllegalArgumentException("Trying to access an unbound broadcast variable '" 
 					+ name + "'.");
 		}
-		return (Collection<RT>) this.broadcastVars.get(name);
+		return (List<RT>) this.broadcastVars.get(name);
 	}
 
 	@Override