You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/04/26 16:00:23 UTC
flink git commit: [FLINK-3777] Add openIF/closeIF methods to IF lifecycle
Repository: flink
Updated Branches:
refs/heads/master 879bb1bb0 -> ac2137cfa
[FLINK-3777] Add openIF/closeIF methods to IF lifecycle
This closes #1903
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ac2137cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ac2137cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ac2137cf
Branch: refs/heads/master
Commit: ac2137cfa5e63bd4f53a4b7669dc591ab210093f
Parents: 879bb1b
Author: Flavio Pompermaier <f....@gmail.com>
Authored: Tue Apr 26 15:42:48 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Apr 26 16:00:20 2016 +0200
----------------------------------------------------------------------
.../api/common/io/ReplicatingInputFormat.java | 20 ++++++++++++--
.../flink/api/common/io/RichInputFormat.java | 20 ++++++++++++++
.../common/operators/GenericDataSourceBase.java | 10 ++++++-
.../api/common/io/FileInputFormatTest.java | 3 ++
.../operators/GenericDataSourceBaseTest.java | 8 ++++++
.../operators/util/TestRichInputFormat.java | 24 +++++++++++++++-
.../flink/runtime/operators/DataSourceTask.java | 11 +++++++-
.../runtime/operators/DataSourceTaskTest.java | 29 +++++++++++++++++++-
8 files changed, 118 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
index a084f64..14dc8f4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
@@ -119,18 +119,32 @@ public final class ReplicatingInputFormat<OT, S extends InputSplit> extends Rich
}
@Override
- public void setRuntimeContext(RuntimeContext context){
- if(this.replicatedIF instanceof RichInputFormat){
+ public void setRuntimeContext(RuntimeContext context) { // forwards the context to the wrapped format; silently ignored if it is not a RichInputFormat
+ if (this.replicatedIF instanceof RichInputFormat) {
((RichInputFormat)this.replicatedIF).setRuntimeContext(context);
}
}
@Override
public RuntimeContext getRuntimeContext(){
- if(this.replicatedIF instanceof RichInputFormat){
+ if (this.replicatedIF instanceof RichInputFormat) { // only a RichInputFormat carries a context; any other wrapped format makes this call throw
return ((RichInputFormat)this.replicatedIF).getRuntimeContext();
} else{
throw new RuntimeException("The underlying input format to this ReplicatingInputFormat isn't context aware");
}
}
+
+ @Override
+ public void openInputFormat() { // delegates the per-instance open to the wrapped format; a no-op if it is not a RichInputFormat
+ if (this.replicatedIF instanceof RichInputFormat) {
+ ((RichInputFormat)this.replicatedIF).openInputFormat();
+ }
+ }
+
+ @Override
+ public void closeInputFormat() { // delegates the per-instance close to the wrapped format; a no-op if it is not a RichInputFormat
+ if (this.replicatedIF instanceof RichInputFormat) {
+ ((RichInputFormat)this.replicatedIF).closeInputFormat();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-core/src/main/java/org/apache/flink/api/common/io/RichInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/RichInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/RichInputFormat.java
index 0c23e13..188be50 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/RichInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/RichInputFormat.java
@@ -49,4 +49,24 @@ public abstract class RichInputFormat<OT, T extends InputSplit> implements Input
"it in one of the other life cycle methods.");
}
}
+
+ /**
+ * Opens this InputFormat instance. This method is called once per parallel instance, after the runtime context has been set.
+ * Resources (e.g. database connections or caches) should be allocated here and released in {@link #closeInputFormat()}.
+ *
+ * @see InputFormat
+ */
+ public void openInputFormat() { // NOTE(review): no throws clause, so overrides cannot propagate checked exceptions (e.g. IOException) from resource setup; consider widening
+ // No-op by default; subclasses override this hook to acquire per-instance resources.
+ }
+
+ /**
+ * Closes this InputFormat instance. This method is called once per parallel instance.
+ * Resources allocated during {@link #openInputFormat()} should be released in this method.
+ *
+ * @see InputFormat
+ */
+ public void closeInputFormat() { // NOTE(review): like openInputFormat(), declares no checked exceptions, so overrides must handle close failures themselves
+ // No-op by default; subclasses override this hook to release per-instance resources.
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
index 0794cc8..e80c99f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
@@ -210,10 +210,13 @@ public class GenericDataSourceBase<OUT, T extends InputFormat<OUT, ?>> extends O
protected List<OUT> executeOnCollections(RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
@SuppressWarnings("unchecked")
InputFormat<OUT, InputSplit> inputFormat = (InputFormat<OUT, InputSplit>) this.formatWrapper.getUserCodeObject();
+ //configure the input format
inputFormat.configure(this.parameters);
- if(inputFormat instanceof RichInputFormat){
+ //open the input format
+ if (inputFormat instanceof RichInputFormat) {
((RichInputFormat) inputFormat).setRuntimeContext(ctx);
+ ((RichInputFormat) inputFormat).openInputFormat();
}
List<OUT> result = new ArrayList<OUT>();
@@ -235,6 +238,11 @@ public class GenericDataSourceBase<OUT, T extends InputFormat<OUT, ?>> extends O
inputFormat.close();
}
+ //close the input format
+ if (inputFormat instanceof RichInputFormat) {
+ ((RichInputFormat) inputFormat).closeInputFormat();
+ }
+
return result;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
index 5aac540..63cb966 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
@@ -387,6 +387,7 @@ public class FileInputFormatTest {
inputFormat.setFilePath(tempFile.toURI().toString());
inputFormat.configure(config);
+ inputFormat.openInputFormat();
FileInputSplit[] inputSplits = inputFormat.createInputSplits(3);
@@ -399,6 +400,8 @@ public class FileInputFormatTest {
}
}
}
+
+ inputFormat.closeInputFormat();
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java
index bda2fb6..083039a 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java
@@ -81,11 +81,19 @@ public class GenericDataSourceBaseTest implements java.io.Serializable {
ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.disableObjectReuse();
+ assertEquals(false, in.hasBeenClosed());
+ assertEquals(false, in.hasBeenOpened());
List<String> resultMutableSafe = source.executeOnCollections(new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap), executionConfig);
+ assertEquals(true, in.hasBeenClosed());
+ assertEquals(true, in.hasBeenOpened());
in.reset();
executionConfig.enableObjectReuse();
+ assertEquals(false, in.hasBeenClosed());
+ assertEquals(false, in.hasBeenOpened());
List<String> resultRegular = source.executeOnCollections(new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap), executionConfig);
+ assertEquals(true, in.hasBeenClosed());
+ assertEquals(true, in.hasBeenOpened());
assertEquals(asList(TestIOData.RICH_NAMES), resultMutableSafe);
assertEquals(asList(TestIOData.RICH_NAMES), resultRegular);
http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-core/src/test/java/org/apache/flink/api/common/operators/util/TestRichInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/util/TestRichInputFormat.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/TestRichInputFormat.java
index 0945391..0bcd018 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/util/TestRichInputFormat.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/TestRichInputFormat.java
@@ -33,6 +33,8 @@ public class TestRichInputFormat extends GenericInputFormat<String> implements N
private static final int NUM = 5;
private static final String[] NAMES = TestIOData.NAMES;
private int count = 0;
+ private boolean openCalled = false;
+ private boolean closeCalled = false;
@Override
public boolean reachedEnd() throws IOException {
@@ -46,7 +48,27 @@ public class TestRichInputFormat extends GenericInputFormat<String> implements N
getRuntimeContext().getNumberOfParallelSubtasks();
}
- public void reset(){
+ public void reset() { // restores the initial state so the same instance can be executed again
count = 0;
+ openCalled = false; // clear lifecycle flags so the next run's open/close calls are observed fresh
+ closeCalled = false;
+ }
+
+ @Override
+ public void openInputFormat() { // only records the call so tests can check it via hasBeenOpened()
+ openCalled = true;
+ }
+
+ @Override
+ public void closeInputFormat() { // only records the call so tests can check it via hasBeenClosed()
+ closeCalled = true;
+ }
+
+ public boolean hasBeenOpened() { // true once openInputFormat() has run since construction or the last reset()
+ return openCalled;
+ }
+
+ public boolean hasBeenClosed() { // true once closeInputFormat() has run since construction or the last reset()
+ return closeCalled;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index 5eec40f..0c525ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -97,9 +97,11 @@ public class DataSourceTask<OT> extends AbstractInvokable {
// --------------------------------------------------------------------
LOG.debug(getLogString("Starting data source operator"));
- if(RichInputFormat.class.isAssignableFrom(this.format.getClass())){
+ if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
((RichInputFormat) this.format).setRuntimeContext(createRuntimeContext());
LOG.debug(getLogString("Rich Source detected. Initializing runtime context."));
+ ((RichInputFormat) this.format).openInputFormat();
+ LOG.debug(getLogString("Rich Source detected. Opening the InputFormat."));
}
ExecutionConfig executionConfig = getExecutionConfig();
@@ -192,6 +194,13 @@ public class DataSourceTask<OT> extends AbstractInvokable {
}
} finally {
BatchTask.clearWriters(eventualOutputs);
+ // --------------------------------------------------------------------
+ // Closing
+ // --------------------------------------------------------------------
+ if (this.format != null && RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
+ ((RichInputFormat) this.format).closeInputFormat();
+ LOG.debug(getLogString("Rich Source detected. Closing the InputFormat."));
+ }
}
if (!this.taskCanceled) {
http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
index 96ae700..8f0642e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
@@ -23,6 +23,7 @@ import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -94,7 +95,18 @@ public class DataSourceTaskTest extends TaskTestBase {
Assert.fail("Invoke method caused exception.");
}
- Assert.assertTrue("Invalid output size. Expected: "+(keyCnt*valCnt)+" Actual: "+this.outList.size(),
+ try {
+ Field formatField = DataSourceTask.class.getDeclaredField("format");
+ formatField.setAccessible(true);
+ MockInputFormat inputFormat = (MockInputFormat) formatField.get(testTask);
+ Assert.assertTrue("Invalid status of the input format. Expected for opened: true, Actual: " + inputFormat.opened, inputFormat.opened);
+ Assert.assertTrue("Invalid status of the input format. Expected for closed: true, Actual: " + inputFormat.closed, inputFormat.closed);
+ } catch (Exception e) {
+ System.err.println(e);
+ Assert.fail("Reflection error while trying to validate inputFormat status.");
+ }
+
+ Assert.assertTrue("Invalid output size. Expected: " + (keyCnt*valCnt) + " Actual: " + this.outList.size(),
this.outList.size() == keyCnt * valCnt);
HashMap<Integer,HashSet<Integer>> keyValueCountMap = new HashMap<>(keyCnt);
@@ -237,6 +249,9 @@ public class DataSourceTaskTest extends TaskTestBase {
private final IntValue key = new IntValue();
private final IntValue value = new IntValue();
+
+ private boolean opened = false;
+ private boolean closed = false;
@Override
public Record readRecord(Record target, byte[] record, int offset, int numBytes) {
@@ -255,6 +270,18 @@ public class DataSourceTaskTest extends TaskTestBase {
target.setField(1, this.value);
return target;
}
+
+ public void openInputFormat() { // NOTE(review): missing @Override; presumably overrides RichInputFormat#openInputFormat, so add it to catch signature drift at compile time
+ // Fail the test if the task invokes this more than once.
+ Assert.assertFalse("Invalid status of the input format. Expected for opened: false, Actual: " + opened, opened);
+ opened = true;
+ }
+
+ public void closeInputFormat() { // NOTE(review): missing @Override; presumably overrides RichInputFormat#closeInputFormat, so add it to catch signature drift at compile time
+ // Fail the test if the task invokes this more than once.
+ Assert.assertFalse("Invalid status of the input format. Expected for closed: false, Actual: " + closed, closed);
+ closed = true;
+ }
}
public static class MockDelayingInputFormat extends DelimitedInputFormat<Record> {