You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/12/27 17:49:06 UTC

[GitHub] sijie closed pull request #2869: Added Pulsar IO connector for local files

sijie closed pull request #2869: Added Pulsar IO connector for local files
URL: https://github.com/apache/pulsar/pull/2869
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-io/file/pom.xml b/pulsar-io/file/pom.xml
new file mode 100644
index 0000000000..c2d892f751
--- /dev/null
+++ b/pulsar-io/file/pom.xml
@@ -0,0 +1,70 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-io</artifactId>
+    <version>2.3.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pulsar-io-file</artifactId>
+  <name>Pulsar IO :: File</name>
+  
+  <dependencies>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-yaml</artifactId>
+    </dependency>
+
+    <dependency>
+    	<groupId>com.fasterxml.jackson.core</groupId>
+    	<artifactId>jackson-databind</artifactId>
+    </dependency>
+    
+    <dependency>
+    	<groupId>commons-io</groupId>
+    	<artifactId>commons-io</artifactId>
+    </dependency>
+
+    <dependency>
+    	<groupId>org.apache.commons</groupId>
+    	<artifactId>commons-lang3</artifactId>
+    </dependency>
+  </dependencies>
+  
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+  
+</project>
\ No newline at end of file
diff --git a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileConsumerThread.java b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileConsumerThread.java
new file mode 100644
index 0000000000..8c9803a03c
--- /dev/null
+++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileConsumerThread.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
+
+import org.apache.pulsar.io.core.PushSource;
+import org.apache.pulsar.io.file.utils.GZipFiles;
+import org.apache.pulsar.io.file.utils.ZipFiles;
+
+/**
+ * Worker thread that consumes the contents of the files
+ * and publishes them to a Pulsar topic.
+ *
+ * <p>A file is tracked in the {@code inProcess} queue while it is being read and is
+ * moved to the {@code recentlyProcessed} queue afterwards, whether or not reading succeeded.
+ */
+public class FileConsumerThread extends Thread {
+
+    private final PushSource<byte[]> source;
+    private final BlockingQueue<File> workQueue;
+    private final BlockingQueue<File> inProcess;
+    private final BlockingQueue<File> recentlyProcessed;
+
+    /**
+     * Creates a worker bound to the given source and bookkeeping queues.
+     *
+     * @param source the source that the file contents are pushed to
+     * @param workQueue files waiting to be consumed
+     * @param inProcess files currently being consumed
+     * @param recentlyProcessed files whose consumption has finished
+     */
+    public FileConsumerThread(PushSource<byte[]> source,
+            BlockingQueue<File> workQueue,
+            BlockingQueue<File> inProcess,
+            BlockingQueue<File> recentlyProcessed) {
+        this.source = source;
+        this.workQueue = workQueue;
+        this.inProcess = inProcess;
+        this.recentlyProcessed = recentlyProcessed;
+    }
+
+    /**
+     * Takes files off the work queue and consumes them one at a time
+     * until the thread is interrupted.
+     */
+    @Override
+    public void run() {
+        try {
+            while (true) {
+                File file = workQueue.take();
+
+                // BlockingQueue.add either returns true or throws, so it needs no retry loop.
+                inProcess.add(file);
+
+                consumeFile(file);
+            }
+        } catch (InterruptedException ie) {
+            // Restore the interrupt status for whoever is running this task, then terminate.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Publishes every line of the given file, then moves the file from the
+     * {@code inProcess} queue to the {@code recentlyProcessed} queue.
+     */
+    private void consumeFile(File file) {
+        final AtomicInteger idx = new AtomicInteger(1);
+        try (Stream<String> lines = getLines(file)) {
+            lines.forEachOrdered(line -> process(file, idx.getAndIncrement(), line));
+        } catch (IOException | UncheckedIOException e) {
+            // Files.lines reports read errors hit while iterating as UncheckedIOException;
+            // catching it here keeps one unreadable file from killing this worker.
+            e.printStackTrace();
+        } finally {
+            // A single call suffices: retrying a failed remove would spin forever.
+            inProcess.remove(file);
+            recentlyProcessed.add(file);
+        }
+    }
+
+    /**
+     * Opens the given file as a stream of lines, choosing the reader by file type.
+     *
+     * @return the lines of the file, or an empty stream if {@code file} is null
+     * @throws IOException if opening the file fails
+     */
+    private Stream<String> getLines(File file) throws IOException {
+        if (file == null) {
+            return Stream.empty();
+        } else if (GZipFiles.isGzip(file)) {
+            return GZipFiles.lines(Paths.get(file.getAbsolutePath()));
+        } else if (ZipFiles.isZip(file)) {
+            return ZipFiles.lines(Paths.get(file.getAbsolutePath()));
+        } else {
+            return Files.lines(Paths.get(file.getAbsolutePath()), Charset.defaultCharset());
+        }
+    }
+
+    /** Pushes a single line of the source file to the Pulsar source as a record. */
+    private void process(File srcFile, int lineNumber, String line) {
+        source.consume(new FileRecord(srcFile, lineNumber, line.getBytes()));
+    }
+
+}
diff --git a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java
new file mode 100644
index 0000000000..a13b923c25
--- /dev/null
+++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
+
+/**
+ * Worker thread that checks the configured input directory for
+ * files that meet the provided filtering criteria, and publishes
+ * them to a work queue for processing by the FileConsumerThreads.
+ */
+public class FileListingThread extends Thread {
+
+    private final AtomicLong queueLastUpdated = new AtomicLong(0L);
+    private final Lock listingLock = new ReentrantLock();
+    private final AtomicReference<FileFilter> fileFilterRef = new AtomicReference<>();
+    private final BlockingQueue<File> workQueue;
+    private final BlockingQueue<File> inProcess;
+    private final BlockingQueue<File> recentlyProcessed;
+
+    private final String inputDir;
+    private final boolean recurseDirs;
+    private final boolean keepOriginal;
+    private final long pollingInterval;
+
+    /**
+     * Creates a listing thread. Options left unset in the configuration default to:
+     * recursion enabled, keepFile disabled, and a polling interval of 10000 ms.
+     */
+    public FileListingThread(FileSourceConfig fileConfig,
+            BlockingQueue<File> workQueue,
+            BlockingQueue<File> inProcess,
+            BlockingQueue<File> recentlyProcessed) {
+        this.workQueue = workQueue;
+        this.inProcess = inProcess;
+        this.recentlyProcessed = recentlyProcessed;
+
+        inputDir = fileConfig.getInputDirectory();
+        recurseDirs = Optional.ofNullable(fileConfig.getRecurse()).orElse(true);
+        keepOriginal = Optional.ofNullable(fileConfig.getKeepFile()).orElse(false);
+        pollingInterval = Optional.ofNullable(fileConfig.getPollingInterval()).orElse(10000L);
+        fileFilterRef.set(createFileFilter(fileConfig));
+    }
+
+    /**
+     * Lists the input directory once per polling interval and queues every new
+     * matching file, until the thread is interrupted.
+     */
+    @Override
+    public void run() {
+        while (!Thread.currentThread().isInterrupted()) {
+            if ((queueLastUpdated.get() < System.currentTimeMillis() - pollingInterval) && listingLock.tryLock()) {
+                try {
+                    final File directory = new File(inputDir);
+                    final Set<File> listing = performListing(directory, fileFilterRef.get(), recurseDirs);
+
+                    if (listing != null && !listing.isEmpty()) {
+
+                        // Remove any files that have been or are currently being processed.
+                        listing.removeAll(inProcess);
+                        if (!keepOriginal) {
+                            listing.removeAll(recentlyProcessed);
+                        }
+
+                        for (File f: listing) {
+                            if (!workQueue.contains(f)) {
+                                workQueue.offer(f);
+                            }
+                        }
+                        queueLastUpdated.set(System.currentTimeMillis());
+                    }
+
+                } finally {
+                    listingLock.unlock();
+                }
+            }
+
+            try {
+                sleep(pollingInterval - 1);
+            } catch (InterruptedException e) {
+                // An interrupt is the request to shut down: restore the flag and stop
+                // instead of swallowing it and polling forever.
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+    }
+
+    /**
+     * Collects the files under the given directory that the filter accepts.
+     *
+     * @param directory the directory to list
+     * @param filter decides which files are included
+     * @param recurseSubdirectories whether sub-directories are listed as well
+     * @return the accepted files; empty if the directory does not exist or cannot be listed
+     * @throws IllegalStateException if the directory exists but is not both readable and writable
+     */
+    private Set<File> performListing(final File directory, final FileFilter filter,
+            final boolean recurseSubdirectories) {
+        final Set<File> queue = new HashSet<>();
+        // Check existence first: a missing directory is simply empty, not a permission error.
+        if (!directory.exists()) {
+            return queue;
+        }
+
+        Path p = directory.toPath();
+        if (!Files.isWritable(p) || !Files.isReadable(p)) {
+            throw new IllegalStateException("Directory '" + directory
+                    + "' does not have sufficient permissions (i.e., not writable and readable)");
+        }
+
+        final File[] children = directory.listFiles();
+        if (children == null) {
+            return queue;
+        }
+
+        for (final File child : children) {
+            if (child.isDirectory()) {
+                if (recurseSubdirectories) {
+                    queue.addAll(performListing(child, filter, recurseSubdirectories));
+                }
+            } else if (filter.accept(child)) {
+                queue.add(child);
+            }
+        }
+
+        return queue;
+    }
+
+    /**
+     * Builds the filter that decides which files are picked up, based on size, age,
+     * hidden status, relative path, permissions and file name.
+     */
+    private FileFilter createFileFilter(FileSourceConfig fileConfig) {
+        final long minSize = Optional.ofNullable(fileConfig.getMinimumSize()).orElse(1);
+        final Double maxSize = Optional.ofNullable(fileConfig.getMaximumSize()).orElse(Double.MAX_VALUE);
+        final long minAge = Optional.ofNullable(fileConfig.getMinimumFileAge()).orElse(0);
+        final Long maxAge = Optional.ofNullable(fileConfig.getMaximumFileAge()).orElse(Long.MAX_VALUE);
+        final boolean ignoreHidden = Optional.ofNullable(fileConfig.getIgnoreHiddenFiles()).orElse(true);
+        final Pattern filePattern = Pattern.compile(Optional.ofNullable(fileConfig.getFileFilter()).orElse("[^\\.].*"));
+        final String indir = fileConfig.getInputDirectory();
+        final String pathPatternStr = fileConfig.getPathFilter();
+        final Pattern pathPattern = (!recurseDirs || pathPatternStr == null) ? null : Pattern.compile(pathPatternStr);
+
+        return new FileFilter() {
+            @Override
+            public boolean accept(final File file) {
+                if (minSize > file.length()) {
+                    return false;
+                }
+                if (maxSize != null && maxSize < file.length()) {
+                    return false;
+                }
+                final long fileAge = System.currentTimeMillis() - file.lastModified();
+                if (minAge > fileAge) {
+                    return false;
+                }
+                if (maxAge != null && maxAge < fileAge) {
+                    return false;
+                }
+                if (ignoreHidden && file.isHidden()) {
+                    return false;
+                }
+                if (pathPattern != null) {
+                    Path reldir = Paths.get(indir).relativize(file.toPath()).getParent();
+                    if (reldir != null && !reldir.toString().isEmpty()) {
+                        if (!pathPattern.matcher(reldir.toString()).matches()) {
+                            return false;
+                        }
+                    }
+                }
+                //Verify that we have at least read permissions on the file we're considering grabbing
+                if (!Files.isReadable(file.toPath())) {
+                    return false;
+                }
+
+                /* Verify that if we're not keeping original that we have write
+                 * permissions on the directory the file is in
+                 */
+                if (!keepOriginal && !Files.isWritable(file.toPath().getParent())) {
+                    return false;
+                }
+                return filePattern.matcher(file.getName()).matches();
+            }
+        };
+    }
+}
diff --git a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileRecord.java b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileRecord.java
new file mode 100644
index 0000000000..0fb944817e
--- /dev/null
+++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileRecord.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import lombok.Data;
+
+import org.apache.pulsar.functions.api.Record;
+
+/**
+ * A {@link Record} carrying one line of a file read by the File Source.
+ * <ul>
+ *   <li>Key: the source file name and the line number, joined by an underscore.</li>
+ *   <li>Value: the bytes of that line.</li>
+ *   <li>User properties: the source file name, its absolute path and its
+ *       last modified time.</li>
+ * </ul>
+ */
+@Data
+public class FileRecord implements Record<byte[]> {
+
+    public static final String FILE_NAME = "file.name";
+    public static final String FILE_ABSOLUTE_PATH = "file.path";
+    public static final String FILE_MODIFIED_TIME = "file.modified.time";
+
+    private final Optional<String> key;
+    private final byte[] value;
+    private final HashMap<String, String> userProperties = new HashMap<>();
+
+    /**
+     * Builds the record for a single line of the given file.
+     *
+     * @param srcFile the file the line was read from
+     * @param lineNumber the number of the line within the file
+     * @param value the content of the line, as bytes
+     */
+    public FileRecord(File srcFile, int lineNumber, byte[] value) {
+        final String fileName = srcFile.getName();
+        this.key = Optional.of(fileName + "_" + lineNumber);
+        this.value = value;
+        setProperty(FILE_NAME, fileName);
+        setProperty(FILE_ABSOLUTE_PATH, srcFile.getAbsolutePath());
+        setProperty(FILE_MODIFIED_TIME, Long.toString(srcFile.lastModified()));
+    }
+
+    /** Returns the key built from the file name and line number; always present. */
+    @Override
+    public Optional<String> getKey() {
+        return key;
+    }
+
+    /** Returns the bytes of the line. */
+    @Override
+    public byte[] getValue() {
+        return value;
+    }
+
+    /** Returns the user properties attached to this record. */
+    public Map<String, String> getProperties() {
+        return userProperties;
+    }
+
+    /** Sets (or replaces) a user property on this record. */
+    public void setProperty(String key, String value) {
+        userProperties.put(key, value);
+    }
+}
diff --git a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSource.java b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSource.java
new file mode 100644
index 0000000000..bc09c978dd
--- /dev/null
+++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSource.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.io.core.PushSource;
+import org.apache.pulsar.io.core.SourceContext;
+
+/**
+ * A simple connector to consume messages from the local file system.
+ * It can be configured to consume files recursively from a given
+ * directory, and can handle plain text, gzip, and zip formatted files.
+ */
+public class FileSource extends PushSource<byte[]> {
+
+    private ExecutorService executor;
+    private final BlockingQueue<File> workQueue = new LinkedBlockingQueue<>();
+    private final BlockingQueue<File> inProcess = new LinkedBlockingQueue<>();
+    private final BlockingQueue<File> recentlyProcessed = new LinkedBlockingQueue<>();
+
+    /**
+     * Loads and validates the configuration, then starts the listing thread,
+     * the cleanup thread and the configured number of consumer threads.
+     *
+     * @throws Exception if the configuration cannot be loaded or is invalid
+     */
+    @Override
+    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
+        FileSourceConfig fileConfig = FileSourceConfig.load(config);
+        fileConfig.validate();
+
+        // validate() accepts an unset numWorkers, so fall back to a single worker.
+        final int numWorkers = Optional.ofNullable(fileConfig.getNumWorkers()).orElse(1);
+
+        // One extra for the File listing thread, and another for the cleanup thread
+        executor = Executors.newFixedThreadPool(numWorkers + 2);
+        executor.execute(new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed));
+        executor.execute(new ProcessedFileThread(fileConfig, recentlyProcessed));
+
+        for (int idx = 0; idx < numWorkers; idx++) {
+            executor.execute(new FileConsumerThread(this, workQueue, inProcess, recentlyProcessed));
+        }
+    }
+
+    /**
+     * Stops the worker threads, interrupting them if they have not
+     * finished within 800 milliseconds.
+     */
+    @Override
+    public void close() throws Exception {
+        // open() may never have run, or may have failed before the pool was created.
+        if (executor == null) {
+            return;
+        }
+
+        executor.shutdown();
+        try {
+            if (!executor.awaitTermination(800, TimeUnit.MILLISECONDS)) {
+                executor.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            executor.shutdownNow();
+            // Preserve the interrupt for the caller.
+            Thread.currentThread().interrupt();
+        }
+    }
+}
diff --git a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSourceConfig.java b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSourceConfig.java
new file mode 100644
index 0000000000..efc7f634b9
--- /dev/null
+++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSourceConfig.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.Paths;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Configuration class for the File Source Connector.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public class FileSourceConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * The input directory from which to pull files.
+     */
+    private String inputDirectory;
+
+    /**
+     * Indicates whether or not to pull files from sub-directories.
+     */
+    private Boolean recurse;
+
+    /**
+     * If true, the file is not deleted after it has been processed and
+     * causes the file to be picked up continually.
+     */
+    private Boolean keepFile = Boolean.FALSE;
+
+    /**
+     * Only files whose names match the given regular expression will be picked up.
+     */
+    private String fileFilter = "[^\\.].*";
+
+    /**
+     * When 'recurse' property is true, then only sub-directories whose
+     * path matches the given regular expression will be scanned.
+     */
+    private String pathFilter;
+
+    /**
+     * The minimum age that a file must be in order to be processed; any file younger
+     * than this amount of time (according to last modification date) will be ignored.
+     */
+    private Integer minimumFileAge;
+
+    /**
+     * The maximum age that a file must be in order to be processed; any file older
+     * than this amount of time (according to last modification date) will be ignored.
+     */
+    private Long maximumFileAge;
+
+    /**
+     * The minimum size (in bytes) that a file must be in order to be processed.
+     */
+    private Integer minimumSize;
+
+    /**
+     * The maximum size (in bytes) that a file can be in order to be processed.
+     */
+    private Double maximumSize;
+
+    /**
+     * Indicates whether or not hidden files should be ignored or not.
+     */
+    private Boolean ignoreHiddenFiles;
+
+    /**
+     * Indicates how long to wait before performing a directory listing.
+     */
+    private Long pollingInterval;
+
+    /**
+     * The number of worker threads that will be processing the files.
+     * This allows you to process a larger number of files concurrently.
+     * However, setting this to a value greater than 1 will result in the data
+     * from multiple files being "intermingled" in the target topic.
+     */
+    private Integer numWorkers = 1;
+
+    /**
+     * Loads the configuration from a YAML file.
+     *
+     * @param yamlFile path of the YAML file to read
+     * @return the parsed configuration
+     * @throws IOException if the file cannot be read or parsed
+     */
+    public static FileSourceConfig load(String yamlFile) throws IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(yamlFile), FileSourceConfig.class);
+    }
+
+    /**
+     * Loads the configuration from a map of property names to values.
+     *
+     * @param map the configuration properties
+     * @return the parsed configuration
+     * @throws IOException if the map cannot be converted
+     */
+    public static FileSourceConfig load(Map<String, Object> map) throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(mapper.writeValueAsString(map), FileSourceConfig.class);
+    }
+
+    /**
+     * Checks that the configuration is usable.
+     *
+     * @throws IllegalArgumentException if the input directory is unset, missing or lacks the
+     *         required permissions, if a filter is not a valid regular expression, or if a
+     *         numeric property is out of range
+     */
+    public void validate() {
+        if (StringUtils.isBlank(inputDirectory)) {
+            throw new IllegalArgumentException("Required property not set.");
+        } else if (Files.notExists(Paths.get(inputDirectory), LinkOption.NOFOLLOW_LINKS)) {
+            throw new IllegalArgumentException("Specified input directory does not exist");
+        } else if (!Files.isReadable(Paths.get(inputDirectory))) {
+            throw new IllegalArgumentException("Specified input directory is not readable");
+        } else if (!Optional.ofNullable(keepFile).orElse(false) && !Files.isWritable(Paths.get(inputDirectory))) {
+            // Files are deleted after processing unless keepFile is set, which needs write access.
+            throw new IllegalArgumentException("You have requested the consumed files to be deleted, but the "
+                    + "source directory is not writeable.");
+        }
+
+        if (StringUtils.isNotBlank(fileFilter)) {
+            try {
+                Pattern.compile(fileFilter);
+            } catch (final PatternSyntaxException psEx) {
+                throw new IllegalArgumentException("Invalid Regex pattern provided for fileFilter", psEx);
+            }
+        }
+
+        if (StringUtils.isNotBlank(pathFilter)) {
+            try {
+                Pattern.compile(pathFilter);
+            } catch (final PatternSyntaxException psEx) {
+                throw new IllegalArgumentException("Invalid Regex pattern provided for pathFilter", psEx);
+            }
+        }
+
+        if (minimumFileAge != null && minimumFileAge < 0) {
+            throw new IllegalArgumentException("The property minimumFileAge must be non-negative");
+        }
+
+        if (maximumFileAge != null && maximumFileAge < 0) {
+            throw new IllegalArgumentException("The property maximumFileAge must be non-negative");
+        }
+
+        if (minimumSize != null && minimumSize < 0) {
+            throw new IllegalArgumentException("The property minimumSize must be non-negative");
+        }
+
+        if (maximumSize != null && maximumSize < 0) {
+            throw new IllegalArgumentException("The property maximumSize must be non-negative");
+        }
+
+        if (pollingInterval != null && pollingInterval <= 0) {
+            throw new IllegalArgumentException("The property pollingInterval must be greater than zero");
+        }
+
+        if (numWorkers != null && numWorkers <= 0) {
+            throw new IllegalArgumentException("The property numWorkers must be greater than zero");
+        }
+    }
+}
\ No newline at end of file
diff --git a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/ProcessedFileThread.java b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/ProcessedFileThread.java
new file mode 100644
index 0000000000..65153e319b
--- /dev/null
+++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/ProcessedFileThread.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Worker thread that cleans up all the files that have been processed.
+ */
+public class ProcessedFileThread extends Thread {
+
+    private final BlockingQueue<File> recentlyProcessed;
+    private final boolean keepOriginal;
+
+    /**
+     * @param fileConfig supplies the keepFile option; an unset value is treated as false
+     * @param recentlyProcessed queue of files whose processing has finished
+     */
+    public ProcessedFileThread(FileSourceConfig fileConfig, BlockingQueue<File> recentlyProcessed) {
+        this.recentlyProcessed = recentlyProcessed;
+        final Boolean keep = fileConfig.getKeepFile();
+        keepOriginal = keep != null && keep;
+    }
+
+    /**
+     * Drains the queue of processed files, handling each one in turn,
+     * until the thread is interrupted.
+     */
+    public void run() {
+        try {
+            for (;;) {
+                handle(recentlyProcessed.take());
+            }
+        } catch (InterruptedException ie) {
+            // just terminate
+        }
+    }
+
+    /**
+     * Deletes the given file, if it still exists, unless the originals are to be kept.
+     */
+    private void handle(File f) {
+        if (keepOriginal) {
+            return;
+        }
+
+        try {
+            Files.deleteIfExists(f.toPath());
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+}
diff --git a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/package-info.java b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/package-info.java
new file mode 100644
index 0000000000..0795343979
--- /dev/null
+++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
\ No newline at end of file
diff --git a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/GZipFiles.java b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/GZipFiles.java
new file mode 100644
index 0000000000..aa38356592
--- /dev/null
+++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/GZipFiles.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file.utils;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PushbackInputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.stream.Stream;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.commons.io.IOUtils;
+
+/**
+ * Static helpers for detecting and reading gzip-formatted files.
+ */
+public class GZipFiles {
+
+    /** First byte of the gzip magic number. */
+    private static final int GZIP_MAGIC_BYTE_1 = 0x1f;
+
+    /** Second byte of the gzip magic number. */
+    private static final int GZIP_MAGIC_BYTE_2 = 0x8b;
+
+    /**
+     * Returns true if the given file starts with the gzip magic number.
+     *
+     * @param f
+     *          The file to inspect; may be {@code null}.
+     * @return {@code true} only if the first two bytes of the file are
+     *         {@code 0x1f 0x8b}; {@code false} if the file is {@code null},
+     *         shorter than two bytes, cannot be opened or read, or does not
+     *         carry the signature.
+     */
+    public static boolean isGzip(File f) {
+        if (f == null) {
+            return false;
+        }
+
+        // try-with-resources closes the stream on every exit path.
+        try (InputStream input = new FileInputStream(f)) {
+            // read() yields -1 at end of stream, so a file shorter than the
+            // signature simply fails the comparison instead of throwing.
+            return input.read() == GZIP_MAGIC_BYTE_1 && input.read() == GZIP_MAGIC_BYTE_2;
+        } catch (IOException | SecurityException e) {
+            // An unreadable file is reported as "not gzip".
+            return false;
+        }
+    }
+
+    /**
+     * Get a lazily loaded stream of lines from a gzipped file, similar to
+     * {@link Files#lines(java.nio.file.Path)}. Bytes are decoded with the
+     * platform default charset. The caller must close the returned stream to
+     * release the underlying file handle.
+     *
+     * @param path
+     *          The path to the gzipped file.
+     * @return stream with lines.
+     * @throws UncheckedIOException
+     *          If the file cannot be opened or does not start with a valid
+     *          gzip header.
+     */
+    public static Stream<String> lines(Path path) {
+        InputStream fileStream = null;
+        GZIPInputStream gzipStream;
+
+        try {
+            fileStream = Files.newInputStream(path);
+            gzipStream = new GZIPInputStream(fileStream);
+        } catch (IOException e) {
+            // The GZIPInputStream constructor reads the header and may fail
+            // after the file was opened; close the raw stream so the file
+            // handle does not leak.
+            closeSafely(fileStream);
+            throw new UncheckedIOException(e);
+        }
+
+        BufferedReader reader = new BufferedReader(new InputStreamReader(gzipStream));
+        return reader.lines().onClose(() -> closeSafely(reader));
+    }
+
+    /**
+     * Closes the given resource, ignoring {@code null} and any
+     * {@link IOException} raised while closing.
+     */
+    private static void closeSafely(Closeable closeable) {
+        if (closeable != null) {
+            try {
+                closeable.close();
+            } catch (IOException ignored) {
+                // Best-effort close; there is nothing useful to do here.
+            }
+        }
+    }
+}
diff --git a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/ZipFiles.java b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/ZipFiles.java
new file mode 100644
index 0000000000..09cd0c23da
--- /dev/null
+++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/ZipFiles.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file.utils;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.stream.Stream;
+import java.util.zip.ZipInputStream;
+
+import org.apache.commons.io.IOUtils;
+
+/**
+ * Static helpers for detecting and reading zip-formatted files.
+ */
+public class ZipFiles {
+
+    /** Big-endian value of the four signature bytes 'P' 'K' 0x03 0x04. */
+    private static final int ZIP_LOCAL_HEADER_SIGNATURE = 0x504b0304;
+
+    /**
+     * Returns true if the given file is a zip file, i.e. its first four bytes
+     * equal the zip local-file-header signature.
+     *
+     * @param f
+     *          The file to inspect; may be {@code null}.
+     * @return {@code true} only if the file starts with the signature;
+     *         {@code false} if the file is {@code null}, shorter than four
+     *         bytes, cannot be opened or read, or starts with other bytes.
+     */
+    public static boolean isZip(File f) {
+        if (f == null) {
+            return false;
+        }
+
+        // try-with-resources closes the stream even when readInt() fails.
+        try (DataInputStream in = new DataInputStream(new BufferedInputStream(new FileInputStream(f)))) {
+            // readInt() is big-endian and raises EOFException (an
+            // IOException) for files shorter than four bytes.
+            return in.readInt() == ZIP_LOCAL_HEADER_SIGNATURE;
+        } catch (IOException | SecurityException e) {
+            // An unreadable file is reported as "not zip".
+            return false;
+        }
+    }
+
+    /**
+     * Get a lazily loaded stream of lines from a zipped file, similar to
+     * {@link Files#lines(java.nio.file.Path)}. The lines are those of the
+     * first non-directory entry in the archive; the stream is empty when the
+     * file contains no such entry. Bytes are decoded with the platform
+     * default charset. The caller must close the returned stream to release
+     * the underlying file handle.
+     *
+     * @param path
+     *          The path to the zipped file.
+     * @return stream with lines.
+     * @throws UncheckedIOException
+     *          If the file cannot be opened or its first entries cannot be
+     *          read.
+     */
+    public static Stream<String> lines(Path path) {
+        ZipInputStream zipStream = null;
+
+        try {
+            zipStream = new ZipInputStream(Files.newInputStream(path));
+            // A ZipInputStream returns end-of-stream until it has been
+            // positioned on an entry, so this step is required to get data.
+            positionAtFirstFile(zipStream);
+        } catch (IOException e) {
+            closeSafely(zipStream);
+            throw new UncheckedIOException(e);
+        }
+
+        BufferedReader reader = new BufferedReader(new InputStreamReader(zipStream));
+        return reader.lines().onClose(() -> closeSafely(reader));
+    }
+
+    /**
+     * Advances the stream to the first entry that is not a directory, or to
+     * the end of the archive when there is none.
+     */
+    private static void positionAtFirstFile(ZipInputStream zipStream) throws IOException {
+        java.util.zip.ZipEntry entry = zipStream.getNextEntry();
+        while (entry != null && entry.isDirectory()) {
+            entry = zipStream.getNextEntry();
+        }
+    }
+
+    /**
+     * Closes the given resource, ignoring {@code null} and any
+     * {@link IOException} raised while closing.
+     */
+    private static void closeSafely(Closeable closeable) {
+        if (closeable != null) {
+            try {
+                closeable.close();
+            } catch (IOException ignored) {
+                // Best-effort close; there is nothing useful to do here.
+            }
+        }
+    }
+}
diff --git a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/package-info.java b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/package-info.java
new file mode 100644
index 0000000000..2b21e913bd
--- /dev/null
+++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file.utils;
\ No newline at end of file
diff --git a/pulsar-io/file/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/file/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000000..df455c27ce
--- /dev/null
+++ b/pulsar-io/file/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: file
+description: Reads data from local filesystem
+sourceClass: org.apache.pulsar.io.file.FileSource
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTests.java
new file mode 100644
index 0000000000..593415136b
--- /dev/null
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTests.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.nio.file.attribute.FileAttribute;
+import java.nio.file.attribute.PosixFilePermission;
+import java.nio.file.attribute.PosixFilePermissions;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+
+/**
+ * Shared fixture for the file connector tests. Before every test method it
+ * creates the {@link #TMP_DIR} input directory, Mockito-spied work queues and
+ * a thread pool; after every test method it stops the pool and deletes the
+ * directory tree again.
+ */
+public abstract class AbstractFileTests {
+
+    /** Directory the tests generate their input files in. */
+    public static final String TMP_DIR = "/tmp/foo";
+
+    // Queues handed to the threads under test; spied so tests can verify calls.
+    protected BlockingQueue<File> workQueue;
+    protected BlockingQueue<File> inProcess;
+    protected BlockingQueue<File> recentlyProcessed;
+    protected BlockingQueue<File> producedFiles;
+
+    protected TestFileGenerator generatorThread;
+    protected FileListingThread listingThread;
+    protected ExecutorService executor;
+
+    /**
+     * Creates the input directory (if missing), fresh spied queues and a
+     * ten-thread executor.
+     *
+     * @throws IOException if the directory cannot be created.
+     */
+    @BeforeMethod
+    public void init() throws IOException {
+
+        // Create the directory we are going to read from
+        Path directory = Paths.get(TMP_DIR);
+
+        if (!Files.exists(directory, LinkOption.NOFOLLOW_LINKS)) {
+            Files.createDirectory(directory, getPermissions());
+        }
+
+        workQueue = Mockito.spy(new LinkedBlockingQueue<>());
+        inProcess = Mockito.spy(new LinkedBlockingQueue<>());
+        recentlyProcessed = Mockito.spy(new LinkedBlockingQueue<>());
+        producedFiles = Mockito.spy(new LinkedBlockingQueue<>());
+        executor = Executors.newFixedThreadPool(10);
+    }
+
+    /**
+     * Stops the worker threads and removes everything below {@link #TMP_DIR}.
+     */
+    @AfterMethod
+    public void tearDown() throws Exception {
+        // Shutdown all of the processing threads
+        stopThreads();
+
+        // Delete the directory and all the files
+        cleanUp();
+    }
+
+    /**
+     * Recursively deletes {@link #TMP_DIR}; does nothing if it does not exist.
+     *
+     * @throws IOException if a file or directory cannot be deleted.
+     */
+    protected static final void cleanUp() throws IOException {
+        Path directory = Paths.get(TMP_DIR);
+
+        if (!Files.exists(directory, LinkOption.NOFOLLOW_LINKS)) {
+            return;
+        }
+
+        Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
+            @Override
+            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+                Files.delete(file);
+                return FileVisitResult.CONTINUE;
+            }
+
+            @Override
+            public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
+                // Runs after the directory's entries were visited, so it is empty by now.
+                Files.delete(dir);
+                return FileVisitResult.CONTINUE;
+            }
+        });
+    }
+
+    /**
+     * Shuts the executor down, giving running tasks 800 ms to finish before
+     * they are cancelled with {@code shutdownNow()}.
+     */
+    protected void stopThreads() throws Exception {
+        executor.shutdown();
+        try {
+            if (!executor.awaitTermination(800, TimeUnit.MILLISECONDS)) {
+                executor.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            executor.shutdownNow();
+            // Restore the interrupt flag so callers can still observe it.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /** Generates {@code numFiles} single-line files in {@link #TMP_DIR}. */
+    protected final void generateFiles(int numFiles) throws IOException, InterruptedException, ExecutionException {
+        generateFiles(numFiles, 1, TMP_DIR);
+    }
+
+    /** Generates {@code numFiles} files of {@code numLines} lines each in {@link #TMP_DIR}. */
+    protected final void generateFiles(int numFiles, int numLines) throws IOException, InterruptedException, ExecutionException {
+        generateFiles(numFiles, numLines, TMP_DIR);
+    }
+
+    /**
+     * Submits a {@link TestFileGenerator} for the given directory to the
+     * executor and blocks until it has completed.
+     *
+     * @throws ExecutionException if the generator task failed.
+     * @throws InterruptedException if interrupted while waiting for the task.
+     */
+    protected final void generateFiles(int numFiles, int numLines, String directory) throws IOException, InterruptedException, ExecutionException {
+        generatorThread = new TestFileGenerator(producedFiles, numFiles, 1, numLines, directory, "prefix", ".txt", getPermissions());
+        Future<?> f = executor.submit(generatorThread);
+        f.get();
+    }
+
+    /** Returns the {@code rwxrwxrwx} POSIX permissions as a file attribute. */
+    protected static final FileAttribute<Set<PosixFilePermission>> getPermissions() {
+        Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwxrwxrwx");
+        return PosixFilePermissions.asFileAttribute(perms);
+    }
+
+}
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java
new file mode 100644
index 0000000000..a21633dac9
--- /dev/null
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.PushSource;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+/**
+ * Tests that run a {@link FileListingThread} together with a
+ * {@link FileConsumerThread} against generated files and verify the queue
+ * hand-offs and the number of records pushed to a mocked {@link PushSource}.
+ */
+@SuppressWarnings("unchecked")
+public class FileConsumerThreadTests extends AbstractFileTests {
+
+    private PushSource<byte[]> consumer;
+    private FileConsumerThread consumerThread;
+
+    /** One file with one line: one record expected. */
+    @Test
+    public final void singleFileTest() throws IOException {
+        runAndVerify(1, 1);
+    }
+
+    /** Fifty files with two lines each: one hundred records expected. */
+    @Test
+    public final void mulitpleFileTest() throws IOException {
+        runAndVerify(50, 2);
+    }
+
+    /** One file with ten lines: ten records expected. */
+    @Test
+    public final void multiLineFileTest() throws IOException {
+        runAndVerify(1, 10);
+    }
+
+    /**
+     * Generates the files, runs the listing and consumer threads for two
+     * seconds and verifies that every file passed through each queue exactly
+     * once and that {@code numFiles * numLines} records were consumed.
+     *
+     * @param numFiles number of files to generate in {@code TMP_DIR}.
+     * @param numLines number of lines written to each file.
+     */
+    private void runAndVerify(int numFiles, int numLines) throws IOException {
+
+        consumer = Mockito.mock(PushSource.class);
+        Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class));
+
+        Map<String, Object> map = new HashMap<String, Object>();
+        map.put("inputDirectory", TMP_DIR);
+
+        try {
+            generateFiles(numFiles, numLines);
+            listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
+            consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
+            executor.execute(listingThread);
+            executor.execute(consumerThread);
+
+            // Give both threads time to pick up and process the files.
+            Thread.sleep(2000);
+
+            // Each generated file must have made exactly one trip through the queues.
+            for (File produced : producedFiles) {
+                verify(workQueue, times(1)).offer(produced);
+                verify(inProcess, times(1)).add(produced);
+                verify(inProcess, times(1)).remove(produced);
+                verify(recentlyProcessed, times(1)).add(produced);
+            }
+
+            // And nothing other than the generated files may have been queued.
+            verify(workQueue, times(numFiles)).offer(any(File.class));
+            verify(workQueue, atLeast(numFiles)).take();
+            verify(inProcess, times(numFiles)).add(any(File.class));
+            verify(inProcess, times(numFiles)).remove(any(File.class));
+            verify(recentlyProcessed, times(numFiles)).add(any(File.class));
+            verify(consumer, times(numFiles * numLines)).consume((Record<byte[]>) any(Record.class));
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        }
+    }
+}
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java
new file mode 100644
index 0000000000..855498ea13
--- /dev/null
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import static org.testng.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for {@link FileListingThread}: each test generates files below
+ * {@code TMP_DIR}, runs the listing thread for two seconds and verifies how
+ * many files were offered to the work queue under the given configuration.
+ */
+public class FileListingThreadTests extends AbstractFileTests {
+
+    /** Returns a mutable configuration map that points at {@code TMP_DIR}. */
+    private static Map<String, Object> newConfig() {
+        Map<String, Object> map = new HashMap<String, Object>();
+        map.put("inputDirectory", TMP_DIR);
+        return map;
+    }
+
+    /**
+     * Builds a listing thread from the given configuration, starts it on the
+     * shared executor and waits two seconds for it to scan the directory.
+     */
+    private void startListingThread(Map<String, Object> config) throws IOException, InterruptedException {
+        listingThread = new FileListingThread(FileSourceConfig.load(config), workQueue, inProcess, recentlyProcessed);
+        executor.execute(listingThread);
+        Thread.sleep(2000);
+    }
+
+    @Test
+    public final void singleFileTest() throws IOException {
+
+        Map<String, Object> map = newConfig();
+
+        try {
+            generateFiles(1);
+            startListingThread(map);
+            verify(producedFiles, times(1)).put(any(File.class));
+            verify(workQueue, times(1)).offer(any(File.class));
+
+            for (File produced : producedFiles) {
+                verify(workQueue, times(1)).offer(produced);
+            }
+
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        }
+    }
+
+    @Test
+    public final void fiftyFileTest() throws IOException {
+
+        Map<String, Object> map = newConfig();
+
+        try {
+            generateFiles(50);
+            startListingThread(map);
+            verify(workQueue, times(50)).offer(any(File.class));
+
+            for (File produced : producedFiles) {
+                verify(workQueue, times(1)).offer(produced);
+            }
+
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        }
+    }
+
+    @Test
+    public final void minimumSizeTest() throws IOException {
+
+        Map<String, Object> map = newConfig();
+
+        try {
+            // Create 50 zero size files; none of them is expected to be queued
+            generateFiles(50, 0);
+            startListingThread(map);
+            verify(workQueue, times(0)).offer(any(File.class));
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        }
+    }
+
+    @Test
+    public final void maximumSizeTest() throws IOException {
+
+        Map<String, Object> map = newConfig();
+        map.put("maximumSize", "1000");
+
+        try {
+            // Create 5 files that exceed the limit and 45 that don't
+            generateFiles(5, 1000);
+            generateFiles(45, 10);
+            startListingThread(map);
+            verify(workQueue, times(45)).offer(any(File.class));
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        } finally {
+            cleanUp();
+        }
+    }
+
+    @Test
+    public final void minimumAgeTest() throws IOException {
+
+        Map<String, Object> map = newConfig();
+        map.put("minimumFileAge", "5000");
+
+        try {
+            // Create 5 files that will be too "new" for processing
+            generateFiles(5);
+            startListingThread(map);
+            verify(workQueue, times(0)).offer(any(File.class));
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        } finally {
+            cleanUp();
+        }
+    }
+
+    @Test
+    public final void maximumAgeTest() throws IOException {
+
+        Map<String, Object> map = newConfig();
+        map.put("maximumFileAge", "5000");
+
+        try {
+            // Create 5 files and let them age past the 5000 ms limit, so
+            // they will be too "old" for processing
+            generateFiles(5);
+            Thread.sleep(5000);
+
+            // Create 5 fresh files; only these are expected to be processed
+            generateFiles(5);
+            startListingThread(map);
+            verify(workQueue, times(5)).offer(any(File.class));
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        } finally {
+            cleanUp();
+        }
+    }
+
+    @Test
+    public final void doRecurseTest() throws IOException {
+
+        Map<String, Object> map = newConfig();
+        map.put("recurse", Boolean.TRUE);
+
+        try {
+            // Create 5 files in the root folder
+            generateFiles(5);
+
+            // Create 5 files in a sub-folder
+            generateFiles(5, 1, TMP_DIR + File.separator + "sub-dir");
+            startListingThread(map);
+            verify(workQueue, times(10)).offer(any(File.class));
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        } finally {
+            cleanUp();
+        }
+    }
+
+    @Test
+    public final void doNotRecurseTest() throws IOException {
+
+        Map<String, Object> map = newConfig();
+        map.put("recurse", Boolean.FALSE);
+
+        try {
+            // Create 5 files in the root folder
+            generateFiles(5);
+
+            // Create 5 files in a sub-folder
+            generateFiles(5, 1, TMP_DIR + File.separator + "sub-dir");
+            startListingThread(map);
+            verify(workQueue, times(5)).offer(any(File.class));
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        } finally {
+            cleanUp();
+        }
+    }
+
+    @Test
+    public final void pathFilterTest() throws IOException {
+
+        Map<String, Object> map = newConfig();
+        map.put("recurse", Boolean.TRUE);
+        map.put("pathFilter", "sub-.*");
+
+        try {
+            // Create 5 files in a sub-folder that matches the path filter
+            // and 5 files in one that does not
+            generateFiles(5, 1, TMP_DIR + File.separator + "sub-dir-a");
+            generateFiles(5, 1, TMP_DIR + File.separator + "dir-b");
+            startListingThread(map);
+            verify(workQueue, times(5)).offer(any(File.class));
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        } finally {
+            cleanUp();
+        }
+    }
+}
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileSourceConfigTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileSourceConfigTests.java
new file mode 100644
index 0000000000..64144e667a
--- /dev/null
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileSourceConfigTests.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import static org.testng.Assert.assertNotNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.testng.annotations.Test;
+
+/**
+ * Tests for loading {@link FileSourceConfig} from a YAML file or a property map,
+ * and for the errors raised by {@code validate()} on bad settings.
+ */
+public class FileSourceConfigTests {
+
+    /** Loading the bundled {@code sinkConfig.yaml} resource yields a non-null config. */
+    @Test
+    public final void loadFromYamlFileTest() throws IOException {
+        File yamlFile = getFile("sinkConfig.yaml");
+        FileSourceConfig config = FileSourceConfig.load(yamlFile.getAbsolutePath());
+        assertNotNull(config);
+    }
+
+    /** Loading from a property map yields a non-null config. */
+    @Test
+    public final void loadFromMapTest() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("inputDirectory", "/tmp");
+        map.put("keepFile", false);
+
+        FileSourceConfig config = FileSourceConfig.load(map);
+        assertNotNull(config);
+    }
+
+    /** A config with only {@code inputDirectory} set passes validation. */
+    @Test
+    public final void validValidateTest() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("inputDirectory", "/tmp");
+
+        FileSourceConfig config = FileSourceConfig.load(map);
+        assertNotNull(config);
+        config.validate();
+    }
+
+    /** Omitting {@code inputDirectory} is expected to fail validation. */
+    @Test(expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "Required property not set.")
+    public final void missingRequiredPropertiesTest() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("pathFilter", "/");
+
+        FileSourceConfig config = FileSourceConfig.load(map);
+        assertNotNull(config);
+        config.validate();
+    }
+
+    /** A non-boolean string for {@code recurse} is expected to be rejected while loading. */
+    @Test(expectedExceptions = com.fasterxml.jackson.databind.exc.InvalidFormatException.class)
+    public final void InvalidBooleanPropertyTest() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("inputDirectory", "/");
+        map.put("recurse", "not a boolean");
+
+        FileSourceConfig config = FileSourceConfig.load(map);
+        assertNotNull(config);
+        config.validate();
+    }
+
+    /** A {@code pollingInterval} of zero is expected to fail validation. */
+    @Test(expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "The property pollingInterval must be greater than zero")
+    public final void ZeroValueTest() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("inputDirectory", "/");
+        map.put("pollingInterval", 0);
+
+        FileSourceConfig config = FileSourceConfig.load(map);
+        assertNotNull(config);
+        config.validate();
+    }
+
+    /** A negative {@code minimumFileAge} is expected to fail validation. */
+    @Test(expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "The property minimumFileAge must be non-negative")
+    public final void NegativeValueTest() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("inputDirectory", "/");
+        map.put("minimumFileAge", "-50");
+
+        FileSourceConfig config = FileSourceConfig.load(map);
+        assertNotNull(config);
+        config.validate();
+    }
+
+    /** A {@code fileFilter} that is not a valid regex is expected to fail validation. */
+    @Test(expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "Invalid Regex pattern provided for fileFilter")
+    public final void invalidFileFilterTest() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("inputDirectory", "/");
+        map.put("fileFilter", "\\");  // Results in a single '\' being sent.
+
+        FileSourceConfig config = FileSourceConfig.load(map);
+        assertNotNull(config);
+        config.validate();
+    }
+
+    /**
+     * Resolves a classpath resource to a {@link File}.
+     *
+     * @param name resource name, relative to the classpath root
+     * @return the file backing the resource
+     * @throws IllegalArgumentException if the resource is missing or its URL
+     *         cannot be converted to a file URI
+     */
+    private File getFile(String name) {
+        java.net.URL resource = getClass().getClassLoader().getResource(name);
+        if (resource == null) {
+            // Fail with the resource name instead of an anonymous NullPointerException
+            throw new IllegalArgumentException("Test resource not found on classpath: " + name);
+        }
+        try {
+            // URL.getFile() keeps percent-encoding (e.g. %20 for spaces); go through the URI instead
+            return new File(resource.toURI());
+        } catch (java.net.URISyntaxException e) {
+            throw new IllegalArgumentException("Invalid URI for test resource: " + name, e);
+        }
+    }
+}
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java
new file mode 100644
index 0000000000..98d35a914a
--- /dev/null
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.PushSource;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+/**
+ * Tests that run the file listing, consumer and clean-up threads together against
+ * generated files and verify the interactions recorded on the shared queues.
+ */
+@SuppressWarnings("unchecked")
+public class ProcessedFileThreadTests extends AbstractFileTests {
+
+    /** Upper bound, in milliseconds, on how long a test waits for the queues to drain. */
+    private static final long DRAIN_TIMEOUT_MS = 60_000L;
+
+    private PushSource<byte[]> consumer;
+    private FileConsumerThread consumerThread;
+    private ProcessedFileThread cleanupThread;
+    private FileSourceConfig fileConfig;
+
+    /** One generated file with {@code keepFile=false}: each queue operation is expected once. */
+    @Test
+    public final void singleFileTest() throws IOException {
+
+        consumer = Mockito.mock(PushSource.class);
+        Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class));
+
+        Map<String, Object> map = new HashMap<>();
+        map.put("inputDirectory", TMP_DIR);
+        map.put("keepFile", Boolean.FALSE);
+
+        try {
+            generateFiles(1);
+            fileConfig = FileSourceConfig.load(map);
+            listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed);
+            consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
+            cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed);
+            executor.execute(listingThread);
+            executor.execute(consumerThread);
+            executor.execute(cleanupThread);
+            Thread.sleep(2000);
+
+            verifyEachFileProcessedOnce();
+
+            verify(workQueue, times(1)).offer(any(File.class));
+            verify(workQueue, atLeast(1)).take();
+            verify(inProcess, times(1)).add(any(File.class));
+            verify(inProcess, times(1)).remove(any(File.class));
+            verify(recentlyProcessed, times(1)).add(any(File.class));
+            verify(recentlyProcessed, times(2)).take();
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        }
+    }
+
+    /** Fifty generated files with {@code keepFile=false}: each file is expected to pass through once. */
+    @Test
+    public final void mulitpleFileTest() throws IOException {
+
+        consumer = Mockito.mock(PushSource.class);
+        Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class));
+
+        Map<String, Object> map = new HashMap<>();
+        map.put("inputDirectory", TMP_DIR);
+        map.put("keepFile", Boolean.FALSE);
+
+        try {
+            generateFiles(50);
+            fileConfig = FileSourceConfig.load(map);
+            listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed);
+            consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
+            cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed);
+            executor.execute(listingThread);
+            executor.execute(consumerThread);
+            executor.execute(cleanupThread);
+            Thread.sleep(2000);
+
+            verifyEachFileProcessedOnce();
+
+            verify(workQueue, times(50)).offer(any(File.class));
+            verify(workQueue, atLeast(50)).take();
+            verify(inProcess, times(50)).add(any(File.class));
+            verify(inProcess, times(50)).remove(any(File.class));
+            verify(recentlyProcessed, times(50)).add(any(File.class));
+            verify(recentlyProcessed, times(51)).take();
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        }
+    }
+
+    /**
+     * One generated file with {@code keepFile=true} and a one second polling interval:
+     * the same file is expected to be offered and processed at least four times.
+     */
+    @Test
+    public final void keepFileTest() throws IOException {
+
+        consumer = Mockito.mock(PushSource.class);
+        Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class));
+
+        Map<String, Object> map = new HashMap<>();
+        map.put("inputDirectory", TMP_DIR);
+        map.put("keepFile", Boolean.TRUE);
+        map.put("pollingInterval", 1000L);
+
+        try {
+            generateFiles(1);
+            fileConfig = FileSourceConfig.load(map);
+            listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed);
+            consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
+            cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed);
+            executor.execute(listingThread);
+            executor.execute(consumerThread);
+            executor.execute(cleanupThread);
+            Thread.sleep(7900);  // Should pull the same file 5 times?
+
+            for (File produced : producedFiles) {
+                verify(workQueue, atLeast(4)).offer(produced);
+                verify(inProcess, atLeast(4)).add(produced);
+                verify(inProcess, atLeast(4)).remove(produced);
+                verify(recentlyProcessed, atLeast(4)).add(produced);
+            }
+
+            verify(recentlyProcessed, atLeast(5)).take();
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        }
+    }
+
+    /**
+     * Files are generated continuously for 30 seconds while a single consumer runs;
+     * every generated file is expected to be processed exactly once.
+     */
+    @Test
+    public final void continuousRunTest() throws IOException {
+
+        consumer = Mockito.mock(PushSource.class);
+        Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class));
+
+        Map<String, Object> map = new HashMap<>();
+        map.put("inputDirectory", TMP_DIR);
+        map.put("keepFile", Boolean.FALSE);
+        map.put("pollingInterval", 100);
+        fileConfig = FileSourceConfig.load(map);
+
+        try {
+            // Start producing files, with a .1 sec delay between
+            generatorThread = new TestFileGenerator(producedFiles, 5000, 100, 1, TMP_DIR, "continuous", ".txt", getPermissions());
+            executor.execute(generatorThread);
+
+            listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed);
+            consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
+            cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed);
+            executor.execute(listingThread);
+            executor.execute(consumerThread);
+            executor.execute(cleanupThread);
+
+            // Run for 30 seconds
+            Thread.sleep(30000);
+
+            // Stop producing files
+            generatorThread.halt();
+
+            // Let the consumer catch up
+            awaitQueuesDrained();
+
+            // Make sure every single file was processed.
+            verifyEachFileProcessedOnce();
+
+        } catch (InterruptedException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        }
+    }
+
+    /**
+     * Same as {@link #continuousRunTest()} but with three consumer threads sharing
+     * the queues; every generated file is still expected to be processed exactly once.
+     */
+    @Test
+    public final void multipleConsumerTest() throws IOException {
+
+        consumer = Mockito.mock(PushSource.class);
+        Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class));
+
+        Map<String, Object> map = new HashMap<>();
+        map.put("inputDirectory", TMP_DIR);
+        map.put("keepFile", Boolean.FALSE);
+        map.put("pollingInterval", 100);
+        fileConfig = FileSourceConfig.load(map);
+
+        try {
+            // Start producing files, with a .1 sec delay between
+            generatorThread = new TestFileGenerator(producedFiles, 5000, 100, 1, TMP_DIR, "continuous", ".txt", getPermissions());
+            executor.execute(generatorThread);
+
+            listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed);
+            consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
+            FileConsumerThread consumerThread2 = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
+            FileConsumerThread consumerThread3 = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
+            cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed);
+            executor.execute(listingThread);
+            executor.execute(consumerThread);
+            executor.execute(consumerThread2);
+            executor.execute(consumerThread3);
+            executor.execute(cleanupThread);
+
+            // Run for 30 seconds
+            Thread.sleep(30000);
+
+            // Stop producing files
+            generatorThread.halt();
+
+            // Let the consumers catch up
+            awaitQueuesDrained();
+
+            // Make sure every single file was processed exactly once.
+            verifyEachFileProcessedOnce();
+
+        } catch (InterruptedException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        }
+    }
+
+    /**
+     * Polls every two seconds until the work, in-process and recently-processed
+     * queues are all empty, or until {@link #DRAIN_TIMEOUT_MS} has elapsed.
+     * Waiting must continue while any of the queues still holds a file.
+     *
+     * @throws InterruptedException if the calling thread is interrupted while sleeping
+     */
+    private void awaitQueuesDrained() throws InterruptedException {
+        final long deadline = System.currentTimeMillis() + DRAIN_TIMEOUT_MS;
+        while ((!workQueue.isEmpty() || !inProcess.isEmpty() || !recentlyProcessed.isEmpty())
+                && System.currentTimeMillis() < deadline) {
+            Thread.sleep(2000);
+        }
+    }
+
+    /**
+     * Verifies that each generated file was offered to the work queue, added to and
+     * removed from the in-process collection, and added to the recently-processed
+     * queue exactly once.
+     */
+    private void verifyEachFileProcessedOnce() {
+        for (File produced : producedFiles) {
+            verify(workQueue, times(1)).offer(produced);
+            verify(inProcess, times(1)).add(produced);
+            verify(inProcess, times(1)).remove(produced);
+            verify(recentlyProcessed, times(1)).add(produced);
+        }
+    }
+}
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/TestFileGenerator.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/TestFileGenerator.java
new file mode 100644
index 0000000000..21ad85279a
--- /dev/null
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/TestFileGenerator.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.FileAttribute;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.RandomStringUtils;
+
+/**
+ * Test helper thread that creates temporary files in a directory, one per
+ * iteration with a fixed delay in between, and reports each created file on a
+ * shared queue. Generation stops after {@code numFiles} files, when
+ * {@link #halt()} is called, or when the thread is interrupted.
+ */
+public class TestFileGenerator extends Thread {
+
+    // Allows us to communicate back which files we generated
+    private final BlockingQueue<File> producedFiles;
+    private final int numFiles;
+    private final long delay;
+    private final int numLines;
+    private final String prefix;
+    private final String suffix;
+    private final FileAttribute<?>[] attrs;
+    private final Path tempDir;
+    // Written by halt() from another thread and read in run(); volatile so the
+    // update is guaranteed to become visible to the generator thread.
+    private volatile boolean keepRunning = true;
+
+    /**
+     * @param producedFiles queue on which every created file is published
+     * @param numFiles      maximum number of files to create
+     * @param delay         pause between files, in milliseconds
+     * @param numLines      number of random lines written to each file
+     * @param dir           target directory; created if it does not exist
+     * @param prefix        file name prefix passed to {@link Files#createTempFile}
+     * @param suffix        file name suffix passed to {@link Files#createTempFile}
+     * @param attrs         attributes applied to the directory and to each file
+     * @throws IOException if the target directory cannot be created
+     */
+    public TestFileGenerator(BlockingQueue<File> producedFiles, int numFiles, long delay, int numLines,
+            String dir, String prefix, String suffix, FileAttribute<?>... attrs) throws IOException {
+        this.numFiles = numFiles;
+        this.delay = delay;
+        this.numLines = numLines;
+        this.producedFiles = producedFiles;
+        this.prefix = prefix;
+        this.suffix = suffix;
+        this.attrs = attrs;
+        tempDir = Files.createDirectories(Paths.get(dir), attrs);
+    }
+
+    /** Creates files until the limit is reached, {@link #halt()} is called, or the thread is interrupted. */
+    @Override
+    public void run() {
+        int counter = 0;
+        while (keepRunning && (counter++ < numFiles)) {
+            createFile();
+            try {
+                sleep(delay);
+            } catch (InterruptedException e) {
+                // Preserve the interrupt status for whoever owns this thread, then stop
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+    }
+
+    /** Asks the generator to stop before creating the next file. */
+    public void halt() {
+        keepRunning = false;
+    }
+
+    /**
+     * Creates one temp file containing {@code numLines} random 50-letter lines and
+     * publishes it on the queue. I/O failures are reported and skipped so that a
+     * single bad file does not end the run.
+     */
+    private void createFile() {
+        try {
+            Path path = Files.createTempFile(tempDir, prefix, suffix, attrs);
+            try (OutputStream out = Files.newOutputStream(path, StandardOpenOption.APPEND)) {
+                for (int idx = 0; idx < numLines; idx++) {
+                    IOUtils.write(RandomStringUtils.random(50, true, false) + "\n", out, "UTF-8");
+                }
+            }
+
+            producedFiles.put(path.toFile());
+
+        } catch (IOException e) {
+            e.printStackTrace();
+        } catch (InterruptedException e) {
+            // Re-assert the interrupt so the sleep in run() ends the loop promptly
+            Thread.currentThread().interrupt();
+        }
+    }
+
+}
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/utils/GZipFilesTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/utils/GZipFilesTests.java
new file mode 100644
index 0000000000..b08bb0086a
--- /dev/null
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/utils/GZipFilesTests.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file.utils;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.stream.Stream;
+
+import org.testng.annotations.Test;
+
+/**
+ * Tests for {@link GZipFiles}: gzip detection and line streaming.
+ */
+public class GZipFilesTests {
+
+    /** A real gzip archive is detected as gzip. */
+    @Test
+    public final void validGzipFileTest() {
+        assertTrue(GZipFiles.isGzip(getFile("org/apache/pulsar/io/file/validGzip.gz")));
+    }
+
+    /** A plain text file is not detected as gzip. */
+    @Test
+    public final void nonGzipFileTest() {
+        assertFalse(GZipFiles.isGzip(getFile("org/apache/pulsar/io/file/nonGzipFile.txt")));
+    }
+
+    /** A plain text file with a {@code .gz} extension is not detected as gzip. */
+    @Test
+    public final void mislabelledGzipFileTest() {
+        assertFalse(GZipFiles.isGzip(getFile("org/apache/pulsar/io/file/mislabelled.gz")));
+    }
+
+    /** A {@code null} file is not detected as gzip. */
+    @Test
+    public final void nonExistantGzipFileTest() {
+        assertFalse(GZipFiles.isGzip(null));
+    }
+
+    /**
+     * Every line streamed from the valid archive starts with {@code "Line "}.
+     * Any exception raised while opening or reading the archive fails the test
+     * instead of being printed and ignored.
+     */
+    @Test
+    public final void streamGzipFileTest() throws Exception {
+        Path path = Paths.get(getFile("org/apache/pulsar/io/file/validGzip.gz").getAbsolutePath());
+
+        try (Stream<String> lines = GZipFiles.lines(path)) {
+            lines.forEachOrdered(line -> assertTrue(line.startsWith("Line ")));
+        }
+    }
+
+    /**
+     * Resolves a classpath resource to a {@link File}.
+     *
+     * @param name resource name, relative to the classpath root
+     * @return the file backing the resource
+     * @throws IllegalArgumentException if the resource is missing or its URL
+     *         cannot be converted to a file URI
+     */
+    private File getFile(String name) {
+        java.net.URL resource = getClass().getClassLoader().getResource(name);
+        if (resource == null) {
+            // Fail with the resource name instead of an anonymous NullPointerException
+            throw new IllegalArgumentException("Test resource not found on classpath: " + name);
+        }
+        try {
+            // URL.getFile() keeps percent-encoding (e.g. %20 for spaces); go through the URI instead
+            return new File(resource.toURI());
+        } catch (java.net.URISyntaxException e) {
+            throw new IllegalArgumentException("Invalid URI for test resource: " + name, e);
+        }
+    }
+}
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/utils/ZipFilesTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/utils/ZipFilesTests.java
new file mode 100644
index 0000000000..2fc7286b3a
--- /dev/null
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/utils/ZipFilesTests.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file.utils;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.stream.Stream;
+
+import org.testng.annotations.Test;
+
+/**
+ * Tests for {@link ZipFiles}: zip detection and line streaming.
+ */
+public class ZipFilesTests {
+
+    /** A real zip archive is detected as zip. */
+    @Test
+    public final void validZipFileTest() {
+        assertTrue(ZipFiles.isZip(getFile("org/apache/pulsar/io/file/validZip.zip")));
+    }
+
+    /** A plain text file is not detected as zip. */
+    @Test
+    public final void nonZipFileTest() {
+        assertFalse(ZipFiles.isZip(getFile("org/apache/pulsar/io/file/nonGzipFile.txt")));
+    }
+
+    /** A plain text file with a {@code .gz} extension is not detected as zip. */
+    @Test
+    public final void mislabelledZipFileTest() {
+        assertFalse(ZipFiles.isZip(getFile("org/apache/pulsar/io/file/mislabelled.gz")));
+    }
+
+    /** A {@code null} file is not detected as zip. */
+    @Test
+    public final void nonExistantGzipFileTest() {
+        assertFalse(ZipFiles.isZip(null));
+    }
+
+    /**
+     * Every line streamed from the valid archive starts with {@code "Line "}.
+     * Any exception raised while opening or reading the archive fails the test
+     * instead of being printed and ignored.
+     */
+    @Test
+    public final void streamZipFileTest() throws Exception {
+        Path path = Paths.get(getFile("org/apache/pulsar/io/file/validZip.zip").getAbsolutePath());
+
+        try (Stream<String> lines = ZipFiles.lines(path)) {
+            lines.forEachOrdered(line -> assertTrue(line.startsWith("Line ")));
+        }
+    }
+
+    /**
+     * Resolves a classpath resource to a {@link File}.
+     *
+     * @param name resource name, relative to the classpath root
+     * @return the file backing the resource
+     * @throws IllegalArgumentException if the resource is missing or its URL
+     *         cannot be converted to a file URI
+     */
+    private File getFile(String name) {
+        java.net.URL resource = getClass().getClassLoader().getResource(name);
+        if (resource == null) {
+            // Fail with the resource name instead of an anonymous NullPointerException
+            throw new IllegalArgumentException("Test resource not found on classpath: " + name);
+        }
+        try {
+            // URL.getFile() keeps percent-encoding (e.g. %20 for spaces); go through the URI instead
+            return new File(resource.toURI());
+        } catch (java.net.URISyntaxException e) {
+            throw new IllegalArgumentException("Invalid URI for test resource: " + name, e);
+        }
+    }
+}
diff --git a/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/mislabelled.gz b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/mislabelled.gz
new file mode 100644
index 0000000000..529587e6df
--- /dev/null
+++ b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/mislabelled.gz
@@ -0,0 +1 @@
+This file isn't gzipped.
\ No newline at end of file
diff --git a/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/nonGzipFile.txt b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/nonGzipFile.txt
new file mode 100644
index 0000000000..fbd35c6aea
--- /dev/null
+++ b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/nonGzipFile.txt
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+This file is not gzipped
\ No newline at end of file
diff --git a/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/validGzip.gz b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/validGzip.gz
new file mode 100644
index 0000000000..f7d098d7f6
Binary files /dev/null and b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/validGzip.gz differ
diff --git a/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/validZip.zip b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/validZip.zip
new file mode 100644
index 0000000000..55c28e3627
Binary files /dev/null and b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/validZip.zip differ
diff --git a/pulsar-io/file/src/test/resources/sinkConfig.yaml b/pulsar-io/file/src/test/resources/sinkConfig.yaml
new file mode 100644
index 0000000000..554b0f9312
--- /dev/null
+++ b/pulsar-io/file/src/test/resources/sinkConfig.yaml
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+{
+   "inputDirectory": "/Users/david",
+   "recurse": true,
+   "keepFile": true,
+   "fileFilter": "[^\\.].*",
+   "pathFilter": "*",
+   "minimumFileAge": 0,
+   "maximumFileAge": 9999999999,
+   "minimumSize": 1,
+   "maximumSize": 5000000,
+   "ignoreHiddenFiles": true,
+   "pollingInterval": 5000,
+   "numWorkers": 1
+}
\ No newline at end of file
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 6503b66dec..628686b558 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -48,6 +48,7 @@
     <module>debezium</module>
     <module>hdfs2</module>
     <module>canal</module>
+    <module>file</module>
     <module>netty</module>
   </modules>
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services