You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/03/02 15:38:11 UTC

flink git commit: [FLINK-5945] [core] Close function in OuterJoinOperatorBase#executeOnCollections

Repository: flink
Updated Branches:
  refs/heads/master ff552b440 -> 243ef69bf


[FLINK-5945] [core] Close function in OuterJoinOperatorBase#executeOnCollections

Conclude OuterJoinOperatorBase#executeOnCollections with a call to
FunctionUtils.closeFunction(function) in order to close rich user
functions.

This closes #3453


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/243ef69b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/243ef69b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/243ef69b

Branch: refs/heads/master
Commit: 243ef69bf5233998dd7f849721cfcb83669b663c
Parents: ff552b4
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Mar 1 15:55:48 2017 -0500
Committer: Greg Hogan <co...@greghogan.com>
Committed: Thu Mar 2 10:38:00 2017 -0500

----------------------------------------------------------------------
 .../operators/base/OuterJoinOperatorBase.java   |  3 +-
 .../base/OuterJoinOperatorBaseTest.java         | 80 ++++++++++++++++----
 .../graph/library/link_analysis/HITSTest.java   |  2 +-
 3 files changed, 69 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/243ef69b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
index a47a612..003fdca 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
@@ -103,7 +103,6 @@ public class OuterJoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN
 		FunctionUtils.setFunctionRuntimeContext(function, runtimeContext);
 		FunctionUtils.openFunction(function, this.parameters);
 
-
 		List<OUT> result = new ArrayList<>();
 		Collector<OUT> collector = new CopyingListCollector<>(result, outInformation.createSerializer(executionConfig));
 
@@ -113,6 +112,8 @@ public class OuterJoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN
 			function.join(left == null ? null : leftSerializer.copy(left), right == null ? null : rightSerializer.copy(right), collector);
 		}
 
+		FunctionUtils.closeFunction(function);
+
 		return result;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/243ef69b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
index 683e164..9b6c684 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
@@ -19,34 +19,63 @@
 package org.apache.flink.api.common.operators.base;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.RichFlatJoinFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.util.Collector;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 @SuppressWarnings("serial")
 public class OuterJoinOperatorBaseTest implements Serializable {
 
-	private final FlatJoinFunction<String, String, String> joiner = new FlatJoinFunction<String, String, String>() {
-		@Override
-		public void join(String first, String second, Collector<String> out) throws Exception {
-			out.collect(String.valueOf(first) + ',' + String.valueOf(second));
-		}
-	};
+	private MockRichFlatJoinFunction joiner;
+
+	private OuterJoinOperatorBase<String, String, String, FlatJoinFunction<String, String, String>> baseOperator;
+
+	private ExecutionConfig executionConfig;
+
+	private RuntimeContext runtimeContext;
 
 	@SuppressWarnings({"rawtypes", "unchecked"})
-	private final OuterJoinOperatorBase<String, String, String, FlatJoinFunction<String, String, String>> baseOperator =
+	@Before
+	public void setup() {
+		joiner = new MockRichFlatJoinFunction();
+
+		baseOperator =
 			new OuterJoinOperatorBase(joiner,
-					new BinaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
-							BasicTypeInfo.STRING_TYPE_INFO), new int[0], new int[0], "TestJoiner", null);
+				new BinaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
+					BasicTypeInfo.STRING_TYPE_INFO), new int[0], new int[0], "TestJoiner", null);
+
+		executionConfig = new ExecutionConfig();
+
+		String taskName = "Test rich outer join function";
+		TaskInfo taskInfo = new TaskInfo(taskName, 1, 0, 1, 0);
+		HashMap<String, Accumulator<?, ?>> accumulatorMap = new HashMap<>();
+		HashMap<String, Future<Path>> cpTasks = new HashMap<>();
+
+		runtimeContext = new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks,
+			accumulatorMap, new UnregisteredMetricsGroup());
+	}
 
 	@Test
 	public void testFullOuterJoinWithoutMatchingPartners() throws Exception {
@@ -132,18 +161,41 @@ public class OuterJoinOperatorBaseTest implements Serializable {
 		baseOperator.setOuterJoinType(null);
 		ExecutionConfig executionConfig = new ExecutionConfig();
 		executionConfig.disableObjectReuse();
-		baseOperator.executeOnCollections(leftInput, rightInput, null, executionConfig);
+		baseOperator.executeOnCollections(leftInput, rightInput, runtimeContext, executionConfig);
 	}
 
 	private void testOuterJoin(List<String> leftInput, List<String> rightInput, List<String> expected) throws Exception {
-		ExecutionConfig executionConfig = new ExecutionConfig();
 		executionConfig.disableObjectReuse();
-		List<String> resultSafe = baseOperator.executeOnCollections(leftInput, rightInput, null, executionConfig);
+		List<String> resultSafe = baseOperator.executeOnCollections(leftInput, rightInput, runtimeContext, executionConfig);
 		executionConfig.enableObjectReuse();
-		List<String> resultRegular = baseOperator.executeOnCollections(leftInput, rightInput, null, executionConfig);
+		List<String> resultRegular = baseOperator.executeOnCollections(leftInput, rightInput, runtimeContext, executionConfig);
 
 		assertEquals(expected, resultSafe);
 		assertEquals(expected, resultRegular);
+
+		assertTrue(joiner.opened.get());
+		assertTrue(joiner.closed.get());
 	}
 
-}
\ No newline at end of file
+	private static class MockRichFlatJoinFunction extends RichFlatJoinFunction<String, String, String> {
+		final AtomicBoolean opened = new AtomicBoolean(false);
+		final AtomicBoolean closed = new AtomicBoolean(false);
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			opened.compareAndSet(false, true);
+			assertEquals(0, getRuntimeContext().getIndexOfThisSubtask());
+			assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+		}
+
+		@Override
+		public void close() throws Exception{
+			closed.compareAndSet(false, true);
+		}
+
+		@Override
+		public void join(String first, String second, Collector<String> out) throws Exception {
+			out.collect(String.valueOf(first) + ',' + String.valueOf(second));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/243ef69b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
index 2e5cebe..b09f95c 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
@@ -80,7 +80,7 @@ extends AsmTestBase {
 	public void testWithRMatGraph()
 			throws Exception {
 		DataSet<Result<LongValue>> hits = directedRMatGraph
-			.run(new HITS<LongValue, NullValue, NullValue>(0.000001));
+			.run(new HITS<LongValue, NullValue, NullValue>(1));
 
 		Checksum checksum = new ChecksumHashCode<Result<LongValue>>()
 			.run(hits)