You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/03/02 15:38:11 UTC
flink git commit: [FLINK-5945] [core] Close function in OuterJoinOperatorBase#executeOnCollections
Repository: flink
Updated Branches:
refs/heads/master ff552b440 -> 243ef69bf
[FLINK-5945] [core] Close function in OuterJoinOperatorBase#executeOnCollections
Conclude OuterJoinOperatorBase#executeOnCollections with a call to
FunctionUtils.closeFunction(function) in order to close rich user
functions.
This closes #3453
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/243ef69b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/243ef69b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/243ef69b
Branch: refs/heads/master
Commit: 243ef69bf5233998dd7f849721cfcb83669b663c
Parents: ff552b4
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Mar 1 15:55:48 2017 -0500
Committer: Greg Hogan <co...@greghogan.com>
Committed: Thu Mar 2 10:38:00 2017 -0500
----------------------------------------------------------------------
.../operators/base/OuterJoinOperatorBase.java | 3 +-
.../base/OuterJoinOperatorBaseTest.java | 80 ++++++++++++++++----
.../graph/library/link_analysis/HITSTest.java | 2 +-
3 files changed, 69 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/243ef69b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
index a47a612..003fdca 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
@@ -103,7 +103,6 @@ public class OuterJoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN
FunctionUtils.setFunctionRuntimeContext(function, runtimeContext);
FunctionUtils.openFunction(function, this.parameters);
-
List<OUT> result = new ArrayList<>();
Collector<OUT> collector = new CopyingListCollector<>(result, outInformation.createSerializer(executionConfig));
@@ -113,6 +112,8 @@ public class OuterJoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN
function.join(left == null ? null : leftSerializer.copy(left), right == null ? null : rightSerializer.copy(right), collector);
}
+ FunctionUtils.closeFunction(function);
+
return result;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/243ef69b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
index 683e164..9b6c684 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
@@ -19,34 +19,63 @@
package org.apache.flink.api.common.operators.base;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.RichFlatJoinFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.util.Collector;
+import org.junit.Before;
import org.junit.Test;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
@SuppressWarnings("serial")
public class OuterJoinOperatorBaseTest implements Serializable {
- private final FlatJoinFunction<String, String, String> joiner = new FlatJoinFunction<String, String, String>() {
- @Override
- public void join(String first, String second, Collector<String> out) throws Exception {
- out.collect(String.valueOf(first) + ',' + String.valueOf(second));
- }
- };
+ private MockRichFlatJoinFunction joiner;
+
+ private OuterJoinOperatorBase<String, String, String, FlatJoinFunction<String, String, String>> baseOperator;
+
+ private ExecutionConfig executionConfig;
+
+ private RuntimeContext runtimeContext;
@SuppressWarnings({"rawtypes", "unchecked"})
- private final OuterJoinOperatorBase<String, String, String, FlatJoinFunction<String, String, String>> baseOperator =
+ @Before
+ public void setup() {
+ joiner = new MockRichFlatJoinFunction();
+
+ baseOperator =
new OuterJoinOperatorBase(joiner,
- new BinaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO), new int[0], new int[0], "TestJoiner", null);
+ new BinaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO), new int[0], new int[0], "TestJoiner", null);
+
+ executionConfig = new ExecutionConfig();
+
+ String taskName = "Test rich outer join function";
+ TaskInfo taskInfo = new TaskInfo(taskName, 1, 0, 1, 0);
+ HashMap<String, Accumulator<?, ?>> accumulatorMap = new HashMap<>();
+ HashMap<String, Future<Path>> cpTasks = new HashMap<>();
+
+ runtimeContext = new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks,
+ accumulatorMap, new UnregisteredMetricsGroup());
+ }
@Test
public void testFullOuterJoinWithoutMatchingPartners() throws Exception {
@@ -132,18 +161,41 @@ public class OuterJoinOperatorBaseTest implements Serializable {
baseOperator.setOuterJoinType(null);
ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.disableObjectReuse();
- baseOperator.executeOnCollections(leftInput, rightInput, null, executionConfig);
+ baseOperator.executeOnCollections(leftInput, rightInput, runtimeContext, executionConfig);
}
private void testOuterJoin(List<String> leftInput, List<String> rightInput, List<String> expected) throws Exception {
- ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.disableObjectReuse();
- List<String> resultSafe = baseOperator.executeOnCollections(leftInput, rightInput, null, executionConfig);
+ List<String> resultSafe = baseOperator.executeOnCollections(leftInput, rightInput, runtimeContext, executionConfig);
executionConfig.enableObjectReuse();
- List<String> resultRegular = baseOperator.executeOnCollections(leftInput, rightInput, null, executionConfig);
+ List<String> resultRegular = baseOperator.executeOnCollections(leftInput, rightInput, runtimeContext, executionConfig);
assertEquals(expected, resultSafe);
assertEquals(expected, resultRegular);
+
+ assertTrue(joiner.opened.get());
+ assertTrue(joiner.closed.get());
}
-}
\ No newline at end of file
+ private static class MockRichFlatJoinFunction extends RichFlatJoinFunction<String, String, String> {
+ final AtomicBoolean opened = new AtomicBoolean(false);
+ final AtomicBoolean closed = new AtomicBoolean(false);
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ opened.compareAndSet(false, true);
+ assertEquals(0, getRuntimeContext().getIndexOfThisSubtask());
+ assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+ }
+
+ @Override
+ public void close() throws Exception{
+ closed.compareAndSet(false, true);
+ }
+
+ @Override
+ public void join(String first, String second, Collector<String> out) throws Exception {
+ out.collect(String.valueOf(first) + ',' + String.valueOf(second));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/243ef69b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
index 2e5cebe..b09f95c 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
@@ -80,7 +80,7 @@ extends AsmTestBase {
public void testWithRMatGraph()
throws Exception {
DataSet<Result<LongValue>> hits = directedRMatGraph
- .run(new HITS<LongValue, NullValue, NullValue>(0.000001));
+ .run(new HITS<LongValue, NullValue, NullValue>(1));
Checksum checksum = new ChecksumHashCode<Result<LongValue>>()
.run(hits)