You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by kl0u <gi...@git.apache.org> on 2016/06/14 15:00:13 UTC

[GitHub] flink pull request #2100: [FLINK-4024] FileSourceFunction not adjusted to ne...

GitHub user kl0u opened a pull request:

    https://github.com/apache/flink/pull/2100

    [FLINK-4024] FileSourceFunction not adjusted to new IF lifecycle

    Refactors the FileSourceFunction to respect the lifecycle of a RichInputFormat.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink FLINK-4024

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2100.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2100
    
----
commit 987b3851704a5a20da08bd92e676d1c2c529c532
Author: kl0u <kk...@gmail.com>
Date:   2016-06-13T09:55:56Z

    [FLINK-4024] FileSourceFunction not adjusted to new IF lifecycle
    
    Refactors the FileSourceFunction to respect the
    lifecycle of a RichInputFormat.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2100: [FLINK-4024] FileSourceFunction not adjusted to ne...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2100#discussion_r67003727
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSourceFunctionTest.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.api.common.io.RichInputFormat;
    +import org.apache.flink.api.common.io.statistics.BaseStatistics;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.io.InputSplit;
    +import org.apache.flink.core.io.InputSplitAssigner;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memory.MemoryManager;
    +import org.apache.flink.runtime.operators.testutils.MockEnvironment;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +
    +public class FileSourceFunctionTest {
    +
    +	@Test
    +	public void testNormalOp() throws Exception {
    +		testForEmptyLocation(false);
    +	}
    +
    +	@Test
    +	public void testCancelation() throws Exception {
    +		testForEmptyLocation(true);
    +	}
    +
    +	private void testForEmptyLocation(final boolean midCancel) throws Exception {
    +
    +		final int noOfSplits = 5;
    +		final int cancelAt = 2;
    +
    +		final LifeCycleTestInputFormat format = new LifeCycleTestInputFormat();
    +		final FileSourceFunction<Integer> reader = new FileSourceFunction<>(format, TypeInformation.of(Integer.class));
    +		reader.setRuntimeContext(new MockRuntimeContext(format, noOfSplits));
    +
    +		Assert.assertTrue(!format.isConfigured);
    +		Assert.assertTrue(!format.isInputFormatOpen);
    +		Assert.assertTrue(!format.isSplitOpen);
    +
    +		reader.open(new Configuration());
    +		Assert.assertTrue(format.isConfigured);
    +
    +		TestSourceContext ctx = new TestSourceContext(reader, format, midCancel, cancelAt);
    +		reader.run(ctx);
    +
    +		int splitsSeen = ctx.getSplitsSeen();
    +		Assert.assertTrue(midCancel ? splitsSeen == cancelAt : splitsSeen == noOfSplits);
    +
    +		// we have exhausted the splits so the
    +		// format and splits should be closed by now
    +
    +		Assert.assertTrue(!format.isSplitOpen);
    +		Assert.assertTrue(!format.isInputFormatOpen);
    +	}
    +
    +
    +	private static class LifeCycleTestInputFormat extends RichInputFormat<Integer,InputSplit> {
    +
    +		private boolean isConfigured = false;
    +		private boolean isInputFormatOpen = false;
    +		private boolean isSplitOpen = false;
    +
    +		// end of split
    +		private boolean eos = false;
    +
    +		private int splitCounter = 0;
    +
    +		@Override
    +		public void openInputFormat() {
    +			this.isInputFormatOpen = true;
    +		}
    +
    +		@Override
    +		public void closeInputFormat() {
    +			this.isInputFormatOpen = false;
    +		}
    +
    +		@Override
    +		public void configure(Configuration parameters) {
    +			this.isConfigured = true;
    +		}
    +
    +		@Override
    +		public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
    +			return null;
    +		}
    +
    +		@Override
    +		public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
    +			InputSplit[] splits = new InputSplit[minNumSplits];
    +
    +			for (int i = 0; i < minNumSplits; i++) {
    +				final int idx = i;
    +				splits[idx] = new InputSplit() {
    +					@Override
    +					public int getSplitNumber() {
    +						return idx;
    +					}
    +				};
    +			}
    +			return splits;
    +		}
    +
    +		@Override
    +		public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
    +			return null;
    +		}
    +
    +		@Override
    +		public void open(InputSplit split) throws IOException {
    +			// whenever a new split opens,
    +			// the previous should have been closed
    +			Assert.assertTrue(!isSplitOpen);
    +
    +			isSplitOpen = true;
    +			eos = false;
    +		}
    +
    +		@Override
    +		public boolean reachedEnd() throws IOException {
    +			return eos;
    +		}
    +
    +		@Override
    +		public Integer nextRecord(Integer reuse) throws IOException {
    +			eos = true;
    +			return splitCounter++;
    +		}
    +
    +		@Override
    +		public void close() throws IOException {
    +			this.isSplitOpen = false;
    --- End diff --
    
    Assert that split was opened before?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2100: [FLINK-4024] FileSourceFunction not adjusted to new IF li...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2100
  
    Hi @kl0u, thanks for the PR! The implementation looks good. A few more checks could be added to the tests.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2100: [FLINK-4024] FileSourceFunction not adjusted to new IF li...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2100
  
    +1 to merge


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2100: [FLINK-4024] FileSourceFunction not adjusted to new IF li...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2100
  
    Merging


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2100: [FLINK-4024] FileSourceFunction not adjusted to new IF li...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2100
  
    Thanks @fhueske for the comments. 
    
    Now that https://github.com/apache/flink/pull/2020 is also merged, I will update that one to handle RichInputFormats as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2100: [FLINK-4024] FileSourceFunction not adjusted to ne...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2100#discussion_r67144983
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java ---
    @@ -0,0 +1,284 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.api.common.io.RichInputFormat;
    +import org.apache.flink.api.common.io.statistics.BaseStatistics;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.io.InputSplit;
    +import org.apache.flink.core.io.InputSplitAssigner;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memory.MemoryManager;
    +import org.apache.flink.runtime.operators.testutils.MockEnvironment;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +
    +public class InputFormatSourceFunctionTest {
    +
    +	@Test
    +	public void testNormalOp() throws Exception {
    +		testFormatLifecycle(false);
    +	}
    +
    +	@Test
    +	public void testCancelation() throws Exception {
    +		testFormatLifecycle(true);
    +	}
    +
    +	private void testFormatLifecycle(final boolean midCancel) throws Exception {
    +
    +		final int noOfSplits = 5;
    +		final int cancelAt = 2;
    +
    +		final LifeCycleTestInputFormat format = new LifeCycleTestInputFormat();
    +		final InputFormatSourceFunction<Integer> reader = new InputFormatSourceFunction<>(format, TypeInformation.of(Integer.class));
    +		reader.setRuntimeContext(new MockRuntimeContext(format, noOfSplits));
    +
    +		Assert.assertTrue(!format.isConfigured);
    +		Assert.assertTrue(!format.isInputFormatOpen);
    +		Assert.assertTrue(!format.isSplitOpen);
    +
    +		reader.open(new Configuration());
    +		Assert.assertTrue(format.isConfigured);
    +
    +		TestSourceContext ctx = new TestSourceContext(reader, format, midCancel, cancelAt);
    +		reader.run(ctx);
    +
    +		int splitsSeen = ctx.getSplitsSeen();
    +		Assert.assertTrue(midCancel ? splitsSeen == cancelAt : splitsSeen == noOfSplits);
    +
    +		// we have exhausted the splits so the
    +		// format and splits should be closed by now
    +
    +		Assert.assertTrue(!format.isSplitOpen);
    +		Assert.assertTrue(!format.isInputFormatOpen);
    +	}
    +
    +
    +	private static class LifeCycleTestInputFormat extends RichInputFormat<Integer,InputSplit> {
    +
    +		private boolean isConfigured = false;
    +		private boolean isInputFormatOpen = false;
    +		private boolean isSplitOpen = false;
    +
    +		// end of split
    +		private boolean eos = false;
    +
    +		private int splitCounter = 0;
    +
    +		private int reachedEndCalls = 0;
    +		private int nextRecordCalls = 0;
    +
    +		@Override
    +		public void openInputFormat() {
    +			Assert.assertTrue(!isInputFormatOpen);
    +			Assert.assertTrue(!isSplitOpen);
    +			this.isInputFormatOpen = true;
    +		}
    +
    +		@Override
    +		public void closeInputFormat() {
    +			Assert.assertTrue(!isSplitOpen);
    +			this.isInputFormatOpen = false;
    +		}
    +
    +		@Override
    +		public void configure(Configuration parameters) {
    +			this.isConfigured = true;
    --- End diff --
    
    Assert that IF was not configured before.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2100: [FLINK-4024] FileSourceFunction not adjusted to ne...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2100#discussion_r67006934
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSourceFunctionTest.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.api.common.io.RichInputFormat;
    +import org.apache.flink.api.common.io.statistics.BaseStatistics;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.io.InputSplit;
    +import org.apache.flink.core.io.InputSplitAssigner;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memory.MemoryManager;
    +import org.apache.flink.runtime.operators.testutils.MockEnvironment;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +
    +public class FileSourceFunctionTest {
    +
    +	@Test
    +	public void testNormalOp() throws Exception {
    +		testForEmptyLocation(false);
    +	}
    +
    +	@Test
    +	public void testCancelation() throws Exception {
    +		testForEmptyLocation(true);
    +	}
    +
    +	private void testForEmptyLocation(final boolean midCancel) throws Exception {
    +
    +		final int noOfSplits = 5;
    +		final int cancelAt = 2;
    +
    +		final LifeCycleTestInputFormat format = new LifeCycleTestInputFormat();
    +		final FileSourceFunction<Integer> reader = new FileSourceFunction<>(format, TypeInformation.of(Integer.class));
    +		reader.setRuntimeContext(new MockRuntimeContext(format, noOfSplits));
    +
    +		Assert.assertTrue(!format.isConfigured);
    +		Assert.assertTrue(!format.isInputFormatOpen);
    +		Assert.assertTrue(!format.isSplitOpen);
    +
    +		reader.open(new Configuration());
    +		Assert.assertTrue(format.isConfigured);
    +
    +		TestSourceContext ctx = new TestSourceContext(reader, format, midCancel, cancelAt);
    +		reader.run(ctx);
    +
    +		int splitsSeen = ctx.getSplitsSeen();
    +		Assert.assertTrue(midCancel ? splitsSeen == cancelAt : splitsSeen == noOfSplits);
    +
    +		// we have exhausted the splits so the
    +		// format and splits should be closed by now
    +
    +		Assert.assertTrue(!format.isSplitOpen);
    +		Assert.assertTrue(!format.isInputFormatOpen);
    +	}
    +
    +
    +	private static class LifeCycleTestInputFormat extends RichInputFormat<Integer,InputSplit> {
    +
    +		private boolean isConfigured = false;
    +		private boolean isInputFormatOpen = false;
    +		private boolean isSplitOpen = false;
    +
    +		// end of split
    +		private boolean eos = false;
    +
    +		private int splitCounter = 0;
    +
    +		@Override
    +		public void openInputFormat() {
    +			this.isInputFormatOpen = true;
    +		}
    +
    +		@Override
    +		public void closeInputFormat() {
    +			this.isInputFormatOpen = false;
    --- End diff --
    
    Check that openInputFormat() was called before.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2100: [FLINK-4024] FileSourceFunction not adjusted to ne...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2100#discussion_r67003772
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSourceFunctionTest.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.api.common.io.RichInputFormat;
    +import org.apache.flink.api.common.io.statistics.BaseStatistics;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.io.InputSplit;
    +import org.apache.flink.core.io.InputSplitAssigner;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memory.MemoryManager;
    +import org.apache.flink.runtime.operators.testutils.MockEnvironment;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +
    +public class FileSourceFunctionTest {
    +
    +	@Test
    +	public void testNormalOp() throws Exception {
    +		testForEmptyLocation(false);
    +	}
    +
    +	@Test
    +	public void testCancelation() throws Exception {
    +		testForEmptyLocation(true);
    +	}
    +
    +	private void testForEmptyLocation(final boolean midCancel) throws Exception {
    +
    +		final int noOfSplits = 5;
    +		final int cancelAt = 2;
    +
    +		final LifeCycleTestInputFormat format = new LifeCycleTestInputFormat();
    +		final FileSourceFunction<Integer> reader = new FileSourceFunction<>(format, TypeInformation.of(Integer.class));
    +		reader.setRuntimeContext(new MockRuntimeContext(format, noOfSplits));
    +
    +		Assert.assertTrue(!format.isConfigured);
    +		Assert.assertTrue(!format.isInputFormatOpen);
    +		Assert.assertTrue(!format.isSplitOpen);
    +
    +		reader.open(new Configuration());
    +		Assert.assertTrue(format.isConfigured);
    +
    +		TestSourceContext ctx = new TestSourceContext(reader, format, midCancel, cancelAt);
    +		reader.run(ctx);
    +
    +		int splitsSeen = ctx.getSplitsSeen();
    +		Assert.assertTrue(midCancel ? splitsSeen == cancelAt : splitsSeen == noOfSplits);
    +
    +		// we have exhausted the splits so the
    +		// format and splits should be closed by now
    +
    +		Assert.assertTrue(!format.isSplitOpen);
    +		Assert.assertTrue(!format.isInputFormatOpen);
    +	}
    +
    +
    +	private static class LifeCycleTestInputFormat extends RichInputFormat<Integer,InputSplit> {
    +
    +		private boolean isConfigured = false;
    +		private boolean isInputFormatOpen = false;
    +		private boolean isSplitOpen = false;
    +
    +		// end of split
    +		private boolean eos = false;
    +
    +		private int splitCounter = 0;
    +
    +		@Override
    +		public void openInputFormat() {
    +			this.isInputFormatOpen = true;
    +		}
    +
    +		@Override
    +		public void closeInputFormat() {
    +			this.isInputFormatOpen = false;
    +		}
    +
    +		@Override
    +		public void configure(Configuration parameters) {
    +			this.isConfigured = true;
    +		}
    +
    +		@Override
    +		public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
    +			return null;
    +		}
    +
    +		@Override
    +		public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
    +			InputSplit[] splits = new InputSplit[minNumSplits];
    +
    +			for (int i = 0; i < minNumSplits; i++) {
    +				final int idx = i;
    +				splits[idx] = new InputSplit() {
    +					@Override
    +					public int getSplitNumber() {
    +						return idx;
    +					}
    +				};
    +			}
    +			return splits;
    +		}
    +
    +		@Override
    +		public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
    +			return null;
    +		}
    +
    +		@Override
    +		public void open(InputSplit split) throws IOException {
    +			// whenever a new split opens,
    +			// the previous should have been closed
    +			Assert.assertTrue(!isSplitOpen);
    +
    +			isSplitOpen = true;
    +			eos = false;
    +		}
    +
    +		@Override
    +		public boolean reachedEnd() throws IOException {
    +			return eos;
    +		}
    +
    +		@Override
    +		public Integer nextRecord(Integer reuse) throws IOException {
    +			eos = true;
    --- End diff --
    
    Assert that open() was called before?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2100: [FLINK-4024] FileSourceFunction not adjusted to ne...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2100


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2100: [FLINK-4024] FileSourceFunction not adjusted to new IF li...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2100
  
    Done! Thanks @fhueske .
    Kostas


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2100: [FLINK-4024] FileSourceFunction not adjusted to ne...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2100#discussion_r67012706
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSourceFunctionTest.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.api.common.io.RichInputFormat;
    +import org.apache.flink.api.common.io.statistics.BaseStatistics;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.io.InputSplit;
    +import org.apache.flink.core.io.InputSplitAssigner;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memory.MemoryManager;
    +import org.apache.flink.runtime.operators.testutils.MockEnvironment;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +
    +public class FileSourceFunctionTest {
    +
    +	@Test
    +	public void testNormalOp() throws Exception {
    +		testForEmptyLocation(false);
    +	}
    +
    +	@Test
    +	public void testCancelation() throws Exception {
    +		testForEmptyLocation(true);
    +	}
    +
    +	private void testForEmptyLocation(final boolean midCancel) throws Exception {
    +
    +		final int noOfSplits = 5;
    +		final int cancelAt = 2;
    +
    +		final LifeCycleTestInputFormat format = new LifeCycleTestInputFormat();
    +		final FileSourceFunction<Integer> reader = new FileSourceFunction<>(format, TypeInformation.of(Integer.class));
    +		reader.setRuntimeContext(new MockRuntimeContext(format, noOfSplits));
    +
    +		Assert.assertTrue(!format.isConfigured);
    +		Assert.assertTrue(!format.isInputFormatOpen);
    +		Assert.assertTrue(!format.isSplitOpen);
    +
    +		reader.open(new Configuration());
    +		Assert.assertTrue(format.isConfigured);
    +
    +		TestSourceContext ctx = new TestSourceContext(reader, format, midCancel, cancelAt);
    +		reader.run(ctx);
    +
    +		int splitsSeen = ctx.getSplitsSeen();
    +		Assert.assertTrue(midCancel ? splitsSeen == cancelAt : splitsSeen == noOfSplits);
    +
    +		// we have exhausted the splits so the
    +		// format and splits should be closed by now
    +
    +		Assert.assertTrue(!format.isSplitOpen);
    +		Assert.assertTrue(!format.isInputFormatOpen);
    +	}
    +
    +
    +	private static class LifeCycleTestInputFormat extends RichInputFormat<Integer,InputSplit> {
    +
    +		private boolean isConfigured = false;
    +		private boolean isInputFormatOpen = false;
    +		private boolean isSplitOpen = false;
    +
    +		// end of split
    +		private boolean eos = false;
    +
    +		private int splitCounter = 0;
    +
    +		@Override
    +		public void openInputFormat() {
    +			this.isInputFormatOpen = true;
    +		}
    +
    +		@Override
    +		public void closeInputFormat() {
    +			this.isInputFormatOpen = false;
    --- End diff --
    
    +1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2100: [FLINK-4024] FileSourceFunction not adjusted to ne...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2100#discussion_r67144951
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java ---
    @@ -0,0 +1,284 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.api.common.io.RichInputFormat;
    +import org.apache.flink.api.common.io.statistics.BaseStatistics;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.io.InputSplit;
    +import org.apache.flink.core.io.InputSplitAssigner;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memory.MemoryManager;
    +import org.apache.flink.runtime.operators.testutils.MockEnvironment;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +
    +public class InputFormatSourceFunctionTest {
    +
    +	@Test
    +	public void testNormalOp() throws Exception {
    +		testFormatLifecycle(false);
    +	}
    +
    +	@Test
    +	public void testCancelation() throws Exception {
    +		testFormatLifecycle(true);
    +	}
    +
    +	private void testFormatLifecycle(final boolean midCancel) throws Exception {
    +
    +		final int noOfSplits = 5;
    +		final int cancelAt = 2;
    +
    +		final LifeCycleTestInputFormat format = new LifeCycleTestInputFormat();
    +		final InputFormatSourceFunction<Integer> reader = new InputFormatSourceFunction<>(format, TypeInformation.of(Integer.class));
    +		reader.setRuntimeContext(new MockRuntimeContext(format, noOfSplits));
    +
    +		Assert.assertTrue(!format.isConfigured);
    +		Assert.assertTrue(!format.isInputFormatOpen);
    +		Assert.assertTrue(!format.isSplitOpen);
    +
    +		reader.open(new Configuration());
    +		Assert.assertTrue(format.isConfigured);
    +
    +		TestSourceContext ctx = new TestSourceContext(reader, format, midCancel, cancelAt);
    +		reader.run(ctx);
    +
    +		int splitsSeen = ctx.getSplitsSeen();
    +		Assert.assertTrue(midCancel ? splitsSeen == cancelAt : splitsSeen == noOfSplits);
    +
    +		// we have exhausted the splits so the
    +		// format and splits should be closed by now
    +
    +		Assert.assertTrue(!format.isSplitOpen);
    +		Assert.assertTrue(!format.isInputFormatOpen);
    +	}
    +
    +
    +	private static class LifeCycleTestInputFormat extends RichInputFormat<Integer,InputSplit> {
    +
    +		private boolean isConfigured = false;
    +		private boolean isInputFormatOpen = false;
    +		private boolean isSplitOpen = false;
    +
    +		// end of split
    +		private boolean eos = false;
    +
    +		private int splitCounter = 0;
    +
    +		private int reachedEndCalls = 0;
    +		private int nextRecordCalls = 0;
    +
    +		@Override
    +		public void openInputFormat() {
    +			Assert.assertTrue(!isInputFormatOpen);
    --- End diff --
    
    Assert that the input format (IF) was configured before `openInputFormat()` is called.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2100: [FLINK-4024] FileSourceFunction not adjusted to new IF li...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2100
  
    Good to merge once the `isFormatOpen` flag has been removed.
    Thanks, Fabian


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2100: [FLINK-4024] FileSourceFunction not adjusted to ne...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2100#discussion_r67142811
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---
    @@ -233,10 +236,20 @@ public void run() {
     				while (this.isRunning) {
     
     					synchronized (checkpointLock) {
    +
    +						if (!this.isFormatOpen) {
    +							this.format.openInputFormat();
    +							this.isFormatOpen = true;
    +						}
    +
     						if (this.currentSplit != null) {
     
     							if (currentSplit.equals(EOS)) {
     								isRunning = false;
    +								if (this.isFormatOpen) {
    --- End diff --
    
    Is this necessary? Because of the `break`, wouldn't we go directly into the `finally` clause, where the IF is closed anyway?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2100: [FLINK-4024] FileSourceFunction not adjusted to ne...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2100#discussion_r67158378
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---
    @@ -173,6 +175,7 @@ public void close() throws Exception {
     	private class SplitReader<S extends Serializable, OT> extends Thread {
     
     		private volatile boolean isRunning;
    +		private volatile boolean isFormatOpen = false;
    --- End diff --
    
    I think this flag can be removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2100: [FLINK-4024] FileSourceFunction not adjusted to ne...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2100#discussion_r67003791
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSourceFunctionTest.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.api.common.io.RichInputFormat;
    +import org.apache.flink.api.common.io.statistics.BaseStatistics;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.io.InputSplit;
    +import org.apache.flink.core.io.InputSplitAssigner;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memory.MemoryManager;
    +import org.apache.flink.runtime.operators.testutils.MockEnvironment;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +
    +public class FileSourceFunctionTest {
    +
    +	@Test
    +	public void testNormalOp() throws Exception {
    +		testForEmptyLocation(false);
    +	}
    +
    +	@Test
    +	public void testCancelation() throws Exception {
    +		testForEmptyLocation(true);
    +	}
    +
    +	private void testForEmptyLocation(final boolean midCancel) throws Exception {
    +
    +		final int noOfSplits = 5;
    +		final int cancelAt = 2;
    +
    +		final LifeCycleTestInputFormat format = new LifeCycleTestInputFormat();
    +		final FileSourceFunction<Integer> reader = new FileSourceFunction<>(format, TypeInformation.of(Integer.class));
    +		reader.setRuntimeContext(new MockRuntimeContext(format, noOfSplits));
    +
    +		Assert.assertTrue(!format.isConfigured);
    +		Assert.assertTrue(!format.isInputFormatOpen);
    +		Assert.assertTrue(!format.isSplitOpen);
    +
    +		reader.open(new Configuration());
    +		Assert.assertTrue(format.isConfigured);
    +
    +		TestSourceContext ctx = new TestSourceContext(reader, format, midCancel, cancelAt);
    +		reader.run(ctx);
    +
    +		int splitsSeen = ctx.getSplitsSeen();
    +		Assert.assertTrue(midCancel ? splitsSeen == cancelAt : splitsSeen == noOfSplits);
    +
    +		// we have exhausted the splits so the
    +		// format and splits should be closed by now
    +
    +		Assert.assertTrue(!format.isSplitOpen);
    +		Assert.assertTrue(!format.isInputFormatOpen);
    +	}
    +
    +
    +	private static class LifeCycleTestInputFormat extends RichInputFormat<Integer,InputSplit> {
    +
    +		private boolean isConfigured = false;
    +		private boolean isInputFormatOpen = false;
    +		private boolean isSplitOpen = false;
    +
    +		// end of split
    +		private boolean eos = false;
    +
    +		private int splitCounter = 0;
    +
    +		@Override
    +		public void openInputFormat() {
    +			this.isInputFormatOpen = true;
    +		}
    +
    +		@Override
    +		public void closeInputFormat() {
    +			this.isInputFormatOpen = false;
    +		}
    +
    +		@Override
    +		public void configure(Configuration parameters) {
    +			this.isConfigured = true;
    +		}
    +
    +		@Override
    +		public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
    +			return null;
    +		}
    +
    +		@Override
    +		public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
    +			InputSplit[] splits = new InputSplit[minNumSplits];
    +
    +			for (int i = 0; i < minNumSplits; i++) {
    +				final int idx = i;
    +				splits[idx] = new InputSplit() {
    +					@Override
    +					public int getSplitNumber() {
    +						return idx;
    +					}
    +				};
    +			}
    +			return splits;
    +		}
    +
    +		@Override
    +		public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
    +			return null;
    +		}
    +
    +		@Override
    +		public void open(InputSplit split) throws IOException {
    +			// whenever a new split opens,
    +			// the previous should have been closed
    +			Assert.assertTrue(!isSplitOpen);
    +
    +			isSplitOpen = true;
    +			eos = false;
    +		}
    +
    +		@Override
    +		public boolean reachedEnd() throws IOException {
    +			return eos;
    --- End diff --
    
    Assert that open was called before?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2100: [FLINK-4024] FileSourceFunction not adjusted to new IF li...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2100
  
    Thanks @fhueske ! I integrated the comments. 
    Let me know if you have any more.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2100: [FLINK-4024] FileSourceFunction not adjusted to ne...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2100#discussion_r67142796
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---
    @@ -261,6 +274,10 @@ public void run() {
     
     							if (currentSplit.equals(EOS)) {
     								isRunning = false;
    +								if (this.isFormatOpen) {
    --- End diff --
    
    Is this necessary? Because of the `break`, wouldn't we go directly into the `finally` clause, where the IF is closed anyway?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2100: [FLINK-4024] FileSourceFunction not adjusted to new IF li...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2100
  
    Thanks @fhueske !


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2100: [FLINK-4024] FileSourceFunction not adjusted to ne...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2100#discussion_r67006854
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSourceFunctionTest.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.api.common.io.RichInputFormat;
    +import org.apache.flink.api.common.io.statistics.BaseStatistics;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.io.InputSplit;
    +import org.apache.flink.core.io.InputSplitAssigner;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memory.MemoryManager;
    +import org.apache.flink.runtime.operators.testutils.MockEnvironment;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +
    +public class FileSourceFunctionTest {
    +
    +	@Test
    +	public void testNormalOp() throws Exception {
    +		testForEmptyLocation(false);
    +	}
    +
    +	@Test
    +	public void testCancelation() throws Exception {
    +		testForEmptyLocation(true);
    +	}
    +
    +	private void testForEmptyLocation(final boolean midCancel) throws Exception {
    +
    +		final int noOfSplits = 5;
    +		final int cancelAt = 2;
    +
    +		final LifeCycleTestInputFormat format = new LifeCycleTestInputFormat();
    +		final FileSourceFunction<Integer> reader = new FileSourceFunction<>(format, TypeInformation.of(Integer.class));
    +		reader.setRuntimeContext(new MockRuntimeContext(format, noOfSplits));
    +
    +		Assert.assertTrue(!format.isConfigured);
    +		Assert.assertTrue(!format.isInputFormatOpen);
    +		Assert.assertTrue(!format.isSplitOpen);
    +
    +		reader.open(new Configuration());
    +		Assert.assertTrue(format.isConfigured);
    +
    +		TestSourceContext ctx = new TestSourceContext(reader, format, midCancel, cancelAt);
    +		reader.run(ctx);
    +
    +		int splitsSeen = ctx.getSplitsSeen();
    +		Assert.assertTrue(midCancel ? splitsSeen == cancelAt : splitsSeen == noOfSplits);
    +
    +		// we have exhausted the splits so the
    +		// format and splits should be closed by now
    +
    +		Assert.assertTrue(!format.isSplitOpen);
    +		Assert.assertTrue(!format.isInputFormatOpen);
    +	}
    +
    +
    +	private static class LifeCycleTestInputFormat extends RichInputFormat<Integer,InputSplit> {
    +
    +		private boolean isConfigured = false;
    +		private boolean isInputFormatOpen = false;
    +		private boolean isSplitOpen = false;
    +
    +		// end of split
    +		private boolean eos = false;
    +
    +		private int splitCounter = 0;
    +
    +		@Override
    +		public void openInputFormat() {
    +			this.isInputFormatOpen = true;
    +		}
    +
    +		@Override
    +		public void closeInputFormat() {
    +			this.isInputFormatOpen = false;
    +		}
    +
    +		@Override
    +		public void configure(Configuration parameters) {
    +			this.isConfigured = true;
    +		}
    +
    +		@Override
    +		public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
    +			return null;
    +		}
    +
    +		@Override
    +		public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
    +			InputSplit[] splits = new InputSplit[minNumSplits];
    --- End diff --
    
    Check that `configure()` was called before.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2100: [FLINK-4024] FileSourceFunction not adjusted to new IF li...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2100
  
    Hi @kl0u, thanks for the update. I added a few comments; once those are addressed, the PR should be good. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2100: [FLINK-4024] FileSourceFunction not adjusted to ne...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2100#discussion_r67006905
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSourceFunctionTest.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.api.common.io.RichInputFormat;
    +import org.apache.flink.api.common.io.statistics.BaseStatistics;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.io.InputSplit;
    +import org.apache.flink.core.io.InputSplitAssigner;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memory.MemoryManager;
    +import org.apache.flink.runtime.operators.testutils.MockEnvironment;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +
    +public class FileSourceFunctionTest {
    +
    +	@Test
    +	public void testNormalOp() throws Exception {
    +		testForEmptyLocation(false);
    +	}
    +
    +	@Test
    +	public void testCancelation() throws Exception {
    +		testForEmptyLocation(true);
    +	}
    +
    +	private void testForEmptyLocation(final boolean midCancel) throws Exception {
    +
    +		final int noOfSplits = 5;
    +		final int cancelAt = 2;
    +
    +		final LifeCycleTestInputFormat format = new LifeCycleTestInputFormat();
    +		final FileSourceFunction<Integer> reader = new FileSourceFunction<>(format, TypeInformation.of(Integer.class));
    +		reader.setRuntimeContext(new MockRuntimeContext(format, noOfSplits));
    +
    +		Assert.assertTrue(!format.isConfigured);
    +		Assert.assertTrue(!format.isInputFormatOpen);
    +		Assert.assertTrue(!format.isSplitOpen);
    +
    +		reader.open(new Configuration());
    +		Assert.assertTrue(format.isConfigured);
    +
    +		TestSourceContext ctx = new TestSourceContext(reader, format, midCancel, cancelAt);
    +		reader.run(ctx);
    +
    +		int splitsSeen = ctx.getSplitsSeen();
    +		Assert.assertTrue(midCancel ? splitsSeen == cancelAt : splitsSeen == noOfSplits);
    +
    +		// we have exhausted the splits so the
    +		// format and splits should be closed by now
    +
    +		Assert.assertTrue(!format.isSplitOpen);
    +		Assert.assertTrue(!format.isInputFormatOpen);
    +	}
    +
    +
    +	private static class LifeCycleTestInputFormat extends RichInputFormat<Integer,InputSplit> {
    +
    +		private boolean isConfigured = false;
    +		private boolean isInputFormatOpen = false;
    +		private boolean isSplitOpen = false;
    +
    +		// end of split
    +		private boolean eos = false;
    +
    +		private int splitCounter = 0;
    +
    +		@Override
    +		public void openInputFormat() {
    +			this.isInputFormatOpen = true;
    +		}
    +
    +		@Override
    +		public void closeInputFormat() {
    +			this.isInputFormatOpen = false;
    +		}
    +
    +		@Override
    +		public void configure(Configuration parameters) {
    +			this.isConfigured = true;
    +		}
    +
    +		@Override
    +		public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
    +			return null;
    +		}
    +
    +		@Override
    +		public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
    +			InputSplit[] splits = new InputSplit[minNumSplits];
    +
    +			for (int i = 0; i < minNumSplits; i++) {
    +				final int idx = i;
    +				splits[idx] = new InputSplit() {
    +					@Override
    +					public int getSplitNumber() {
    +						return idx;
    +					}
    +				};
    +			}
    +			return splits;
    +		}
    +
    +		@Override
    +		public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
    +			return null;
    +		}
    +
    +		@Override
    +		public void open(InputSplit split) throws IOException {
    +			// whenever a new split opens,
    +			// the previous should have been closed
    +			Assert.assertTrue(!isSplitOpen);
    +
    +			isSplitOpen = true;
    +			eos = false;
    +		}
    +
    +		@Override
    +		public boolean reachedEnd() throws IOException {
    +			return eos;
    +		}
    +
    +		@Override
    +		public Integer nextRecord(Integer reuse) throws IOException {
    +			eos = true;
    +			return splitCounter++;
    +		}
    +
    +		@Override
    +		public void close() throws IOException {
    +			this.isSplitOpen = false;
    --- End diff --
    
    The problem with this is that `close()` in the FileSourceFunction is also called in the `finally{}` block, which can be redundant in cases where the split was already closed. This is why I do not
    check it here; instead, I check that whenever `open()` is called, the split was not already open.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2100: [FLINK-4024] FileSourceFunction not adjusted to ne...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2100#discussion_r67002668
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java ---
    @@ -59,15 +60,64 @@ public void open(Configuration parameters) throws Exception {
     		serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
     
     		splitIterator = getInputSplits();
    -		if (splitIterator.hasNext()) {
    -			format.open(splitIterator.next());
    +		isRunning = splitIterator.hasNext();
    +	}
    +
    +	@Override
    +	public void run(SourceContext<OUT> ctx) throws Exception {
    +		try {
    +			if (isRunning && format instanceof RichInputFormat) {
    +				RichInputFormat richFormat = (RichInputFormat) format;
    +				richFormat.setRuntimeContext(getRuntimeContext());
    +				richFormat.openInputFormat();
    +			}
    +
    +			while (isRunning) {
    +
    +				format.open(splitIterator.next());
    +				OUT nextElement = serializer.createInstance();
    --- End diff --
    
    Can be moved out of the `while (isRunning)` loop.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2100: [FLINK-4024] FileSourceFunction not adjusted to ne...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2100#discussion_r67006831
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSourceFunctionTest.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.api.common.io.RichInputFormat;
    +import org.apache.flink.api.common.io.statistics.BaseStatistics;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.io.InputSplit;
    +import org.apache.flink.core.io.InputSplitAssigner;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memory.MemoryManager;
    +import org.apache.flink.runtime.operators.testutils.MockEnvironment;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +
    +public class FileSourceFunctionTest {
    +
    +	@Test
    +	public void testNormalOp() throws Exception {
    +		testForEmptyLocation(false);
    +	}
    +
    +	@Test
    +	public void testCancelation() throws Exception {
    +		testForEmptyLocation(true);
    +	}
    +
    +	private void testForEmptyLocation(final boolean midCancel) throws Exception {
    +
    +		final int noOfSplits = 5;
    +		final int cancelAt = 2;
    +
    +		final LifeCycleTestInputFormat format = new LifeCycleTestInputFormat();
    +		final FileSourceFunction<Integer> reader = new FileSourceFunction<>(format, TypeInformation.of(Integer.class));
    +		reader.setRuntimeContext(new MockRuntimeContext(format, noOfSplits));
    +
    +		Assert.assertTrue(!format.isConfigured);
    +		Assert.assertTrue(!format.isInputFormatOpen);
    +		Assert.assertTrue(!format.isSplitOpen);
    +
    +		reader.open(new Configuration());
    +		Assert.assertTrue(format.isConfigured);
    +
    +		TestSourceContext ctx = new TestSourceContext(reader, format, midCancel, cancelAt);
    +		reader.run(ctx);
    +
    +		int splitsSeen = ctx.getSplitsSeen();
    +		Assert.assertTrue(midCancel ? splitsSeen == cancelAt : splitsSeen == noOfSplits);
    +
    +		// we have exhausted the splits so the
    +		// format and splits should be closed by now
    +
    +		Assert.assertTrue(!format.isSplitOpen);
    +		Assert.assertTrue(!format.isInputFormatOpen);
    +	}
    +
    +
    +	private static class LifeCycleTestInputFormat extends RichInputFormat<Integer,InputSplit> {
    +
    +		private boolean isConfigured = false;
    +		private boolean isInputFormatOpen = false;
    +		private boolean isSplitOpen = false;
    +
    +		// end of split
    +		private boolean eos = false;
    +
    +		private int splitCounter = 0;
    +
    +		@Override
    +		public void openInputFormat() {
    +			this.isInputFormatOpen = true;
    +		}
    +
    +		@Override
    +		public void closeInputFormat() {
    +			this.isInputFormatOpen = false;
    +		}
    +
    +		@Override
    +		public void configure(Configuration parameters) {
    +			this.isConfigured = true;
    +		}
    +
    +		@Override
    +		public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
    +			return null;
    +		}
    +
    +		@Override
    +		public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
    +			InputSplit[] splits = new InputSplit[minNumSplits];
    +
    +			for (int i = 0; i < minNumSplits; i++) {
    +				final int idx = i;
    +				splits[idx] = new InputSplit() {
    +					@Override
    +					public int getSplitNumber() {
    +						return idx;
    +					}
    +				};
    +			}
    +			return splits;
    +		}
    +
    +		@Override
    +		public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
    +			return null;
    +		}
    +
    +		@Override
    +		public void open(InputSplit split) throws IOException {
    +			// whenever a new split opens,
    +			// the previous should have been closed
    +			Assert.assertTrue(!isSplitOpen);
    --- End diff --
    
    Check that configure() was called before open() is invoked.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2100: [FLINK-4024] FileSourceFunction not adjusted to ne...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2100#discussion_r67142523
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---
    @@ -233,10 +236,20 @@ public void run() {
     				while (this.isRunning) {
     
     					synchronized (checkpointLock) {
    +
    +						if (!this.isFormatOpen) {
    +							this.format.openInputFormat();
    --- End diff --
    
    If `run()` is only called once, you could move this out of the `while` loop.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2100: [FLINK-4024] FileSourceFunction not adjusted to ne...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2100#discussion_r67008351
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSourceFunctionTest.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.api.common.io.RichInputFormat;
    +import org.apache.flink.api.common.io.statistics.BaseStatistics;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.io.InputSplit;
    +import org.apache.flink.core.io.InputSplitAssigner;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memory.MemoryManager;
    +import org.apache.flink.runtime.operators.testutils.MockEnvironment;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +
    +public class FileSourceFunctionTest {
    +
    +	@Test
    +	public void testNormalOp() throws Exception {
    +		testForEmptyLocation(false);
    +	}
    +
    +	@Test
    +	public void testCancelation() throws Exception {
    +		testForEmptyLocation(true);
    +	}
    +
    +	private void testForEmptyLocation(final boolean midCancel) throws Exception {
    +
    +		final int noOfSplits = 5;
    +		final int cancelAt = 2;
    +
    +		final LifeCycleTestInputFormat format = new LifeCycleTestInputFormat();
    +		final FileSourceFunction<Integer> reader = new FileSourceFunction<>(format, TypeInformation.of(Integer.class));
    +		reader.setRuntimeContext(new MockRuntimeContext(format, noOfSplits));
    +
    +		Assert.assertTrue(!format.isConfigured);
    +		Assert.assertTrue(!format.isInputFormatOpen);
    +		Assert.assertTrue(!format.isSplitOpen);
    +
    +		reader.open(new Configuration());
    +		Assert.assertTrue(format.isConfigured);
    +
    +		TestSourceContext ctx = new TestSourceContext(reader, format, midCancel, cancelAt);
    +		reader.run(ctx);
    +
    +		int splitsSeen = ctx.getSplitsSeen();
    +		Assert.assertTrue(midCancel ? splitsSeen == cancelAt : splitsSeen == noOfSplits);
    +
    +		// we have exhausted the splits so the
    +		// format and splits should be closed by now
    +
    +		Assert.assertTrue(!format.isSplitOpen);
    +		Assert.assertTrue(!format.isInputFormatOpen);
    +	}
    +
    +
    +	private static class LifeCycleTestInputFormat extends RichInputFormat<Integer,InputSplit> {
    +
    +		private boolean isConfigured = false;
    +		private boolean isInputFormatOpen = false;
    +		private boolean isSplitOpen = false;
    +
    +		// end of split
    +		private boolean eos = false;
    +
    +		private int splitCounter = 0;
    +
    +		@Override
    +		public void openInputFormat() {
    +			this.isInputFormatOpen = true;
    +		}
    +
    +		@Override
    +		public void closeInputFormat() {
    +			this.isInputFormatOpen = false;
    --- End diff --
    
    Same for this one.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2100: [FLINK-4024] FileSourceFunction not adjusted to new IF li...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2100
  
    Hi @fhueske! I integrated your comments and also updated https://github.com/apache/flink/pull/2020 accordingly.
    
    Thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2100: [FLINK-4024] FileSourceFunction not adjusted to ne...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2100#discussion_r67012682
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSourceFunctionTest.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.api.common.io.RichInputFormat;
    +import org.apache.flink.api.common.io.statistics.BaseStatistics;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.io.InputSplit;
    +import org.apache.flink.core.io.InputSplitAssigner;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memory.MemoryManager;
    +import org.apache.flink.runtime.operators.testutils.MockEnvironment;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +
    +public class FileSourceFunctionTest {
    +
    +	@Test
    +	public void testNormalOp() throws Exception {
    +		testForEmptyLocation(false);
    +	}
    +
    +	@Test
    +	public void testCancelation() throws Exception {
    +		testForEmptyLocation(true);
    +	}
    +
    +	private void testForEmptyLocation(final boolean midCancel) throws Exception {
    +
    +		final int noOfSplits = 5;
    +		final int cancelAt = 2;
    +
    +		final LifeCycleTestInputFormat format = new LifeCycleTestInputFormat();
    +		final FileSourceFunction<Integer> reader = new FileSourceFunction<>(format, TypeInformation.of(Integer.class));
    +		reader.setRuntimeContext(new MockRuntimeContext(format, noOfSplits));
    +
    +		Assert.assertTrue(!format.isConfigured);
    +		Assert.assertTrue(!format.isInputFormatOpen);
    +		Assert.assertTrue(!format.isSplitOpen);
    +
    +		reader.open(new Configuration());
    +		Assert.assertTrue(format.isConfigured);
    +
    +		TestSourceContext ctx = new TestSourceContext(reader, format, midCancel, cancelAt);
    +		reader.run(ctx);
    +
    +		int splitsSeen = ctx.getSplitsSeen();
    +		Assert.assertTrue(midCancel ? splitsSeen == cancelAt : splitsSeen == noOfSplits);
    +
    +		// we have exhausted the splits so the
    +		// format and splits should be closed by now
    +
    +		Assert.assertTrue(!format.isSplitOpen);
    +		Assert.assertTrue(!format.isInputFormatOpen);
    +	}
    +
    +
    +	private static class LifeCycleTestInputFormat extends RichInputFormat<Integer,InputSplit> {
    +
    +		private boolean isConfigured = false;
    +		private boolean isInputFormatOpen = false;
    +		private boolean isSplitOpen = false;
    +
    +		// end of split
    +		private boolean eos = false;
    +
    +		private int splitCounter = 0;
    +
    +		@Override
    +		public void openInputFormat() {
    +			this.isInputFormatOpen = true;
    +		}
    +
    +		@Override
    +		public void closeInputFormat() {
    +			this.isInputFormatOpen = false;
    +		}
    +
    +		@Override
    +		public void configure(Configuration parameters) {
    +			this.isConfigured = true;
    +		}
    +
    +		@Override
    +		public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
    +			return null;
    +		}
    +
    +		@Override
    +		public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
    +			InputSplit[] splits = new InputSplit[minNumSplits];
    +
    +			for (int i = 0; i < minNumSplits; i++) {
    +				final int idx = i;
    +				splits[idx] = new InputSplit() {
    +					@Override
    +					public int getSplitNumber() {
    +						return idx;
    +					}
    +				};
    +			}
    +			return splits;
    +		}
    +
    +		@Override
    +		public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
    +			return null;
    +		}
    +
    +		@Override
    +		public void open(InputSplit split) throws IOException {
    +			// whenever a new split opens,
    +			// the previous should have been closed
    +			Assert.assertTrue(!isSplitOpen);
    +
    +			isSplitOpen = true;
    +			eos = false;
    +		}
    +
    +		@Override
    +		public boolean reachedEnd() throws IOException {
    +			return eos;
    +		}
    +
    +		@Override
    +		public Integer nextRecord(Integer reuse) throws IOException {
    +			eos = true;
    +			return splitCounter++;
    +		}
    +
    +		@Override
    +		public void close() throws IOException {
    +			this.isSplitOpen = false;
    --- End diff --
    
    OK


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2100: [FLINK-4024] FileSourceFunction not adjusted to ne...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2100#discussion_r67006059
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSourceFunctionTest.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.api.common.io.RichInputFormat;
    +import org.apache.flink.api.common.io.statistics.BaseStatistics;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.io.InputSplit;
    +import org.apache.flink.core.io.InputSplitAssigner;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memory.MemoryManager;
    +import org.apache.flink.runtime.operators.testutils.MockEnvironment;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +
    +public class FileSourceFunctionTest {
    +
    +	@Test
    +	public void testNormalOp() throws Exception {
    +		testForEmptyLocation(false);
    +	}
    +
    +	@Test
    +	public void testCancelation() throws Exception {
    +		testForEmptyLocation(true);
    +	}
    +
    +	private void testForEmptyLocation(final boolean midCancel) throws Exception {
    --- End diff --
    
    Why is this called `testForEmptyLocation`? Wouldn't `testInputFormatLifeCycle` be a better name?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---