Posted to issues@flink.apache.org by kl0u <gi...@git.apache.org> on 2016/05/12 01:04:35 UTC

[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

GitHub user kl0u opened a pull request:

    https://github.com/apache/flink/pull/1984

    [FLINK-3889] Make File Monitoring Function checkpointable.

    This pull request introduces the underlying functionality to make Streaming File Sources persistent. 
    It does not yet change the API calls, as this will be done after agreeing on the current architecture and 
    implementation.
    
    In addition, this PR includes a separate commit for FLINK-3896, which allows an operator to cancel its containing task. The need for this functionality came up during a discussion with @StephanEwen and @aljoscha.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink ft_files

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1984.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1984
    
----
commit 7deb92236cec47ddcfbb3abfa396fd9d15f770b7
Author: kl0u <kk...@gmail.com>
Date:   2016-05-10T16:56:58Z

    [FLINK-3896] Allow a StreamTask to be Externally Cancelled
    
    This adds a failExternally() method to the StreamTask, so that custom
    operators can make their containing task fail when needed.
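    
    A minimal sketch of how an operator might use this hook; the surrounding
    reader-thread pattern mirrors code elsewhere in this PR:
    
        // inside a custom operator: work happens on a helper thread, so an
        // exception cannot simply propagate; fail the containing task instead
        try {
            // ... read records and emit them ...
        } catch (Throwable t) {
            getContainingTask().failExternally(t);
        }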

commit c9682b7606451c4eecf6f2f6df9a498fb6d39577
Author: kl0u <kk...@gmail.com>
Date:   2016-04-10T14:56:42Z

    [FLINK-3717] Make FileInputFormat checkpointable
    
    This adds a new interface, CheckpointableInputFormat,
    which describes input formats whose state is queryable:
    getCurrentChannelState() returns where the reader is
    in the underlying source, and reading can resume from
    a user-specified position.
    
    This functionality is not yet leveraged by current readers.
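    
    As a rough illustration, the interface described above might look like
    the following; only getCurrentChannelState() is named in this thread, so
    the reopen() method and the exact generics are assumptions:
    
        public interface CheckpointableInputFormat<S extends Serializable> {
    
            // where the reader currently is: the split being read plus the
            // position (state) within that split
            Tuple2<FileInputSplit, S> getCurrentChannelState() throws IOException;
    
            // hypothetical: resume reading a split from a checkpointed position
            void reopen(FileInputSplit split, S state) throws IOException;
        }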

commit cbbfd8d7e6db0f8f114675b4047aecb94996e500
Author: kl0u <kk...@gmail.com>
Date:   2016-04-18T14:37:54Z

    [FLINK-3889][FLINK-3808] Refactor File Monitoring Source
    
    This is meant to replace the different file-reading
    sources in Flink streaming. Now there is a single
    monitoring source with DOP 1 that monitors a directory
    and assigns input splits to downstream readers.
    
    In addition, it makes the new features added by
    FLINK-3717 and FLINK-3808 work together. Now we have
    a file monitoring source that is also fault-tolerant
    and can guarantee exactly-once semantics.
    
    This does not replace the old API calls. This
    will be done in a future commit.
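    
    A hedged sketch of the topology this commit describes; the class names and
    constructor arguments follow the diffs in this PR, while the transform()
    wiring is an assumption:
    
        // single, non-parallel task that discovers splits
        DataStream<FileInputSplit> splits = env
            .addSource(new FileSplitMonitoringFunction<>(format, path, conf,
                WatchType.REPROCESS_WITH_APPENDED, env.getParallelism(), 100L))
            .setParallelism(1);
    
        // parallel downstream readers that actually read and emit records
        splits.transform("FileSplitReader", typeInfo,
                new FileSplitReadOperator<>(format, typeInfo, conf))
            .setParallelism(env.getParallelism());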

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1984#discussion_r63495352
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java ---
    @@ -0,0 +1,368 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import org.apache.flink.api.common.io.CheckpointableInputFormat;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.runtime.state.AbstractStateBackend;
    +import org.apache.flink.runtime.state.StreamStateHandle;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.api.operators.TimestampedCollector;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
    +import org.apache.flink.util.Preconditions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Queue;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * This is the operator that reads the splits received from {@link FileSplitMonitoringFunction}.
    + * This operator will receive just the split descriptors and then read and emit records. This may lead
    + * to backpressure. To avoid this, we will have another thread actually reading the splits and
    + * another forwarding the checkpoint barriers. The two should sync so that the checkpoints reflect the
    + * current state.
    + * */
    +public class FileSplitReadOperator<OUT, S extends Serializable> extends AbstractStreamOperator<OUT>
    +	implements OneInputStreamOperator<FileInputSplit, OUT> {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(FileSplitReadOperator.class);
    +
    +	private static final FileInputSplit EOF = new FileInputSplit(-1, null, -1, -1, null);
    +
    +	private transient SplitReader<S, OUT> reader;
    +	private transient TimestampedCollector<OUT> collector;
    +
    +	private Configuration configuration;
    +	private FileInputFormat<OUT> format;
    +	private TypeInformation<OUT> typeInfo;
    +
    +	private Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState;
    +
    +	public FileSplitReadOperator(FileInputFormat<OUT> format, TypeInformation<OUT> typeInfo, Configuration configuration) {
    +		this.format = checkNotNull(format);
    +		this.typeInfo = checkNotNull(typeInfo);
    +		this.configuration = checkNotNull(configuration);
    +	}
    +
    +	@Override
    +	public void open() throws Exception {
    +		super.open();
    +
    +		this.format.configure(configuration);
    +		this.collector = new TimestampedCollector<>(output);
    +
    +		TypeSerializer<OUT> serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
    +		Object checkpointLock = getContainingTask().getCheckpointLock();
    +
    +		this.reader = new SplitReader<>(format, serializer, collector, checkpointLock);
    +		this.reader.setReaderState(this.readerState);
    +		this.reader.start();
    +		this.readerState = null;
    +	}
    +
    +	@Override
    +	public void processElement(StreamRecord<FileInputSplit> element) throws Exception {
    +		reader.addSplit(element.getValue());
    +	}
    +
    +	@Override
    +	public void processWatermark(Watermark mark) throws Exception {
    +		output.emitWatermark(mark);
    +	}
    +
    +	@Override
    +	public void dispose() {
    +		super.dispose();
    +
    +		// first try to cancel it properly and
    +		// give it some time until it finishes
    +		reader.cancel();
    +		try {
    +			reader.join(200);
    +		} catch (InterruptedException e) {
    +			// we can ignore this
    +		}
    +
    +		// if the above did not work, then interrupt the thread repeatedly
    +		while (reader.isAlive()) {
    +
    +			StringBuilder bld = new StringBuilder();
    +			StackTraceElement[] stack = reader.getStackTrace();
    +			for (StackTraceElement e : stack) {
    +				bld.append(e).append('\n');
    +			}
    +			LOG.warn("The reader is stuck in method:\n {}", bld.toString());
    +
    +			reader.interrupt();
    +			try {
    +				reader.join(50);
    +			} catch (InterruptedException e) {
    +				// we can ignore this
    +			}
    +		}
    +		reader = null;
    +		collector = null;
    +		configuration = null;
    +		format = null;
    +		typeInfo = null;
    +	}
    +
    +	@Override
    +	public void close() throws Exception {
    +		super.close();
    +
    +		// signal that no more splits will come, wait for the reader to finish
    +		// and close the collector. Further cleaning up is handled by the dispose().
    +
    +		if (reader != null && reader.isAlive() && reader.isRunning()) {
    +			// add a dummy element to signal that no more splits will
    +			// arrive and wait until the reader finishes
    +			reader.addSplit(EOF);
    +			reader.join();
    +		}
    +		collector.close();
    +	}
    +
    +	private class SplitReader<S extends Serializable, OT> extends Thread {
    +
    +		private volatile boolean isRunning;
    +
    +		private final FileInputFormat<OT> format;
    +		private final TypeSerializer<OT> serializer;
    +
    +		private final Object checkpointLock;
    +		private final TimestampedCollector<OT> collector;
    +
    +		private final Object lock = new Object();
    +
    +		private final Queue<FileInputSplit> pendingSplits;
    +
    +		SplitReader(FileInputFormat<OT> format,
    +					TypeSerializer<OT> serializer,
    +					TimestampedCollector<OT> collector,
    +					Object checkpointLock) {
    +
    +			this.format = checkNotNull(format, "Unspecified FileInputFormat.");
    +			this.serializer = checkNotNull(serializer, "Unspecified Serializer.");
    +
    +			this.pendingSplits = new LinkedList<>();
    +			this.collector = collector;
    +			this.checkpointLock = checkpointLock;
    +			this.isRunning = true;
    +		}
    +
    +		void addSplit(FileInputSplit split) {
    +			Preconditions.checkNotNull(split);
    +			synchronized (lock) {
    +				this.pendingSplits.add(split);
    +			}
    +		}
    +
    +		public boolean isRunning() {
    +			return this.isRunning;
    +		}
    +
    +		@Override
    +		public void run() {
    +			FileInputSplit split = null;
    +			try {
    +				while (this.isRunning) {
    +
    +					// get the next split to read.
    +					// locking is needed because checkpointing will
    +					// ask for a consistent snapshot of the list.
    +					synchronized (lock) {
    +						split = this.pendingSplits.peek();
    +					}
    +
    +					if (split == null) {
    +						Thread.sleep(50);
    +						continue;
    +					}
    +
    +					if (split.equals(EOF)) {
    +						isRunning = false;
    +						break;
    +					}
    +
    +					synchronized (checkpointLock) {
    +						synchronized (lock) {
    +							split = this.pendingSplits.poll();
    +						}
    +						this.format.open(split);
    +					}
    +
    +					try {
    +						OT nextElement = serializer.createInstance();
    +						do {
    +							synchronized (checkpointLock) {
    +								nextElement = format.nextRecord(nextElement);
    +								if (nextElement != null) {
    +									collector.collect(nextElement);
    +								}
    +							}
    +						} while (nextElement != null && !format.reachedEnd());
    +					} finally {
    +						this.format.close();
    +					}
    +				}
    +				LOG.info("Split Reader terminated, and exiting normally.");
    +
    +			} catch (Throwable e) {
    +				if (isRunning) {
    +					LOG.error("Caught exception processing split: {}", split, e);
    +				}
    +				getContainingTask().failExternally(e);
    +			}
    +		}
    +
    +		Tuple3<List<FileInputSplit>, FileInputSplit, S> getReaderState() throws IOException {
    +			// take a consistent snapshot of the pending splits
    +			// and that of the format, which includes the split currently being read and
    +			// where we are in the split.
    +
    +			List<FileInputSplit> snapshot;
    +			Tuple2<FileInputSplit, S> formatState = null;
    +			synchronized (lock) {
    +				snapshot = new ArrayList<>(this.pendingSplits.size());
    +				for (FileInputSplit split: this.pendingSplits) {
    +					snapshot.add(split);
    +				}
    +
    +				if (this.format instanceof CheckpointableInputFormat) {
    +					formatState = ((CheckpointableInputFormat) format).getCurrentChannelState();
    +				} else {
    +					LOG.warn("The format used is not checkpointable.");
    +				}
    +			}
    +
    +			return formatState == null ?
    +				new Tuple3<>(snapshot, (FileInputSplit) null, (S) null) :
    +				new Tuple3<>(snapshot, formatState.f0, formatState.f1);
    +		}
    +
    +		void setReaderState(Tuple3<List<FileInputSplit>, FileInputSplit, S> state) throws IOException {
    --- End diff --
    
    Can we have an additional constructor instead of this method? It is only ever called right after the constructor, and a constructor would make the initialization "atomic".
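    
    For illustration, the suggested alternative could look like the sketch below; the restoreFromState() helper is hypothetical:
    
        // extra constructor taking the restored state, so that constructing
        // the reader and restoring its state happen as one "atomic" step
        SplitReader(FileInputFormat<OT> format,
                    TypeSerializer<OT> serializer,
                    TimestampedCollector<OT> collector,
                    Object checkpointLock,
                    Tuple3<List<FileInputSplit>, FileInputSplit, S> restoredState) {
            this(format, serializer, collector, checkpointLock);
            if (restoredState != null) {
                restoreFromState(restoredState);  // hypothetical private helper
            }
        }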



[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1984#discussion_r63521733
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java ---
    +					// get the next split to read.
    +					// locking is needed because checkpointing will
    +					// ask for a consistent snapshot of the list.
    +					synchronized (lock) {
    --- End diff --
    
    It is true that we do not modify the queue here, but the main thread may be adding elements concurrently. This is the reason I am taking the lock.



[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1984#discussion_r63523040
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java ---
    +					// get the next split to read.
    +					// locking is needed because checkpointing will
    +					// ask for a consistent snapshot of the list.
    +					synchronized (lock) {
    --- End diff --
    
    True, I didn't have that in mind. We could use a concurrent queue, but this is also fine.
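    
    As a sketch of that alternative (assuming the consistency of the checkpoint snapshot in getReaderState() is handled separately):
    
        // requires: import java.util.concurrent.ConcurrentLinkedQueue;
        private final Queue<FileInputSplit> pendingSplits = new ConcurrentLinkedQueue<>();
    
        void addSplit(FileInputSplit split) {
            // thread-safe without an explicit monitor
            pendingSplits.add(Preconditions.checkNotNull(split));
        }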



[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1984#issuecomment-219673873
  
    Overall, the code looks very good! I had some inline comments about Javadoc/comments.
    
    One thing that might be wrong, though, is the interplay between `CheckpointableInputFormat.getCurrentChannelState()` and `SplitReader.getReaderState()`. The latter checks whether the result of the former is null, and then does something based on that. The former, however, will never return `null`; it always returns a `Tuple2` whose fields may be set to `null`. This might be an artifact from an earlier version of the implementation.
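    
    A minimal sketch of the fix implied above, testing the tuple's fields rather than the tuple itself (names as in the quoted diffs):
    
        Tuple2<FileInputSplit, S> formatState =
            ((CheckpointableInputFormat) format).getCurrentChannelState();
    
        // getCurrentChannelState() never returns null, but its fields can be null
        return formatState.f0 == null ?
            new Tuple3<>(snapshot, (FileInputSplit) null, (S) null) :
            new Tuple3<>(snapshot, formatState.f0, formatState.f1);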



[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1984#discussion_r63491234
  
    --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---
    @@ -463,7 +463,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
        * every 100 milliseconds.
        *
        */
    -  def readFileStream(StreamPath: String, intervalMillis: Long = 100, 
    --- End diff --
    
    Unrelated change.



[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1984#discussion_r63491422
  
    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java ---
    @@ -115,7 +115,7 @@ public void testHDFS() {
     			}
     			
     			Assert.assertTrue("No result file present", hdfs.exists(result));
    -			
    --- End diff --
    
    Unrelated whitespace change.



[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1984#discussion_r63490061
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/BlockInfo.java ---
    @@ -26,6 +26,11 @@
     import org.apache.flink.core.memory.DataOutputView;
     
     @Public
    --- End diff --
    
    `@Public` should go after Javadoc, IMHO. Also, the last line of the Javadoc has an extra `*`.
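    
    For example, the ordering suggested above would be (the Javadoc text here is illustrative, not the actual `BlockInfo` documentation):
    
        /**
         * Metadata about one block of a file.
         */
        @Public
        public class BlockInfo {
            // ...
        }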



[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the pull request:

    https://github.com/apache/flink/pull/1984#issuecomment-219117647
  
    After the discussion we had today with @StephanEwen and @aljoscha, I also added the PROCESS_ONCE watch type, which processes the content of the file/directory as it is at invocation time and then exits. This makes it possible to accommodate bounded file sources (a la batch).
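    
    A small usage sketch of the new watch type; the constructor signature is taken from the FileSplitMonitoringFunction diff quoted below:
    
        // bounded, batch-like read: process what is currently there once, then exit
        new FileSplitMonitoringFunction<>(format, path, configuration,
            FileSplitMonitoringFunction.WatchType.PROCESS_ONCE,
            readerParallelism, 100L);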



[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1984#discussion_r63493233
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.FileStatus;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.JobException;
    +import org.apache.flink.streaming.api.checkpoint.Checkpointed;
    +import org.apache.flink.util.Preconditions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * This is the single (non-parallel) task which monitors a user-provided path and assigns splits
    + * to downstream tasks for further reading and processing. Which splits will be further processed
    + * depends on the user-provided {@link FileSplitMonitoringFunction.WatchType}.
    + */
    +@Internal
    +public class FileSplitMonitoringFunction<OUT>
    +	extends RichSourceFunction<FileInputSplit> implements Checkpointed<Tuple3<List<Tuple2<Long, List<FileInputSplit>>>, Tuple2<Long, List<FileInputSplit>>, Long>> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
    +
    +	/**
    +	 * Specifies when computation will be triggered.
    +	 */
    +	public enum WatchType {
    +		PROCESS_ONCE,				// Processes the current content of a file/path only ONCE, and stops monitoring.
    +		REPROCESS_WITH_APPENDED		// Reprocesses the whole file when new data is appended.
    +	}
    +
    +	/** The path to monitor. */
    +	private final String path;
    +
    +	/** The default parallelism for the job, as this is going to be the parallelism of the downstream readers. */
    +	private final int readerParallelism;
    +
    +	/** The {@link FileInputFormat} to be read. */
    +	private FileInputFormat<OUT> format;
    +
    +	/** How often to monitor the state of the directory for new data. */
    +	private final long interval;
    +
    +	/** Which new data to process (see {@link WatchType}). */
    +	private final WatchType watchType;
    +
    +	private List<Tuple2<Long, List<FileInputSplit>>> splitsToFwdOrderedAscByModTime;
    +
    +	private Tuple2<Long, List<FileInputSplit>> currentSplitsToFwd;
    +
    +	private long globalModificationTime;
    +
    +	private FilePathFilter pathFilter;
    +
    +	private volatile boolean isRunning = true;
    +
    +	/**
    +	 * This is the {@link Configuration} to be used to initialize the input format at the reader
    +	 * (see {@link #open(Configuration)}). In the codebase, whenever {@link #open(Configuration)} is called,
    +	 * it is passed a new configuration, thus ignoring potential user-specified parameters. Now, we pass a
    +	 * configuration object at the constructor, which is shipped to the remote tasks.
    +	 * */
    +	private Configuration configuration;
    +
    +	public FileSplitMonitoringFunction(
    +		FileInputFormat<OUT> format, String path, Configuration configuration,
    +		WatchType watchType, int readerParallelism, long interval) {
    +
    +		this(format, path, configuration, FilePathFilter.DefaultFilter.getInstance(), watchType, readerParallelism, interval);
    +	}
    +
    +	public FileSplitMonitoringFunction(
    +		FileInputFormat<OUT> format, String path, Configuration configuration,
    +		FilePathFilter filter, WatchType watchType, int readerParallelism, long interval) {
    +
    +		this.format = Preconditions.checkNotNull(format);
    +		this.path = Preconditions.checkNotNull(path);
    +		this.configuration = Preconditions.checkNotNull(configuration);
    +
    +		Preconditions.checkArgument(interval >= 100,
    +			"The specified monitoring interval is smaller than the minimum allowed one (100 ms).");
    +		this.interval = interval;
    +
    +		this.watchType = watchType;
    +
    +		this.pathFilter = Preconditions.checkNotNull(filter);
    +
    +		this.readerParallelism = Math.max(readerParallelism, 1);
    +		this.globalModificationTime = Long.MIN_VALUE;
    +	}
    +
    +	@Override
    +	@SuppressWarnings("unchecked")
    +	public void open(Configuration parameters) throws Exception {
    +		LOG.info("Opening File Monitoring Source.");
    +		
    +		super.open(parameters);
    +		format.configure(this.configuration);
    +	}
    +
    +	@Override
    +	public void run(SourceFunction.SourceContext<FileInputSplit> context) throws Exception {
    +		FileSystem fileSystem = FileSystem.get(new URI(path));
    +
    +		switch (watchType) {
    +			case REPROCESS_WITH_APPENDED:
    +				while (isRunning) {
    +					monitorDirAndForwardSplits(fileSystem, context);
    +					Thread.sleep(interval);
    +				}
    +				isRunning = false;
    +				break;
    +			case PROCESS_ONCE:
    +				monitorDirAndForwardSplits(fileSystem, context);
    +				isRunning = false;
    +				break;
    +			default:
    +				isRunning = false;
    +				throw new RuntimeException("Unknown WatchType: " + watchType);
    +		}
    +	}
    +
    +	public boolean isRunning() {
    +		return this.isRunning;
    +	}
    +
    +	private void monitorDirAndForwardSplits(FileSystem fs, SourceContext<FileInputSplit> context) throws IOException, JobException {
    +		final Object lock = context.getCheckpointLock();
    +
    +		// it may be non-null in the case of a recovery after a failure.
    +		if (currentSplitsToFwd != null) {
    +			synchronized (lock) {
    +				forwardSplits(currentSplitsToFwd, context);
    +			}
    +		}
    +		currentSplitsToFwd = null;
    --- End diff --
    
    Ah, maybe scratch that because you are removing from the list and forwarding atomically in `forwardSplits`. Right?



[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1984#discussion_r63492517
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java ---
    +	public boolean isRunning() {
    --- End diff --
    
    I think this getter is not needed.



[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1984#discussion_r63492326
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java ---
    +	/**
    +	 * This is the {@link Configuration} to be used to initialize the input format at the reader
    +	 * (see {@link #open(Configuration)}). In the codebase, whenever {@link #open(Configuration)} is called,
    --- End diff --
    
    This is only true for streaming programs; something like `In streaming programs, whenever ...` should do the trick.



[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the pull request:

    https://github.com/apache/flink/pull/1984#issuecomment-219742461
  
    Thanks for the comments, @aljoscha.
    
    The only comment not yet integrated is the one about {{OutputTypeConfigurable}}, which I have to understand a bit better before I can implement it correctly. As for the spaces, it is IntelliJ that adds them. I will try to fix those later as well.



[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1984#discussion_r63490526
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java ---
    @@ -394,4 +394,4 @@ public IntValue nextRecord(IntValue reuse) throws IOException {
     			return null;
     		}
     	}
    -}
    \ No newline at end of file
    +}
    --- End diff --
    
    Just whitespace changes in this file.



[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1984#discussion_r63492168
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.FileStatus;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.JobException;
    +import org.apache.flink.streaming.api.checkpoint.Checkpointed;
    +import org.apache.flink.util.Preconditions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * This is the single (non-parallel) task which monitors a user-provided path and assigns splits
    --- End diff --
    
    We could mention here that it is meant to work together with the `FileSplitReadOperator`.
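    
    For example (one possible wording):
    
        /**
         * This is the single (non-parallel) task which monitors a user-provided path and assigns splits
         * to downstream tasks for further reading and processing. It is meant to work together with the
         * {@link FileSplitReadOperator}, which is responsible for actually reading the assigned splits.
         */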



[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1984#discussion_r63494863
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java ---
    @@ -0,0 +1,368 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import org.apache.flink.api.common.io.CheckpointableInputFormat;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.runtime.state.AbstractStateBackend;
    +import org.apache.flink.runtime.state.StreamStateHandle;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.api.operators.TimestampedCollector;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
    +import org.apache.flink.util.Preconditions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Queue;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * This is the operator that reads the splits received from {@link FileSplitMonitoringFunction}.
    + * This operator will receive just the split descriptors and then read and emit records. This may lead
    + * to backpressure. To avoid this, we will have another thread actually reading the splits and
    + * another forwarding the checkpoint barriers. The two should sync so that the checkpoints reflect the
    + * current state.
    + * */
    +public class FileSplitReadOperator<OUT, S extends Serializable> extends AbstractStreamOperator<OUT>
    +	implements OneInputStreamOperator<FileInputSplit, OUT> {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(FileSplitReadOperator.class);
    +
    +	private static final FileInputSplit EOF = new FileInputSplit(-1, null, -1, -1, null);
    +
    +	private transient SplitReader<S, OUT> reader;
    +	private transient TimestampedCollector<OUT> collector;
    +
    +	private Configuration configuration;
    +	private FileInputFormat<OUT> format;
    +	private TypeInformation<OUT> typeInfo;
    +
    +	private Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState;
    +
    +	public FileSplitReadOperator(FileInputFormat<OUT> format, TypeInformation<OUT> typeInfo, Configuration configuration) {
    +		this.format = checkNotNull(format);
    +		this.typeInfo = checkNotNull(typeInfo);
    +		this.configuration = checkNotNull(configuration);
    +	}
    +
    +	@Override
    +	public void open() throws Exception {
    +		super.open();
    +
    +		this.format.configure(configuration);
    +		this.collector = new TimestampedCollector<>(output);
    +
    +		TypeSerializer<OUT> serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
    +		Object checkpointLock = getContainingTask().getCheckpointLock();
    +
    +		this.reader = new SplitReader<>(format, serializer, collector, checkpointLock);
    +		this.reader.setReaderState(this.readerState);
    +		this.reader.start();
    +		this.readerState = null;
    +	}
    +
    +	@Override
    +	public void processElement(StreamRecord<FileInputSplit> element) throws Exception {
    +		reader.addSplit(element.getValue());
    +	}
    +
    +	@Override
    +	public void processWatermark(Watermark mark) throws Exception {
    +		output.emitWatermark(mark);
    +	}
    +
    +	@Override
    +	public void dispose() {
    +		super.dispose();
    +
    +		// first try to cancel it properly and
    +		// give it some time until it finishes
    +		reader.cancel();
    +		try {
    +			reader.join(200);
    +		} catch (InterruptedException e) {
    +			// we can ignore this
    +		}
    +
    +		// if the above did not work, then interrupt the thread repeatedly
    +		while (reader.isAlive()) {
    +
    +			StringBuilder bld = new StringBuilder();
    +			StackTraceElement[] stack = reader.getStackTrace();
    +			for (StackTraceElement e : stack) {
    +				bld.append(e).append('\n');
    +			}
    +			LOG.warn("The reader is stuck in method:\n {}", bld.toString());
    +
    +			reader.interrupt();
    +			try {
    +				reader.join(50);
    +			} catch (InterruptedException e) {
    +				// we can ignore this
    +			}
    +		}
    +		reader = null;
    +		collector = null;
    +		configuration = null;
    +		format = null;
    +		typeInfo = null;
    +	}
    +
    +	@Override
    +	public void close() throws Exception {
    +		super.close();
    +
    +		// signal that no more splits will come, wait for the reader to finish
    +		// and close the collector. Further cleaning up is handled by the dispose().
    +
    +		if (reader != null && reader.isAlive() && reader.isRunning()) {
    +			// add a dummy element to signal that no more splits will
    +			// arrive and wait until the reader finishes
    +			reader.addSplit(EOF);
    +			reader.join();
    +		}
    +		collector.close();
    +	}
    +
    +	private class SplitReader<S extends Serializable, OT> extends Thread {
    +
    +		private volatile boolean isRunning;
    +
    +		private final FileInputFormat<OT> format;
    +		private final TypeSerializer<OT> serializer;
    +
    +		private final Object checkpointLock;
    +		private final TimestampedCollector<OT> collector;
    +
    +		private final Object lock = new Object();
    +
    +		private final Queue<FileInputSplit> pendingSplits;
    +
    +		SplitReader(FileInputFormat<OT> format,
    +					TypeSerializer<OT> serializer,
    +					TimestampedCollector<OT> collector,
    +					Object checkpointLock) {
    +
    +			this.format = checkNotNull(format, "Unspecified FileInputFormat.");
    +			this.serializer = checkNotNull(serializer, "Unspecified Serializer.");
    +
    +			this.pendingSplits = new LinkedList<>();
    +			this.collector = collector;
    +			this.checkpointLock = checkpointLock;
    +			this.isRunning = true;
    +		}
    +
    +		void addSplit(FileInputSplit split) {
    +			Preconditions.checkNotNull(split);
    +			synchronized (lock) {
    +				this.pendingSplits.add(split);
    +			}
    +		}
    +
    +		public boolean isRunning() {
    +			return this.isRunning;
    +		}
    +
    +		@Override
    +		public void run() {
    +			FileInputSplit split = null;
    +			try {
    +				while (this.isRunning) {
    +
    +					// get the next split to read.
    +					// locking is needed because checkpointing will
    +					// ask for a consistent snapshot of the list.
    +					synchronized (lock) {
    --- End diff --
    
    The lock seems to be required only later; here we are not changing the queue.
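    
    Something along these lines would keep the critical section minimal (just a sketch, I have not checked what the rest of the loop needs):
    
        FileInputSplit split;
        // guard only the access to the shared queue ...
        synchronized (lock) {
            split = this.pendingSplits.poll();
        }
        // ... and do the actual reading of the split outside the lock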



[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u closed the pull request at:

    https://github.com/apache/flink/pull/1984



[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1984#discussion_r63492862
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.FileStatus;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.JobException;
    +import org.apache.flink.streaming.api.checkpoint.Checkpointed;
    +import org.apache.flink.util.Preconditions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * This is the single (non-parallel) task which monitors a user-provided path and assigns splits
    + * to downstream tasks for further reading and processing. Which splits will be further processed
    + * depends on the user-provided {@link FileSplitMonitoringFunction.WatchType}.
    + */
    +@Internal
    +public class FileSplitMonitoringFunction<OUT>
    +	extends RichSourceFunction<FileInputSplit> implements Checkpointed<Tuple3<List<Tuple2<Long, List<FileInputSplit>>>, Tuple2<Long, List<FileInputSplit>>, Long>> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
    +
    +	/**
    +	 * Specifies when computation will be triggered.
    +	 */
    +	public enum WatchType {
    +		PROCESS_ONCE,				// Processes the current content of a file/path only ONCE, and stops monitoring.
    +		REPROCESS_WITH_APPENDED		// Reprocesses the whole file when new data is appended.
    +	}
    +
    +	/** The path to monitor. */
    +	private final String path;
    +
    +	/** The default parallelism for the job, as this is going to be the parallelism of the downstream readers. */
    +	private final int readerParallelism;
    +
    +	/** The {@link FileInputFormat} to be read. */
    +	private FileInputFormat<OUT> format;
    +
    +	/** How often to monitor the state of the directory for new data. */
    +	private final long interval;
    +
    +	/** Which new data to process (see {@link WatchType}). */
    +	private final WatchType watchType;
    +
    +	private List<Tuple2<Long, List<FileInputSplit>>> splitsToFwdOrderedAscByModTime;
    +
    +	private Tuple2<Long, List<FileInputSplit>> currentSplitsToFwd;
    +
    +	private long globalModificationTime;
    +
    +	private FilePathFilter pathFilter;
    +
    +	private volatile boolean isRunning = true;
    +
    +	/**
    +	 * This is the {@link Configuration} to be used to initialize the input format at the reader
    +	 * (see {@link #open(Configuration)}). In the codebase, whenever {@link #open(Configuration)} is called,
    +	 * it is passed a new configuration, thus ignoring potential user-specified parameters. For this reason,
    +	 * we pass a configuration object to the constructor, which is then shipped to the remote tasks.
    +	 * */
    +	private Configuration configuration;
    +
    +	public FileSplitMonitoringFunction(
    +		FileInputFormat<OUT> format, String path, Configuration configuration,
    +		WatchType watchType, int readerParallelism, long interval) {
    +
    +		this(format, path, configuration, FilePathFilter.DefaultFilter.getInstance(), watchType, readerParallelism, interval);
    +	}
    +
    +	public FileSplitMonitoringFunction(
    +		FileInputFormat<OUT> format, String path, Configuration configuration,
    +		FilePathFilter filter, WatchType watchType, int readerParallelism, long interval) {
    +
    +		this.format = Preconditions.checkNotNull(format);
    +		this.path = Preconditions.checkNotNull(path);
    +		this.configuration = Preconditions.checkNotNull(configuration);
    +
    +		Preconditions.checkArgument(interval >= 100,
    +			"The specified monitoring interval is smaller than the minimum allowed one (100 ms).");
    +		this.interval = interval;
    +
    +		this.watchType = watchType;
    +
    +		this.pathFilter = Preconditions.checkNotNull(filter);
    +
    +		this.readerParallelism = Math.max(readerParallelism, 1);
    +		this.globalModificationTime = Long.MIN_VALUE;
    +	}
    +
    +	@Override
    +	@SuppressWarnings("unchecked")
    +	public void open(Configuration parameters) throws Exception {
    +		LOG.info("Opening File Monitoring Source.");
    +		
    +		super.open(parameters);
    +		format.configure(this.configuration);
    +	}
    +
    +	@Override
    +	public void run(SourceFunction.SourceContext<FileInputSplit> context) throws Exception {
    +		FileSystem fileSystem = FileSystem.get(new URI(path));
    +
    +		switch (watchType) {
    +			case REPROCESS_WITH_APPENDED:
    +				while (isRunning) {
    +					monitorDirAndForwardSplits(fileSystem, context);
    +					Thread.sleep(interval);
    +				}
    +				isRunning = false;
    +				break;
    +			case PROCESS_ONCE:
    +				monitorDirAndForwardSplits(fileSystem, context);
    +				isRunning = false;
    +				break;
    +			default:
    +				isRunning = false;
    +				throw new RuntimeException("Unknown WatchType: " + watchType);
    +		}
    +	}
    +
    +	public boolean isRunning() {
    +		return this.isRunning;
    +	}
    +
    +	private void monitorDirAndForwardSplits(FileSystem fs, SourceContext<FileInputSplit> context) throws IOException, JobException {
    +		final Object lock = context.getCheckpointLock();
    +
    +		// it may be non-null in the case of a recovery after a failure.
    +		if (currentSplitsToFwd != null) {
    +			synchronized (lock) {
    +				forwardSplits(currentSplitsToFwd, context);
    +			}
    +		}
    +		currentSplitsToFwd = null;
    --- End diff --
    
    I think this should also go into the synchronized block, so that forwarding the splits and changing the state are atomic.
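    
    Roughly (a sketch of what I mean):
    
        // it may be non-null in the case of a recovery after a failure.
        if (currentSplitsToFwd != null) {
            synchronized (lock) {
                forwardSplits(currentSplitsToFwd, context);
                // clearing the state under the same lock makes the two
                // atomic with respect to checkpoints
                currentSplitsToFwd = null;
            }
        }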



[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1984#discussion_r63494540
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java ---
    @@ -0,0 +1,368 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import org.apache.flink.api.common.io.CheckpointableInputFormat;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.runtime.state.AbstractStateBackend;
    +import org.apache.flink.runtime.state.StreamStateHandle;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.api.operators.TimestampedCollector;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
    +import org.apache.flink.util.Preconditions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Queue;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * This is the operator that reads the splits received from {@link FileSplitMonitoringFunction}.
    + * This operator will receive just the split descriptors and then read and emit records. This may lead
    + * to backpressure. To avoid this, we will have another thread actually reading the splits and
    + * another forwarding the checkpoint barriers. The two should sync so that the checkpoints reflect the
    + * current state.
    + * */
    +public class FileSplitReadOperator<OUT, S extends Serializable> extends AbstractStreamOperator<OUT>
    +	implements OneInputStreamOperator<FileInputSplit, OUT> {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(FileSplitReadOperator.class);
    +
    +	private static final FileInputSplit EOF = new FileInputSplit(-1, null, -1, -1, null);
    +
    +	private transient SplitReader<S, OUT> reader;
    +	private transient TimestampedCollector<OUT> collector;
    +
    +	private Configuration configuration;
    +	private FileInputFormat<OUT> format;
    +	private TypeInformation<OUT> typeInfo;
    --- End diff --
    
    This is a very subtle thing, but not all `TypeInformation` are `Serializable`, and none of them should be. This is a problem that we introduced a while back.
    
    The way to do it is to implement `OutputTypeConfigurable`; there the `TypeSerializer` can be created. In `open()` you should then ensure that you actually have a `TypeSerializer`.
    
    And yes, I know that no-one can really know this without having encountered a serialization problem once ... 😅
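    
    A rough sketch of what I mean (untested; `OutputTypeConfigurable` is in `org.apache.flink.streaming.api.operators`):
    
        public class FileSplitReadOperator<OUT, S extends Serializable> extends AbstractStreamOperator<OUT>
            implements OneInputStreamOperator<FileInputSplit, OUT>, OutputTypeConfigurable<OUT> {
    
            // keep the (serializable) TypeSerializer instead of the TypeInformation
            private TypeSerializer<OUT> serializer;
    
            @Override
            public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
                // called at translation time, before the operator is serialized and shipped
                this.serializer = outTypeInfo.createSerializer(executionConfig);
            }
    
            @Override
            public void open() throws Exception {
                super.open();
                Preconditions.checkState(serializer != null,
                    "The serializer was not set. setOutputType() was probably never called.");
                // ... use the serializer directly instead of creating it from typeInfo ...
            }
        }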



[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1984#discussion_r63493915
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.FileStatus;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.JobException;
    +import org.apache.flink.streaming.api.checkpoint.Checkpointed;
    +import org.apache.flink.util.Preconditions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * This is the single (non-parallel) task which monitors a user-provided path and assigns splits
    + * to downstream tasks for further reading and processing. Which splits will be further processed
    + * depends on the user-provided {@link FileSplitMonitoringFunction.WatchType}.
    + */
    +@Internal
    +public class FileSplitMonitoringFunction<OUT>
    +	extends RichSourceFunction<FileInputSplit> implements Checkpointed<Tuple3<List<Tuple2<Long, List<FileInputSplit>>>, Tuple2<Long, List<FileInputSplit>>, Long>> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
    +
    +	/**
    +	 * Specifies when computation will be triggered.
    +	 */
    +	public enum WatchType {
    +		PROCESS_ONCE,				// Processes the current content of a file/path only ONCE, and stops monitoring.
    +		REPROCESS_WITH_APPENDED		// Reprocesses the whole file when new data is appended.
    +	}
    +
    +	/** The path to monitor. */
    +	private final String path;
    +
    +	/** The default parallelism for the job, as this is going to be the parallelism of the downstream readers. */
    +	private final int readerParallelism;
    +
    +	/** The {@link FileInputFormat} to be read. */
    +	private FileInputFormat<OUT> format;
    +
    +	/** How often to monitor the state of the directory for new data. */
    +	private final long interval;
    +
    +	/** Which new data to process (see {@link WatchType}). */
    +	private final WatchType watchType;
    +
    +	private List<Tuple2<Long, List<FileInputSplit>>> splitsToFwdOrderedAscByModTime;
    +
    +	private Tuple2<Long, List<FileInputSplit>> currentSplitsToFwd;
    +
    +	private long globalModificationTime;
    +
    +	private FilePathFilter pathFilter;
    +
    +	private volatile boolean isRunning = true;
    +
    +	/**
    +	 * This is the {@link Configuration} to be used to initialize the input format at the reader
    +	 * (see {@link #open(Configuration)}). In the codebase, whenever {@link #open(Configuration)} is called,
    +	 * it is passed a new configuration, thus ignoring potential user-specified parameters. For this reason,
    +	 * we pass a configuration object to the constructor, which is then shipped to the remote tasks.
    +	 * */
    +	private Configuration configuration;
    +
    +	public FileSplitMonitoringFunction(
    +		FileInputFormat<OUT> format, String path, Configuration configuration,
    +		WatchType watchType, int readerParallelism, long interval) {
    +
    +		this(format, path, configuration, FilePathFilter.DefaultFilter.getInstance(), watchType, readerParallelism, interval);
    +	}
    +
    +	public FileSplitMonitoringFunction(
    +		FileInputFormat<OUT> format, String path, Configuration configuration,
    +		FilePathFilter filter, WatchType watchType, int readerParallelism, long interval) {
    +
    +		this.format = Preconditions.checkNotNull(format);
    +		this.path = Preconditions.checkNotNull(path);
    +		this.configuration = Preconditions.checkNotNull(configuration);
    +
    +		Preconditions.checkArgument(interval >= 100,
    +			"The specified monitoring interval is smaller than the minimum allowed one (100 ms).");
    +		this.interval = interval;
    +
    +		this.watchType = watchType;
    +
    +		this.pathFilter = Preconditions.checkNotNull(filter);
    +
    +		this.readerParallelism = Math.max(readerParallelism, 1);
    +		this.globalModificationTime = Long.MIN_VALUE;
    +	}
    +
    +	@Override
    +	@SuppressWarnings("unchecked")
    +	public void open(Configuration parameters) throws Exception {
    +		LOG.info("Opening File Monitoring Source.");
    +		
    +		super.open(parameters);
    +		format.configure(this.configuration);
    +	}
    +
    +	@Override
    +	public void run(SourceFunction.SourceContext<FileInputSplit> context) throws Exception {
    +		FileSystem fileSystem = FileSystem.get(new URI(path));
    +
    +		switch (watchType) {
    +			case REPROCESS_WITH_APPENDED:
    +				while (isRunning) {
    +					monitorDirAndForwardSplits(fileSystem, context);
    +					Thread.sleep(interval);
    +				}
    +				isRunning = false;
    +				break;
    +			case PROCESS_ONCE:
    +				monitorDirAndForwardSplits(fileSystem, context);
    +				isRunning = false;
    +				break;
    +			default:
    +				isRunning = false;
    +				throw new RuntimeException("Unknown WatchType: " + watchType);
    +		}
    +	}
    +
    +	public boolean isRunning() {
    +		return this.isRunning;
    +	}
    +
    +	private void monitorDirAndForwardSplits(FileSystem fs, SourceContext<FileInputSplit> context) throws IOException, JobException {
    +		final Object lock = context.getCheckpointLock();
    +
    +		// it may be non-null in the case of a recovery after a failure.
    +		if (currentSplitsToFwd != null) {
    +			synchronized (lock) {
    +				forwardSplits(currentSplitsToFwd, context);
    +			}
    +		}
    +		currentSplitsToFwd = null;
    +
    +		// it may be non-null in the case of a recovery after a failure.
    +		if (splitsToFwdOrderedAscByModTime == null) {
    +			splitsToFwdOrderedAscByModTime = getInputSplitSortedOnModTime(fs);
    +		}
    +
    +		Iterator<Tuple2<Long, List<FileInputSplit>>> it =
    +			splitsToFwdOrderedAscByModTime.iterator();
    +
    +		while (it.hasNext()) {
    +			synchronized (lock) {
    +				currentSplitsToFwd = it.next();
    +				it.remove();
    +				forwardSplits(currentSplitsToFwd, context);
    +			}
    +		}
    +
    +		// set them to null to distinguish from a restore.
    +		splitsToFwdOrderedAscByModTime = null;
    +		currentSplitsToFwd = null;
    +	}
    +
    +	private void forwardSplits(Tuple2<Long, List<FileInputSplit>> splitsToFwd, SourceContext<FileInputSplit> context) {
    +		currentSplitsToFwd = splitsToFwd;
    +		Long modTime = currentSplitsToFwd.f0;
    +		List<FileInputSplit> splits = currentSplitsToFwd.f1;
    +
    +		Iterator<FileInputSplit> it = splits.iterator();
    +		while (it.hasNext()) {
    +			FileInputSplit split = it.next();
    +			processSplit(split, context);
    +			it.remove();
    +		}
    +
    +		// update the global modification time
    +		if (modTime >= globalModificationTime) {
    +			globalModificationTime = modTime;
    +		}
    +	}
    +
    +	private void processSplit(FileInputSplit split, SourceContext<FileInputSplit> context) {
    +		LOG.info("Forwarding split: " + split);
    +		context.collect(split);
    +	}
    +
    +	private List<Tuple2<Long, List<FileInputSplit>>> getInputSplitSortedOnModTime(FileSystem fileSystem) throws IOException {
    +		List<FileStatus> eligibleFiles = listEligibleFiles(fileSystem);
    +		if (eligibleFiles.isEmpty()) {
    +			return new ArrayList<>();
    +		}
    +
    +		Map<Long, List<FileInputSplit>> splitsToForward = getInputSplits(eligibleFiles);
    +		List<Tuple2<Long, List<FileInputSplit>>> sortedSplitsToForward = new ArrayList<>();
    +
    +		for (Map.Entry<Long, List<FileInputSplit>> entry : splitsToForward.entrySet()) {
    +			sortedSplitsToForward.add(new Tuple2<>(entry.getKey(), entry.getValue()));
    +		}
    +
    +		Collections.sort(sortedSplitsToForward, new Comparator<Tuple2<Long, List<FileInputSplit>>>() {
    +			@Override
    +			public int compare(Tuple2<Long, List<FileInputSplit>> o1, Tuple2<Long, List<FileInputSplit>> o2) {
    +				return Long.compare(o1.f0, o2.f0);
    +			}
    +		});
    +
    +		return sortedSplitsToForward;
    +	}
    +
    +	/**
    +	 * Creates the input splits for the path to be assigned to the downstream tasks.
    --- End diff --
    
    The `files` parameter should be called `eligibleFiles`, right? Also, the comment reads as if the files in the list are the ones that are excluded, while they are actually the ones that are used.
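    
    I.e. something like (just a suggestion):
    
        /**
         * Creates the input splits from the provided files, to be assigned to and read by the
         * downstream tasks.
         *
         * @param eligibleFiles The files from which the splits are created.
         */
        private Map<Long, List<FileInputSplit>> getInputSplits(List<FileStatus> eligibleFiles) throws IOException {
            ...
        }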



[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1984#discussion_r63491931
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java ---
    @@ -0,0 +1,74 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import org.apache.flink.core.fs.Path;
    +
    +import java.io.Serializable;
    +
    +/**
    + * An interface to be implemented by the user when using the {@link FileSplitMonitoringFunction}.
    + * The {@link #filterPath(Path)} method is responsible for deciding if a path is eligible for further
    + * processing or not. This can serve to exclude temporary or partial files that
    + * are still being written.
    + *
    + *<p/>
    + * A default implementation is the {@link DefaultFilter}, which excludes files that start with "." or "_",
    + * or that contain "_COPYING_" in their names. This can be retrieved by {@link DefaultFilter#getInstance()}.
    + * */
    +public interface FilePathFilter extends Serializable {
    +
    +	/**
    +	 * @return {@code true} if the {@code filePath} given is to be
    --- End diff --
    
    I think having a `@return` here is strange; normally, a `@return` follows the main description body. We could just have `Returns {@code true} ...` here.
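    
    I.e. something like:
    
        /**
         * Returns {@code true} if the {@code filePath} given is to be
         * excluded from further processing, {@code false} otherwise.
         */
        boolean filterPath(Path filePath);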



[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1984#discussion_r63490499
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java ---
    @@ -235,7 +235,7 @@ protected FileInputFormat(Path filePath) {
     	// --------------------------------------------------------------------------------------------
     	//  Getters/setters for the configurable parameters
     	// --------------------------------------------------------------------------------------------
    -	
    +
    --- End diff --
    
    Just whitespace changes in this file.

