You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/04/26 16:00:23 UTC

flink git commit: [FLINK-3777] Add openInputFormat/closeInputFormat methods to the InputFormat lifecycle

Repository: flink
Updated Branches:
  refs/heads/master 879bb1bb0 -> ac2137cfa


[FLINK-3777] Add openInputFormat/closeInputFormat methods to the InputFormat lifecycle

This closes #1903


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ac2137cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ac2137cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ac2137cf

Branch: refs/heads/master
Commit: ac2137cfa5e63bd4f53a4b7669dc591ab210093f
Parents: 879bb1b
Author: Flavio Pompermaier <f....@gmail.com>
Authored: Tue Apr 26 15:42:48 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Apr 26 16:00:20 2016 +0200

----------------------------------------------------------------------
 .../api/common/io/ReplicatingInputFormat.java   | 20 ++++++++++++--
 .../flink/api/common/io/RichInputFormat.java    | 20 ++++++++++++++
 .../common/operators/GenericDataSourceBase.java | 10 ++++++-
 .../api/common/io/FileInputFormatTest.java      |  3 ++
 .../operators/GenericDataSourceBaseTest.java    |  8 ++++++
 .../operators/util/TestRichInputFormat.java     | 24 +++++++++++++++-
 .../flink/runtime/operators/DataSourceTask.java | 11 +++++++-
 .../runtime/operators/DataSourceTaskTest.java   | 29 +++++++++++++++++++-
 8 files changed, 118 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
----------------------------------------------------------------------
Summary: normalizes brace/keyword spacing in setRuntimeContext() and getRuntimeContext(), and adds
openInputFormat()/closeInputFormat() overrides that forward the call to the replicated format when it
is a RichInputFormat and do nothing otherwise.
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
index a084f64..14dc8f4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
@@ -119,18 +119,32 @@ public final class ReplicatingInputFormat<OT, S extends InputSplit> extends Rich
 	}
 
 	@Override
-	public void setRuntimeContext(RuntimeContext context){
-		if(this.replicatedIF instanceof RichInputFormat){
+	public void setRuntimeContext(RuntimeContext context) {
+		if (this.replicatedIF instanceof RichInputFormat) {
 			((RichInputFormat)this.replicatedIF).setRuntimeContext(context);
 		}
 	}
 
 	@Override
 	public RuntimeContext getRuntimeContext(){
-		if(this.replicatedIF instanceof RichInputFormat){
+		if (this.replicatedIF instanceof RichInputFormat) {
 			return ((RichInputFormat)this.replicatedIF).getRuntimeContext();
 		} else{
 			throw new RuntimeException("The underlying input format to this ReplicatingInputFormat isn't context aware");
 		}
 	}
+
+	@Override
+	public void openInputFormat() {
+		if (this.replicatedIF instanceof RichInputFormat) {
+			((RichInputFormat)this.replicatedIF).openInputFormat();
+		}
+	}
+
+	@Override
+	public void closeInputFormat() {
+		if (this.replicatedIF instanceof RichInputFormat) {
+			((RichInputFormat)this.replicatedIF).closeInputFormat();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-core/src/main/java/org/apache/flink/api/common/io/RichInputFormat.java
----------------------------------------------------------------------
Summary: adds openInputFormat() and closeInputFormat() to RichInputFormat as no-op hooks for subclasses
to override; their Javadoc states that each is called once per parallel instance.
NOTE(review): neither hook declares a checked exception, so an override that opens or closes a resource
(e.g. a database connection) has to wrap any IOException it encounters in an unchecked exception.
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/RichInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/RichInputFormat.java
index 0c23e13..188be50 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/RichInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/RichInputFormat.java
@@ -49,4 +49,24 @@ public abstract class RichInputFormat<OT, T extends InputSplit> implements Input
 			"it in one of the other life cycle methods.");
 		}
 	}
+
+	/**
+	 * Opens this InputFormat instance. This method is called once per parallel instance.
+	 * Resources should be allocated in this method. (e.g. database connections, cache, etc.)
+	 * 
+	 * @see InputFormat
+	 */
+	public void openInputFormat() {
+		//do nothing here, just for subclasses
+	}
+
+	/**
+	 * Closes this InputFormat instance. This method is called once per parallel instance.
+	 * Resources allocated during {@link #openInputFormat()} should be closed in this method.
+	 * 
+	 * @see InputFormat
+	 */
+	public void closeInputFormat() {
+		//do nothing here, just for subclasses
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
----------------------------------------------------------------------
Summary: executeOnCollections() now calls openInputFormat() right after setRuntimeContext(ctx) and
closeInputFormat() just before returning the result, in both cases only when the format is a
RichInputFormat.
NOTE(review): in the hunks shown, closeInputFormat() is not inside a finally block, so it looks like it
is skipped whenever reading a split throws; confirm against the full method body.
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
index 0794cc8..e80c99f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
@@ -210,10 +210,13 @@ public class GenericDataSourceBase<OUT, T extends InputFormat<OUT, ?>> extends O
 	protected List<OUT> executeOnCollections(RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
 		@SuppressWarnings("unchecked")
 		InputFormat<OUT, InputSplit> inputFormat = (InputFormat<OUT, InputSplit>) this.formatWrapper.getUserCodeObject();
+		//configure the input format
 		inputFormat.configure(this.parameters);
 
-		if(inputFormat instanceof RichInputFormat){
+		//open the input format
+		if (inputFormat instanceof RichInputFormat) {
 			((RichInputFormat) inputFormat).setRuntimeContext(ctx);
+			((RichInputFormat) inputFormat).openInputFormat();
 		}
 
 		List<OUT> result = new ArrayList<OUT>();
@@ -235,6 +238,11 @@ public class GenericDataSourceBase<OUT, T extends InputFormat<OUT, ?>> extends O
 			inputFormat.close();
 		}
 		
+		//close the input format
+		if (inputFormat instanceof RichInputFormat) {
+			((RichInputFormat) inputFormat).closeInputFormat();
+		}
+
 		return result;
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
----------------------------------------------------------------------
Summary: the test now calls openInputFormat() right after configure(config) and closeInputFormat() at
the end of the method, after the split-reading loops.
NOTE(review): closeInputFormat() is not in a finally block, so it appears to be skipped if anything
between the two calls throws.
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
index 5aac540..63cb966 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
@@ -387,6 +387,7 @@ public class FileInputFormatTest {
 		inputFormat.setFilePath(tempFile.toURI().toString());
 
 		inputFormat.configure(config);
+		inputFormat.openInputFormat();
 
 		FileInputSplit[] inputSplits = inputFormat.createInputSplits(3);
 
@@ -399,6 +400,8 @@ public class FileInputFormatTest {
 				}
 			}
 		}
+
+		inputFormat.closeInputFormat();
 	}
 	
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java
----------------------------------------------------------------------
Summary: asserts that the TestRichInputFormat reports not-opened/not-closed before each
executeOnCollections() call and opened/closed after it. This is checked once with object reuse
disabled and once with it enabled; in.reset() clears the flags in between.
NOTE(review): assertFalse/assertTrue would express these boolean checks more directly than
assertEquals(false, ...) and assertEquals(true, ...).
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java
index bda2fb6..083039a 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java
@@ -81,11 +81,19 @@ public class GenericDataSourceBaseTest implements java.io.Serializable {
 
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			executionConfig.disableObjectReuse();
+			assertEquals(false, in.hasBeenClosed());
+			assertEquals(false, in.hasBeenOpened());
 			List<String> resultMutableSafe = source.executeOnCollections(new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap), executionConfig);
+			assertEquals(true, in.hasBeenClosed());
+			assertEquals(true, in.hasBeenOpened());
 
 			in.reset();
 			executionConfig.enableObjectReuse();
+			assertEquals(false, in.hasBeenClosed());
+			assertEquals(false, in.hasBeenOpened());
 			List<String> resultRegular = source.executeOnCollections(new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap), executionConfig);
+			assertEquals(true, in.hasBeenClosed());
+			assertEquals(true, in.hasBeenOpened());
 
 			assertEquals(asList(TestIOData.RICH_NAMES), resultMutableSafe);
 			assertEquals(asList(TestIOData.RICH_NAMES), resultRegular);

http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-core/src/test/java/org/apache/flink/api/common/operators/util/TestRichInputFormat.java
----------------------------------------------------------------------
Summary: the test input format records calls to openInputFormat()/closeInputFormat() in two boolean
flags, exposes them through hasBeenOpened()/hasBeenClosed(), and clears them (along with the record
count) in reset().
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/util/TestRichInputFormat.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/TestRichInputFormat.java
index 0945391..0bcd018 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/util/TestRichInputFormat.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/TestRichInputFormat.java
@@ -33,6 +33,8 @@ public class TestRichInputFormat extends GenericInputFormat<String> implements N
 	private static final int NUM = 5;
 	private static final String[] NAMES = TestIOData.NAMES;
 	private int count = 0;
+	private boolean openCalled = false;
+	private boolean closeCalled = false;
 
 	@Override
 	public boolean reachedEnd() throws IOException {
@@ -46,7 +48,27 @@ public class TestRichInputFormat extends GenericInputFormat<String> implements N
 				getRuntimeContext().getNumberOfParallelSubtasks();
 	}
 
-	public void reset(){
+	public void reset() {
 		count = 0;
+		openCalled = false;
+		closeCalled = false;
+	}
+
+	@Override
+	public void openInputFormat() {
+		openCalled = true;
+	}
+
+	@Override
+	public void closeInputFormat() {
+		closeCalled = true;
+	}
+
+	public boolean hasBeenOpened() {
+		return openCalled;
+	}
+
+	public boolean hasBeenClosed() {
+		return closeCalled;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
Summary: for a RichInputFormat source, the task now calls openInputFormat() right after setting the
runtime context. It also calls closeInputFormat() in the existing finally block, after
BatchTask.clearWriters(eventualOutputs).
NOTE(review): if BatchTask.clearWriters() throws, closeInputFormat() is skipped. An exception thrown
by closeInputFormat() inside the finally block also replaces any exception already propagating
from the try body. The "Opening the InputFormat" debug line is emitted after openInputFormat() has
already returned.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index 5eec40f..0c525ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -97,9 +97,11 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 		// --------------------------------------------------------------------
 		LOG.debug(getLogString("Starting data source operator"));
 
-		if(RichInputFormat.class.isAssignableFrom(this.format.getClass())){
+		if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
 			((RichInputFormat) this.format).setRuntimeContext(createRuntimeContext());
 			LOG.debug(getLogString("Rich Source detected. Initializing runtime context."));
+			((RichInputFormat) this.format).openInputFormat();
+			LOG.debug(getLogString("Rich Source detected. Opening the InputFormat."));
 		}
 
 		ExecutionConfig executionConfig = getExecutionConfig();
@@ -192,6 +194,13 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 			}
 		} finally {
 			BatchTask.clearWriters(eventualOutputs);
+			// --------------------------------------------------------------------
+			// Closing
+			// --------------------------------------------------------------------
+			if (this.format != null && RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
+				((RichInputFormat) this.format).closeInputFormat();
+				LOG.debug(getLogString("Rich Source detected. Closing the InputFormat."));
+			}
 		}
 
 		if (!this.taskCanceled) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
----------------------------------------------------------------------
Summary: the mock input format now asserts that openInputFormat()/closeInputFormat() are each called at
most once, and records that they were called. After the task has run, the test reads the private
"format" field of DataSourceTask (via reflection on testTask) and asserts that both flags are set.
NOTE(review): the new openInputFormat()/closeInputFormat() methods in the mock format lack @Override. The
reflection catch block prints the exception to System.err, and the Assert.fail message does not
include it.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
index 96ae700..8f0642e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
@@ -23,6 +23,7 @@ import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -94,7 +95,18 @@ public class DataSourceTaskTest extends TaskTestBase {
 			Assert.fail("Invoke method caused exception.");
 		}
 		
-		Assert.assertTrue("Invalid output size. Expected: "+(keyCnt*valCnt)+" Actual: "+this.outList.size(),
+		try {
+			Field formatField = DataSourceTask.class.getDeclaredField("format");
+			formatField.setAccessible(true);
+			MockInputFormat inputFormat = (MockInputFormat) formatField.get(testTask);
+			Assert.assertTrue("Invalid status of the input format. Expected for opened: true, Actual: " + inputFormat.opened, inputFormat.opened);
+			Assert.assertTrue("Invalid status of the input format. Expected for closed: true, Actual: " + inputFormat.closed, inputFormat.closed);
+		} catch (Exception e) {
+			System.err.println(e);
+			Assert.fail("Reflection error while trying to validate inputFormat status.");
+		}
+
+		Assert.assertTrue("Invalid output size. Expected: " + (keyCnt*valCnt) + " Actual: " + this.outList.size(),
 			this.outList.size() == keyCnt * valCnt);
 		
 		HashMap<Integer,HashSet<Integer>> keyValueCountMap = new HashMap<>(keyCnt);
@@ -237,6 +249,9 @@ public class DataSourceTaskTest extends TaskTestBase {
 		
 		private final IntValue key = new IntValue();
 		private final IntValue value = new IntValue();
+
+		private boolean opened = false;
+		private boolean closed = false;
 		
 		@Override
 		public Record readRecord(Record target, byte[] record, int offset, int numBytes) {
@@ -255,6 +270,18 @@ public class DataSourceTaskTest extends TaskTestBase {
 			target.setField(1, this.value);
 			return target;
 		}
+
+		public void openInputFormat() {
+			//ensure this is called only once
+			Assert.assertFalse("Invalid status of the input format. Expected for opened: false, Actual: " + opened, opened);
+			opened = true;
+		}
+
+		public void closeInputFormat() {
+			//ensure this is called only once
+			Assert.assertFalse("Invalid status of the input format. Expected for closed: false, Actual: " + closed, closed);
+			closed = true;
+		}
 	}
 	
 	public static class MockDelayingInputFormat extends DelimitedInputFormat<Record> {