You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/01 18:47:42 UTC
git commit: [FLINK-1070] Change return type of "getBroadcastVariable()" to List.
Repository: incubator-flink
Updated Branches:
refs/heads/master eba8df082 -> 0b100517f
[FLINK-1070] Change return type of "getBroadcastVariable()" to List.
This closes #105
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/0b100517
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/0b100517
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/0b100517
Branch: refs/heads/master
Commit: 0b100517f1a4f2a74cab06d2fbf00f96a45513f8
Parents: eba8df0
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Aug 29 19:32:40 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 1 18:38:12 2014 +0200
----------------------------------------------------------------------
.../flink/api/common/functions/MapFunction.java | 2 +-
.../flink/api/common/functions/RuntimeContext.java | 4 ++--
.../java/operators/translation/WrappingFunction.java | 14 ++++----------
.../runtime/operators/udf/RuntimeUDFContext.java | 12 ++++++------
4 files changed, 13 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0b100517/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
index dccc980..b62e333 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
@@ -44,7 +44,7 @@ public interface MapFunction<T, O> extends Function, Serializable {
* it into exactly one element.
*
* @param value The input value.
- * @returns The transformed value
+ * @return The transformed value
*
* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
* to fail and may trigger recovery.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0b100517/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index e18858b..7e2464b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -18,8 +18,8 @@
package org.apache.flink.api.common.functions;
-import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.DoubleCounter;
@@ -112,7 +112,7 @@ public interface RuntimeContext {
* Returns the result bound to the broadcast variable identified by the
* given {@code name}.
*/
- <RT> Collection<RT> getBroadcastVariable(String name);
+ <RT> List<RT> getBroadcastVariable(String name);
/**
* Returns the distributed cache to get the local tmp file.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0b100517/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
index 748e3f3..b3ce5c0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
@@ -19,8 +19,8 @@
package org.apache.flink.api.java.operators.translation;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.DoubleCounter;
@@ -137,15 +137,9 @@ public abstract class WrappingFunction<T extends Function> extends AbstractRichF
}
@Override
- public <RT> Collection<RT> getBroadcastVariable(String name) {
- Collection<RT> refColl = context.getBroadcastVariable(name);
-
- ArrayList<RT> list = new ArrayList<RT>(refColl.size());
- for (RT e : refColl) {
- list.add(e);
- }
-
- return list;
+ public <RT> List<RT> getBroadcastVariable(String name) {
+ List<RT> refColl = context.getBroadcastVariable(name);
+ return new ArrayList<RT>(refColl);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0b100517/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RuntimeUDFContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RuntimeUDFContext.java
index 27e0df2..6ff48f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RuntimeUDFContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RuntimeUDFContext.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.operators.udf;
-import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.FutureTask;
@@ -34,7 +34,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.core.fs.Path;
/**
- *
+ * Implementation of the {@link RuntimeContext}, created by runtime UDF operators.
*/
public class RuntimeUDFContext implements RuntimeContext {
@@ -48,7 +48,7 @@ public class RuntimeUDFContext implements RuntimeContext {
private HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
- private HashMap<String, Collection<?>> broadcastVars = new HashMap<String, Collection<?>>();
+ private HashMap<String, List<?>> broadcastVars = new HashMap<String, List<?>>();
public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex) {
this.name = name;
@@ -139,19 +139,19 @@ public class RuntimeUDFContext implements RuntimeContext {
return this.accumulators;
}
- public void setBroadcastVariable(String name, Collection<?> value) {
+ public void setBroadcastVariable(String name, List<?> value) {
this.broadcastVars.put(name, value);
}
@Override
@SuppressWarnings("unchecked")
- public <RT> Collection<RT> getBroadcastVariable(String name) {
+ public <RT> List<RT> getBroadcastVariable(String name) {
if (!this.broadcastVars.containsKey(name)) {
throw new IllegalArgumentException("Trying to access an unbound broadcast variable '"
+ name + "'.");
}
- return (Collection<RT>) this.broadcastVars.get(name);
+ return (List<RT>) this.broadcastVars.get(name);
}
@Override