You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/22 18:04:25 UTC

[36/79] [abbrv] [partial] incubator-nifi git commit: NIFI-270 Made all changes identified by adam, mark, joey to prep for a cleaner build

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java
deleted file mode 100644
index 41a0557..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java
+++ /dev/null
@@ -1,623 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.file;
-
-import java.io.BufferedInputStream;
-import java.io.Closeable;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Random;
-
-import org.slf4j.Logger;
-
-/**
- * A utility class containing a few useful static methods to do typical IO
- * operations.
- *
- * @author unattributed
- */
-public class FileUtils {
-
-    public static final long TRANSFER_CHUNK_SIZE_BYTES = 1024 * 1024 * 8; //8 MB chunks
-    public static final long MILLIS_BETWEEN_ATTEMPTS = 50L;
-
-    /**
-     * Closes the given closeable quietly - no logging, no exceptions...
-     *
-     * @param closeable
-     */
-    public static void closeQuietly(final Closeable closeable) {
-        if (null != closeable) {
-            try {
-                closeable.close();
-            } catch (final IOException io) {/*IGNORE*/
-
-            }
-        }
-    }
-
-    /**
-     * Releases the given lock quietly - no logging, no exception
-     *
-     * @param lock
-     */
-    public static void releaseQuietly(final FileLock lock) {
-        if (null != lock) {
-            try {
-                lock.release();
-            } catch (final IOException io) {
-                /*IGNORE*/
-            }
-        }
-    }
-
-    public static void ensureDirectoryExistAndCanAccess(final File dir) throws IOException {
-        if (dir.exists() && !dir.isDirectory()) {
-            throw new IOException(dir.getAbsolutePath() + " is not a directory");
-        } else if (!dir.exists()) {
-            final boolean made = dir.mkdirs();
-            if (!made) {
-                throw new IOException(dir.getAbsolutePath() + " could not be created");
-            }
-        }
-        if (!(dir.canRead() && dir.canWrite())) {
-            throw new IOException(dir.getAbsolutePath() + " directory does not have read/write privilege");
-        }
-    }
-
-    /**
-     * Deletes the given file. If the given file exists but could not be deleted
-     * this will be printed as a warning to the given logger
-     *
-     * @param file
-     * @param logger
-     * @return
-     */
-    public static boolean deleteFile(final File file, final Logger logger) {
-        return FileUtils.deleteFile(file, logger, 1);
-    }
-
-    /**
-     * Deletes the given file. If the given file exists but could not be deleted
-     * this will be printed as a warning to the given logger
-     *
-     * @param file
-     * @param logger
-     * @param attempts indicates how many times an attempt to delete should be
-     * made
-     * @return true if given file no longer exists
-     */
-    public static boolean deleteFile(final File file, final Logger logger, final int attempts) {
-        if (file == null) {
-            return false;
-        }
-        boolean isGone = false;
-        try {
-            if (file.exists()) {
-                final int effectiveAttempts = Math.max(1, attempts);
-                for (int i = 0; i < effectiveAttempts && !isGone; i++) {
-                    isGone = file.delete() || !file.exists();
-                    if (!isGone && (effectiveAttempts - i) > 1) {
-                        FileUtils.sleepQuietly(MILLIS_BETWEEN_ATTEMPTS);
-                    }
-                }
-                if (!isGone && logger != null) {
-                    logger.warn("File appears to exist but unable to delete file: " + file.getAbsolutePath());
-                }
-            }
-        } catch (final Throwable t) {
-            if (logger != null) {
-                logger.warn("Unable to delete file: '" + file.getAbsolutePath() + "' due to " + t);
-            }
-        }
-        return isGone;
-    }
-
-    /**
-     * Deletes all of the given files. If any exist and cannot be deleted that
-     * will be printed at warn to the given logger.
-     *
-     * @param files can be null
-     * @param logger can be null
-     */
-    public static void deleteFile(final List<File> files, final Logger logger) {
-        FileUtils.deleteFile(files, logger, 1);
-    }
-
-    /**
-     * Deletes all of the given files. If any exist and cannot be deleted that
-     * will be printed at warn to the given logger.
-     *
-     * @param files can be null
-     * @param logger can be null
-     * @param attempts indicates how many times an attempt should be made to
-     * delete each file
-     */
-    public static void deleteFile(final List<File> files, final Logger logger, final int attempts) {
-        if (null == files || files.isEmpty()) {
-            return;
-        }
-        final int effectiveAttempts = Math.max(1, attempts);
-        for (final File file : files) {
-            try {
-                boolean isGone = false;
-                for (int i = 0; i < effectiveAttempts && !isGone; i++) {
-                    isGone = file.delete() || !file.exists();
-                    if (!isGone && (effectiveAttempts - i) > 1) {
-                        FileUtils.sleepQuietly(MILLIS_BETWEEN_ATTEMPTS);
-                    }
-                }
-                if (!isGone && logger != null) {
-                    logger.warn("File appears to exist but unable to delete file: " + file.getAbsolutePath());
-                }
-            } catch (final Throwable t) {
-                if (null != logger) {
-                    logger.warn("Unable to delete file given from path: '" + file.getPath() + "' due to " + t);
-                }
-            }
-        }
-    }
-
-    /**
-     * Deletes all files (not directories..) in the given directory (non
-     * recursive) that match the given filename filter. If any file cannot be
-     * deleted then this is printed at warn to the given logger.
-     *
-     * @param directory
-     * @param filter if null then no filter is used
-     * @param logger
-     */
-    public static void deleteFilesInDir(final File directory, final FilenameFilter filter, final Logger logger) {
-        FileUtils.deleteFilesInDir(directory, filter, logger, false);
-    }
-
-    /**
-     * Deletes all files (not directories) in the given directory (recursive)
-     * that match the given filename filter. If any file cannot be deleted then
-     * this is printed at warn to the given logger.
-     *
-     * @param directory
-     * @param filter if null then no filter is used
-     * @param logger
-     * @param recurse
-     */
-    public static void deleteFilesInDir(final File directory, final FilenameFilter filter, final Logger logger, final boolean recurse) {
-        FileUtils.deleteFilesInDir(directory, filter, logger, recurse, false);
-    }
-
-    /**
-     * Deletes all files (not directories) in the given directory (recursive)
-     * that match the given filename filter. If any file cannot be deleted then
-     * this is printed at warn to the given logger.
-     *
-     * @param directory
-     * @param filter if null then no filter is used
-     * @param logger
-     * @param recurse
-     * @param deleteEmptyDirectories default is false; if true will delete
-     * directories found that are empty
-     */
-    public static void deleteFilesInDir(final File directory, final FilenameFilter filter, final Logger logger, final boolean recurse, final boolean deleteEmptyDirectories) {
-        // ensure the specified directory is actually a directory and that it exists
-        if (null != directory && directory.isDirectory()) {
-            final File ingestFiles[] = directory.listFiles();
-            for (File ingestFile : ingestFiles) {
-                boolean process = (filter == null) ? true : filter.accept(directory, ingestFile.getName());
-                if (ingestFile.isFile() && process) {
-                    FileUtils.deleteFile(ingestFile, logger, 3);
-                }
-                if (ingestFile.isDirectory() && recurse) {
-                    FileUtils.deleteFilesInDir(ingestFile, filter, logger, recurse, deleteEmptyDirectories);
-                    if (deleteEmptyDirectories && ingestFile.list().length == 0) {
-                        FileUtils.deleteFile(ingestFile, logger, 3);
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Deletes given files.
-     *
-     * @param files
-     * @param recurse will recurse
-     * @throws IOException
-     */
-    public static void deleteFiles(final Collection<File> files, final boolean recurse) throws IOException {
-        for (final File file : files) {
-            FileUtils.deleteFile(file, recurse);
-        }
-    }
-
-    public static void deleteFile(final File file, final boolean recurse) throws IOException {
-        if (file.isDirectory() && recurse) {
-            FileUtils.deleteFiles(Arrays.asList(file.listFiles()), recurse);
-        }
-        //now delete the file itself regardless of whether it is plain file or a directory
-        if (!FileUtils.deleteFile(file, null, 5)) {
-            throw new IOException("Unable to delete " + file.getAbsolutePath());
-        }
-    }
-
-    /**
-     * Randomly generates a sequence of bytes and overwrites the contents of the
-     * file a number of times. The file is then deleted.
-     *
-     * @param file File to be overwritten a number of times and, ultimately,
-     * deleted
-     * @param passes Number of times file should be overwritten
-     * @throws IOException if something makes shredding or deleting a problem
-     */
-    public static void shredFile(final File file, final int passes)
-            throws IOException {
-        final Random generator = new Random();
-        final long fileLength = file.length();
-        final int byteArraySize = (int) Math.min(fileLength, 1048576); // 1MB
-        final byte[] b = new byte[byteArraySize];
-        final long numOfRandomWrites = (fileLength / b.length) + 1;
-        final FileOutputStream fos = new FileOutputStream(file);
-        try {
-            // Over write file contents (passes) times
-            final FileChannel channel = fos.getChannel();
-            for (int i = 0; i < passes; i++) {
-                generator.nextBytes(b);
-                for (int j = 0; j <= numOfRandomWrites; j++) {
-                    fos.write(b);
-                }
-                fos.flush();
-                channel.position(0);
-            }
-            // Write out "0" for each byte in the file
-            Arrays.fill(b, (byte) 0);
-            for (int j = 0; j < numOfRandomWrites; j++) {
-                fos.write(b);
-            }
-            fos.flush();
-            fos.close();
-            // Try to delete the file a few times
-            if (!FileUtils.deleteFile(file, null, 5)) {
-                throw new IOException("Failed to delete file after shredding");
-            }
-
-        } finally {
-            FileUtils.closeQuietly(fos);
-        }
-    }
-
-    public static long copy(final InputStream in, final OutputStream out) throws IOException {
-        final byte[] buffer = new byte[65536];
-        long copied = 0L;
-        int len;
-        while ((len = in.read(buffer)) > 0) {
-            out.write(buffer, 0, len);
-            copied += len;
-        }
-
-        return copied;
-    }
-
-    public static long copyBytes(final byte[] bytes, final File destination, final boolean lockOutputFile) throws FileNotFoundException, IOException {
-        FileOutputStream fos = null;
-        FileLock outLock = null;
-        long fileSize = 0L;
-        try {
-            fos = new FileOutputStream(destination);
-            final FileChannel out = fos.getChannel();
-            if (lockOutputFile) {
-                outLock = out.tryLock(0, Long.MAX_VALUE, false);
-                if (null == outLock) {
-                    throw new IOException("Unable to obtain exclusive file lock for: " + destination.getAbsolutePath());
-                }
-            }
-            fos.write(bytes);
-            fos.flush();
-            fileSize = bytes.length;
-        } finally {
-            FileUtils.releaseQuietly(outLock);
-            FileUtils.closeQuietly(fos);
-        }
-        return fileSize;
-    }
-
-    /**
-     * Copies the given source file to the given destination file. The given
-     * destination will be overwritten if it already exists.
-     *
-     * @param source
-     * @param destination
-     * @param lockInputFile if true will lock input file during copy; if false
-     * will not
-     * @param lockOutputFile if true will lock output file during copy; if false
-     * will not
-     * @param move if true will perform what is effectively a move operation
-     * rather than a pure copy. This allows for potentially highly efficient
-     * movement of the file but if not possible this will revert to a copy then
-     * delete behavior. If false, then the file is copied and the source file is
-     * retained. If a true rename/move occurs then no lock is held during that
-     * time.
-     * @param logger if failures occur, they will be logged to this logger if
-     * possible. If this logger is null, an IOException will instead be thrown,
-     * indicating the problem.
-     * @return long number of bytes copied
-     * @throws FileNotFoundException if the source file could not be found
-     * @throws IOException
-     * @throws SecurityException if a security manager denies the needed file
-     * operations
-     */
-    public static long copyFile(final File source, final File destination, final boolean lockInputFile, final boolean lockOutputFile, final boolean move, final Logger logger) throws FileNotFoundException, IOException {
-
-        FileInputStream fis = null;
-        FileOutputStream fos = null;
-        FileLock inLock = null;
-        FileLock outLock = null;
-        long fileSize = 0L;
-        if (!source.canRead()) {
-            throw new IOException("Must at least have read permission");
-
-        }
-        if (move && source.renameTo(destination)) {
-            fileSize = destination.length();
-        } else {
-            try {
-                fis = new FileInputStream(source);
-                fos = new FileOutputStream(destination);
-                final FileChannel in = fis.getChannel();
-                final FileChannel out = fos.getChannel();
-                if (lockInputFile) {
-                    inLock = in.tryLock(0, Long.MAX_VALUE, true);
-                    if (null == inLock) {
-                        throw new IOException("Unable to obtain shared file lock for: " + source.getAbsolutePath());
-                    }
-                }
-                if (lockOutputFile) {
-                    outLock = out.tryLock(0, Long.MAX_VALUE, false);
-                    if (null == outLock) {
-                        throw new IOException("Unable to obtain exclusive file lock for: " + destination.getAbsolutePath());
-                    }
-                }
-                long bytesWritten = 0;
-                do {
-                    bytesWritten += out.transferFrom(in, bytesWritten, TRANSFER_CHUNK_SIZE_BYTES);
-                    fileSize = in.size();
-                } while (bytesWritten < fileSize);
-                out.force(false);
-                FileUtils.closeQuietly(fos);
-                FileUtils.closeQuietly(fis);
-                fos = null;
-                fis = null;
-                if (move && !FileUtils.deleteFile(source, null, 5)) {
-                    if (logger == null) {
-                        FileUtils.deleteFile(destination, null, 5);
-                        throw new IOException("Could not remove file " + source.getAbsolutePath());
-                    } else {
-                        logger.warn("Configured to delete source file when renaming/move not successful.  However, unable to delete file at: " + source.getAbsolutePath());
-                    }
-                }
-            } finally {
-                FileUtils.releaseQuietly(inLock);
-                FileUtils.releaseQuietly(outLock);
-                FileUtils.closeQuietly(fos);
-                FileUtils.closeQuietly(fis);
-            }
-        }
-        return fileSize;
-    }
-
-    /**
-     * Copies the given source file to the given destination file. The given
-     * destination will be overwritten if it already exists.
-     *
-     * @param source
-     * @param destination
-     * @param lockInputFile if true will lock input file during copy; if false
-     * will not
-     * @param lockOutputFile if true will lock output file during copy; if false
-     * will not
-     * @param logger
-     * @return long number of bytes copied
-     * @throws FileNotFoundException if the source file could not be found
-     * @throws IOException
-     * @throws SecurityException if a security manager denies the needed file
-     * operations
-     */
-    public static long copyFile(final File source, final File destination, final boolean lockInputFile, final boolean lockOutputFile, final Logger logger) throws FileNotFoundException, IOException {
-        return FileUtils.copyFile(source, destination, lockInputFile, lockOutputFile, false, logger);
-    }
-
-    public static long copyFile(final File source, final OutputStream stream, final boolean closeOutputStream, final boolean lockInputFile) throws FileNotFoundException, IOException {
-        FileInputStream fis = null;
-        FileLock inLock = null;
-        long fileSize = 0L;
-        try {
-            fis = new FileInputStream(source);
-            final FileChannel in = fis.getChannel();
-            if (lockInputFile) {
-                inLock = in.tryLock(0, Long.MAX_VALUE, true);
-                if (inLock == null) {
-                    throw new IOException("Unable to obtain exclusive file lock for: " + source.getAbsolutePath());
-                }
-
-            }
-
-            byte[] buffer = new byte[1 << 18]; //256 KB
-            int bytesRead = -1;
-            while ((bytesRead = fis.read(buffer)) != -1) {
-                stream.write(buffer, 0, bytesRead);
-            }
-            in.force(false);
-            stream.flush();
-            fileSize = in.size();
-        } finally {
-            FileUtils.releaseQuietly(inLock);
-            FileUtils.closeQuietly(fis);
-            if (closeOutputStream) {
-                FileUtils.closeQuietly(stream);
-            }
-        }
-        return fileSize;
-    }
-
-    public static long copyFile(final InputStream stream, final File destination, final boolean closeInputStream, final boolean lockOutputFile) throws FileNotFoundException, IOException {
-        final Path destPath = destination.toPath();
-        final long size = Files.copy(stream, destPath);
-        if (closeInputStream) {
-            stream.close();
-        }
-        return size;
-    }
-
-    /**
-     * Renames the given file from the source path to the destination path. This
-     * handles multiple attempts. This should only be used to rename within a
-     * given directory. Renaming across directories might not work well. See the
-     * <code>File.renameTo</code> for more information.
-     *
-     * @param source the file to rename
-     * @param destination the file path to rename to
-     * @param maxAttempts the max number of attempts to attempt the rename
-     * @throws IOException if rename isn't successful
-     */
-    public static void renameFile(final File source, final File destination, final int maxAttempts) throws IOException {
-        FileUtils.renameFile(source, destination, maxAttempts, false);
-    }
-
-    /**
-     * Renames the given file from the source path to the destination path. This
-     * handles multiple attempts. This should only be used to rename within a
-     * given directory. Renaming across directories might not work well. See the
-     * <code>File.renameTo</code> for more information.
-     *
-     * @param source the file to rename
-     * @param destination the file path to rename to
-     * @param maxAttempts the max number of attempts to attempt the rename
-     * @param replace if true and a rename attempt fails will check if a file is
-     * already at the destination path. If so it will delete that file and
-     * attempt the rename according the remaining maxAttempts. If false, any
-     * conflicting files will be left as they were and the rename attempts will
-     * fail if conflicting.
-     * @throws IOException if rename isn't successful
-     */
-    public static void renameFile(final File source, final File destination, final int maxAttempts, final boolean replace) throws IOException {
-        final int attempts = (replace || maxAttempts < 1) ? Math.max(2, maxAttempts) : maxAttempts;
-        boolean renamed = false;
-        for (int i = 0; i < attempts; i++) {
-            renamed = source.renameTo(destination);
-            if (!renamed) {
-                FileUtils.deleteFile(destination, null, 5);
-            } else {
-                break; //rename has succeeded
-            }
-        }
-        if (!renamed) {
-            throw new IOException("Attempted " + maxAttempts + " times but unable to rename from \'" + source.getPath() + "\' to \'" + destination.getPath() + "\'");
-
-        }
-    }
-
-    public static void sleepQuietly(final long millis) {
-        try {
-            Thread.sleep(millis);
-        } catch (final InterruptedException ex) {
-            /* do nothing */
-        }
-    }
-
-    /**
-     * Syncs a primary copy of a file with the copy in the restore directory. If
-     * the restore directory does not have a file and the primary has a file,
-     * the the primary's file is copied to the restore directory. Else if the
-     * restore directory has a file, but the primary does not, then the
-     * restore's file is copied to the primary directory. Else if the primary
-     * file is different than the restore file, then an IllegalStateException is
-     * thrown. Otherwise, if neither file exists, then no syncing is performed.
-     *
-     * @param primaryFile the primary file
-     * @param restoreFile the restore file
-     * @param logger a logger
-     * @throws IOException if an I/O problem was encountered during syncing
-     * @throws IllegalStateException if the primary and restore copies exist but
-     * are different
-     */
-    public static void syncWithRestore(final File primaryFile, final File restoreFile, final Logger logger)
-            throws IOException {
-
-        if (primaryFile.exists() && !restoreFile.exists()) {
-            // copy primary file to restore
-            copyFile(primaryFile, restoreFile, false, false, logger);
-        } else if (restoreFile.exists() && !primaryFile.exists()) {
-            // copy restore file to primary
-            copyFile(restoreFile, primaryFile, false, false, logger);
-        } else if (primaryFile.exists() && restoreFile.exists() && !isSame(primaryFile, restoreFile)) {
-            throw new IllegalStateException(String.format("Primary file '%s' is different than restore file '%s'",
-                    primaryFile.getAbsoluteFile(), restoreFile.getAbsolutePath()));
-        }
-    }
-
-    /**
-     * Returns true if the given files are the same according to their MD5 hash.
-     *
-     * @param file1 a file
-     * @param file2 a file
-     * @return true if the files are the same; false otherwise
-     * @throws IOException if the MD5 hash could not be computed
-     */
-    public static boolean isSame(final File file1, final File file2) throws IOException {
-        return Arrays.equals(computeMd5Digest(file1), computeMd5Digest(file2));
-    }
-
-    /**
-     * Returns the MD5 hash of the given file.
-     *
-     * @param file a file
-     * @return the MD5 hash
-     * @throws IOException if the MD5 hash could not be computed
-     */
-    public static byte[] computeMd5Digest(final File file) throws IOException {
-        final MessageDigest digest;
-        try {
-            digest = MessageDigest.getInstance("MD5");
-        } catch (final NoSuchAlgorithmException nsae) {
-            throw new IOException(nsae);
-        }
-
-        try (final FileInputStream fis = new FileInputStream(file)) {
-            int len;
-            final byte[] buffer = new byte[8192];
-            while ((len = fis.read(buffer)) > -1) {
-                if (len > 0) {
-                    digest.update(buffer, 0, len);
-                }
-            }
-        }
-        return digest.digest();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/CompoundUpdateMonitor.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/CompoundUpdateMonitor.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/CompoundUpdateMonitor.java
deleted file mode 100644
index 6f9c616..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/CompoundUpdateMonitor.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.file.monitor;
-
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * An {@link UpdateMonitor} that combines multiple <code>UpdateMonitor</code>s
- * such that it will indicate a change in a file only if ALL sub-monitors
- * indicate a change. The sub-monitors will be applied in the order given and if
- * any indicates that the state has not changed, the subsequent sub-monitors may
- * not be given a chance to run
- */
-public class CompoundUpdateMonitor implements UpdateMonitor {
-
-    private final List<UpdateMonitor> monitors;
-
-    public CompoundUpdateMonitor(final UpdateMonitor first, final UpdateMonitor... others) {
-        monitors = new ArrayList<>();
-        monitors.add(first);
-        for (final UpdateMonitor monitor : others) {
-            monitors.add(monitor);
-        }
-    }
-
-    @Override
-    public Object getCurrentState(final Path path) throws IOException {
-        return new DeferredMonitorAction(monitors, path);
-    }
-
-    private static class DeferredMonitorAction {
-
-        private static final Object NON_COMPUTED_VALUE = new Object();
-
-        private final List<UpdateMonitor> monitors;
-        private final Path path;
-
-        private final Object[] preCalculated;
-
-        public DeferredMonitorAction(final List<UpdateMonitor> monitors, final Path path) {
-            this.monitors = monitors;
-            this.path = path;
-            preCalculated = new Object[monitors.size()];
-
-            for (int i = 0; i < preCalculated.length; i++) {
-                preCalculated[i] = NON_COMPUTED_VALUE;
-            }
-        }
-
-        private Object getCalculatedValue(final int i) throws IOException {
-            if (preCalculated[i] == NON_COMPUTED_VALUE) {
-                preCalculated[i] = monitors.get(i).getCurrentState(path);
-            }
-
-            return preCalculated[i];
-        }
-
-        @Override
-        public boolean equals(final Object obj) {
-            // must return true unless ALL DeferredMonitorAction's indicate that they are different
-            if (obj == null) {
-                return false;
-            }
-
-            if (!(obj instanceof DeferredMonitorAction)) {
-                return false;
-            }
-
-            final DeferredMonitorAction other = (DeferredMonitorAction) obj;
-            try {
-                // Go through each UpdateMonitor's value and check if the value has changed.
-                for (int i = 0; i < preCalculated.length; i++) {
-                    final Object mine = getCalculatedValue(i);
-                    final Object theirs = other.getCalculatedValue(i);
-
-                    if (mine == theirs) {
-                        // same
-                        return true;
-                    }
-
-                    if (mine == null && theirs == null) {
-                        // same
-                        return true;
-                    }
-
-                    if (mine.equals(theirs)) {
-                        return true;
-                    }
-                }
-            } catch (final IOException e) {
-                return false;
-            }
-
-            // No DeferredMonitorAction was the same as last time. Therefore, it's not equal
-            return false;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/LastModifiedMonitor.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/LastModifiedMonitor.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/LastModifiedMonitor.java
deleted file mode 100644
index e6be558..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/LastModifiedMonitor.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.file.monitor;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-
-public class LastModifiedMonitor implements UpdateMonitor {
-
-    @Override
-    public Object getCurrentState(final Path path) throws IOException {
-        return Files.getLastModifiedTime(path);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/MD5SumMonitor.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/MD5SumMonitor.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/MD5SumMonitor.java
deleted file mode 100644
index 8dea4bf..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/MD5SumMonitor.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.file.monitor;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.file.Path;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-
-public class MD5SumMonitor implements UpdateMonitor {
-
-    @Override
-    public Object getCurrentState(final Path path) throws IOException {
-        final MessageDigest digest;
-        try {
-            digest = MessageDigest.getInstance("MD5");
-        } catch (final NoSuchAlgorithmException nsae) {
-            throw new AssertionError(nsae);
-        }
-
-        try (final FileInputStream fis = new FileInputStream(path.toFile())) {
-            int len;
-            final byte[] buffer = new byte[8192];
-            while ((len = fis.read(buffer)) > -1) {
-                if (len > 0) {
-                    digest.update(buffer, 0, len);
-                }
-            }
-        }
-
-        // Return a ByteBuffer instead of byte[] because we want equals() to do a deep equality
-        return ByteBuffer.wrap(digest.digest());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/SynchronousFileWatcher.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/SynchronousFileWatcher.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/SynchronousFileWatcher.java
deleted file mode 100644
index e0089c1..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/SynchronousFileWatcher.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.file.monitor;
-
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * Allows the user to configure a {@link java.nio.file.Path Path} to watch for
- * modifications and periodically poll to check if the file has been modified
- */
-public class SynchronousFileWatcher {
-
-    private final Path path;
-    private final long checkUpdateMillis;
-    private final UpdateMonitor monitor;
-    private final AtomicReference<StateWrapper> lastState;
-    private final Lock resourceLock = new ReentrantLock();
-
-    public SynchronousFileWatcher(final Path path, final UpdateMonitor monitor) {
-        this(path, monitor, 0L);
-    }
-
-    public SynchronousFileWatcher(final Path path, final UpdateMonitor monitor, final long checkMillis) {
-        if (checkMillis < 0) {
-            throw new IllegalArgumentException();
-        }
-
-        this.path = path;
-        checkUpdateMillis = checkMillis;
-        this.monitor = monitor;
-
-        Object currentState;
-        try {
-            currentState = monitor.getCurrentState(path);
-        } catch (final IOException e) {
-            currentState = null;
-        }
-
-        this.lastState = new AtomicReference<>(new StateWrapper(currentState));
-    }
-
-    /**
-     * Checks if the file has been updated according to the configured
-     * {@link UpdateMonitor} and resets the state
-     *
-     * @return
-     * @throws IOException
-     */
-    public boolean checkAndReset() throws IOException {
-        if (checkUpdateMillis <= 0) { // if checkUpdateMillis <= 0, always check
-            return checkForUpdate();
-        } else {
-            final StateWrapper stateWrapper = lastState.get();
-            if (stateWrapper.getTimestamp() < System.currentTimeMillis() - checkUpdateMillis) {
-                return checkForUpdate();
-            }
-            return false;
-        }
-    }
-
-    private boolean checkForUpdate() throws IOException {
-        if (resourceLock.tryLock()) {
-            try {
-                final StateWrapper wrapper = lastState.get();
-                final Object newState = monitor.getCurrentState(path);
-                if (newState == null && wrapper.getState() == null) {
-                    return false;
-                }
-                if (newState == null || wrapper.getState() == null) {
-                    lastState.set(new StateWrapper(newState));
-                    return true;
-                }
-
-                final boolean unmodified = newState.equals(wrapper.getState());
-                if (!unmodified) {
-                    lastState.set(new StateWrapper(newState));
-                }
-                return !unmodified;
-            } finally {
-                resourceLock.unlock();
-            }
-        } else {
-            return false;
-        }
-    }
-
-    private static class StateWrapper {
-
-        private final Object state;
-        private final long timestamp;
-
-        public StateWrapper(final Object state) {
-            this.state = state;
-            this.timestamp = System.currentTimeMillis();
-        }
-
-        public Object getState() {
-            return state;
-        }
-
-        public long getTimestamp() {
-            return timestamp;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/UpdateMonitor.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/UpdateMonitor.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/UpdateMonitor.java
deleted file mode 100644
index 20ed1dd..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/UpdateMonitor.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.file.monitor;
-
-import java.io.IOException;
-import java.nio.file.Path;
-
-public interface UpdateMonitor {
-
-    Object getCurrentState(Path path) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/Search.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/Search.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/Search.java
deleted file mode 100644
index 59b444a..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/Search.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.search;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Set;
-
-import org.apache.nifi.util.search.ahocorasick.SearchState;
-
-/**
- * Defines an interface to search for content given a set of search terms. Any
- * implementation of search must be thread safe.
- *
- * @author
- * @param <T>
- */
-public interface Search<T> {
-
-    /**
-     * Establishes the dictionary of terms which will be searched in subsequent
-     * search calls. This can be called only once
-     *
-     * @param terms
-     */
-    void initializeDictionary(Set<SearchTerm<T>> terms);
-
-    /**
-     * Searches the given input stream for matches between the already specified
-     * dictionary and the contents scanned.
-     *
-     * @param haystack
-     * @param findAll if true will find all matches if false will find only the
-     * first match
-     * @return SearchState containing results Map might be empty which indicates
-     * no matches found but will not be null
-     * @throws IOException Thrown for any exceptions occurring while searching.
-     * @throws IllegalStateException if the dictionary has not yet been
-     * initialized
-     */
-    SearchState<T> search(InputStream haystack, boolean findAll) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/SearchTerm.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/SearchTerm.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/SearchTerm.java
deleted file mode 100644
index 62de964..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/SearchTerm.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.search;
-
-import java.nio.charset.Charset;
-import java.util.Arrays;
-
-/**
- * This is an immutable thread safe object representing a search term
- *
- * @author
- * @param <T>
- */
-public class SearchTerm<T> {
-
-    private final byte[] bytes;
-    private final int hashCode;
-    private final T reference;
-
-    /**
-     * Constructs a SearchTerm. Defensively copies the given byte array
-     *
-     * @param bytes
-     * @throws IllegalArgument exception if given bytes are null or 0 length
-     */
-    public SearchTerm(final byte[] bytes) {
-        this(bytes, true, null);
-    }
-
-    /**
-     * Constructs a search term. Optionally performs a defensive copy of the
-     * given byte array. If the caller indicates a defensive copy is not
-     * necessary then they must not change the given arrays state any longer
-     *
-     * @param bytes
-     * @param defensiveCopy
-     * @param reference
-     */
-    public SearchTerm(final byte[] bytes, final boolean defensiveCopy, final T reference) {
-        if (bytes == null || bytes.length == 0) {
-            throw new IllegalArgumentException();
-        }
-        if (defensiveCopy) {
-            this.bytes = Arrays.copyOf(bytes, bytes.length);
-        } else {
-            this.bytes = bytes;
-        }
-        this.hashCode = Arrays.hashCode(this.bytes);
-        this.reference = reference;
-    }
-
-    public int get(final int index) {
-        return bytes[index] & 0xff;
-    }
-
-    /**
-     * @return size in of search term in bytes
-     */
-    public int size() {
-        return bytes.length;
-    }
-
-    /**
-     * @return reference object for this given search term
-     */
-    public T getReference() {
-        return reference;
-    }
-
-    /**
-     * Determines if the given window starts with the same bytes as this term
-     *
-     * @param window Current window of bytes from the haystack being evaluated.
-     * @param windowLength The length of the window to consider
-     * @return true if this term starts with the same bytes of the given window
-     */
-    public boolean startsWith(byte[] window, int windowLength) {
-        if (windowLength > window.length) {
-            throw new IndexOutOfBoundsException();
-        }
-        if (bytes.length < windowLength) {
-            return false;
-        }
-        for (int i = 0; i < bytes.length && i < windowLength; i++) {
-            if (bytes[i] != window[i]) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    /**
-     * @return a defensive copy of the internal byte structure
-     */
-    public byte[] getBytes() {
-        return Arrays.copyOf(bytes, bytes.length);
-    }
-
-    @Override
-    public int hashCode() {
-        return hashCode;
-    }
-
-    @Override
-    public boolean equals(final Object obj) {
-        if (obj == null) {
-            return false;
-        }
-        if (getClass() != obj.getClass()) {
-            return false;
-        }
-        final SearchTerm other = (SearchTerm) obj;
-        if (this.hashCode != other.hashCode) {
-            return false;
-        }
-        return Arrays.equals(this.bytes, other.bytes);
-    }
-
-    @Override
-    public String toString() {
-        return new String(bytes);
-    }
-
-    public String toString(final Charset charset) {
-        return new String(bytes, charset);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/AhoCorasick.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/AhoCorasick.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/AhoCorasick.java
deleted file mode 100644
index 3b8afaf..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/AhoCorasick.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.search.ahocorasick;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.Set;
-
-import org.apache.nifi.util.search.Search;
-import org.apache.nifi.util.search.SearchTerm;
-
-public class AhoCorasick<T> implements Search<T> {
-
-    private Node root = null;
-
-    /**
-     * Constructs a new search object.
-     *
-     * @throws IllegalArgumentException if given terms are null or empty
-     */
-    public AhoCorasick() {
-    }
-
-    @Override
-    public void initializeDictionary(final Set<SearchTerm<T>> terms) {
-        if (root != null) {
-            throw new IllegalStateException();
-        }
-        root = new Node();
-        if (terms == null || terms.isEmpty()) {
-            throw new IllegalArgumentException();
-        }
-        for (final SearchTerm<T> term : terms) {
-            int i = 0;
-            Node nextNode = root;
-            while (true) {
-                nextNode = addMatch(term, i, nextNode);
-                if (nextNode == null) {
-                    break; //we're done
-                }
-                i++;
-            }
-        }
-        initialize();
-    }
-
-    private Node addMatch(final SearchTerm<T> term, final int offset, final Node current) {
-        final int index = term.get(offset);
-        boolean atEnd = (offset == (term.size() - 1));
-        if (current.getNeighbor(index) == null) {
-            if (atEnd) {
-                current.setNeighbor(new Node(term), index);
-                return null;
-            }
-            current.setNeighbor(new Node(), index);
-        } else if (atEnd) {
-            current.getNeighbor(index).setMatchingTerm(term);
-            return null;
-        }
-        return current.getNeighbor(index);
-    }
-
-    private void initialize() {
-        //perform bgs to build failure links
-        final Queue<Node> queue = new LinkedList<>();
-        queue.add(root);
-        root.setFailureNode(null);
-        while (!queue.isEmpty()) {
-            final Node current = queue.poll();
-            for (int i = 0; i < 256; i++) {
-                final Node next = current.getNeighbor(i);
-                if (next != null) {
-                    //traverse failure to get state
-                    Node fail = current.getFailureNode();
-                    while ((fail != null) && fail.getNeighbor(i) == null) {
-                        fail = fail.getFailureNode();
-                    }
-                    if (fail != null) {
-                        next.setFailureNode(fail.getNeighbor(i));
-                    } else {
-                        next.setFailureNode(root);
-                    }
-                    queue.add(next);
-                }
-            }
-        }
-    }
-
-    @Override
-    public SearchState search(final InputStream stream, final boolean findAll) throws IOException {
-        return search(stream, findAll, null);
-    }
-
-    private SearchState search(final InputStream stream, final boolean findAll, final SearchState state) throws IOException {
-        if (root == null) {
-            throw new IllegalStateException();
-        }
-        final SearchState<T> currentState = (state == null) ? new SearchState(root) : state;
-        if (!findAll && currentState.foundMatch()) {
-            throw new IllegalStateException("A match has already been found yet we're being asked to keep searching");
-        }
-        Node current = currentState.getCurrentNode();
-        int currentChar;
-        while ((currentChar = stream.read()) >= 0) {
-            currentState.incrementBytesRead(1L);
-            Node next = current.getNeighbor(currentChar);
-            if (next == null) {
-                next = current.getFailureNode();
-                while ((next != null) && next.getNeighbor(currentChar) == null) {
-                    next = next.getFailureNode();
-                }
-                if (next != null) {
-                    next = next.getNeighbor(currentChar);
-                } else {
-                    next = root;
-                }
-            }
-            if (next == null) {
-                throw new IllegalStateException("tree out of sync");
-            }
-            //Accept condition
-            if (next.hasMatch()) {
-                currentState.addResult(next.getMatchingTerm());
-            }
-            for (Node failNode = next.getFailureNode(); failNode != null; failNode = failNode.getFailureNode()) {
-                if (failNode.hasMatch()) {
-                    currentState.addResult(failNode.getMatchingTerm());
-                }
-            }
-            current = next;
-            if (currentState.foundMatch() && !findAll) {
-                break;//give up as soon as we have at least one match
-            }
-        }
-        currentState.setCurrentNode(current);
-        return currentState;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java
deleted file mode 100644
index 0ac325c..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.search.ahocorasick;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.nifi.util.search.SearchTerm;
-
-/**
- *
- * @author
- */
-public class Node {
-
-    private final Map<Integer, Node> neighborMap;
-    private Node failureNode;
-    private SearchTerm<?> term;
-
-    Node(final SearchTerm<?> term) {
-        this();
-        this.term = term;
-    }
-
-    Node() {
-        neighborMap = new HashMap<>();
-        term = null;
-    }
-
-    void setFailureNode(final Node fail) {
-        failureNode = fail;
-    }
-
-    public Node getFailureNode() {
-        return failureNode;
-    }
-
-    public boolean hasMatch() {
-        return term != null;
-    }
-
-    void setMatchingTerm(final SearchTerm<?> term) {
-        this.term = term;
-    }
-
-    public SearchTerm<?> getMatchingTerm() {
-        return term;
-    }
-
-    public Node getNeighbor(final int index) {
-        return neighborMap.get(index);
-    }
-
-    void setNeighbor(final Node neighbor, final int index) {
-        neighborMap.put(index, neighbor);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java
deleted file mode 100644
index 6d36ad0..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.search.ahocorasick;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.nifi.util.search.SearchTerm;
-
-public class SearchState<T> {
-
-    private Node currentNode;
-    private final Map<SearchTerm<T>, List<Long>> resultMap;
-    private long bytesRead;
-
-    SearchState(final Node rootNode) {
-        resultMap = new HashMap<>(5);
-        currentNode = rootNode;
-        bytesRead = 0L;
-    }
-
-    void incrementBytesRead(final long increment) {
-        bytesRead += increment;
-    }
-
-    void setCurrentNode(final Node curr) {
-        currentNode = curr;
-    }
-
-    public Node getCurrentNode() {
-        return currentNode;
-    }
-
-    public Map<SearchTerm<T>, List<Long>> getResults() {
-        return new HashMap<>(resultMap);
-    }
-
-    void addResult(final SearchTerm matchingTerm) {
-        final List<Long> indexes = (resultMap.containsKey(matchingTerm)) ? resultMap.get(matchingTerm) : new ArrayList<Long>(5);
-        indexes.add(bytesRead);
-        resultMap.put(matchingTerm, indexes);
-    }
-
-    public boolean foundMatch() {
-        return !resultMap.isEmpty();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java
deleted file mode 100644
index 2b95897..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.timebuffer;
-
-public interface EntityAccess<T> {
-
-    T aggregate(T oldValue, T toAdd);
-
-    T createNew();
-
-    long getTimestamp(T entity);
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java
deleted file mode 100644
index 193abc6..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.timebuffer;
-
-public class LongEntityAccess implements EntityAccess<TimestampedLong> {
-
-    @Override
-    public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong toAdd) {
-        if (oldValue == null && toAdd == null) {
-            return new TimestampedLong(0L);
-        } else if (oldValue == null) {
-            return toAdd;
-        } else if (toAdd == null) {
-            return oldValue;
-        }
-
-        return new TimestampedLong(oldValue.getValue() + toAdd.getValue());
-    }
-
-    @Override
-    public TimestampedLong createNew() {
-        return new TimestampedLong(0L);
-    }
-
-    @Override
-    public long getTimestamp(TimestampedLong entity) {
-        return entity == null ? 0L : entity.getTimestamp();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java
deleted file mode 100644
index dd8e523..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.timebuffer;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class TimedBuffer<T> {
-
-    private final int numBins;
-    private final EntitySum<T>[] bins;
-    private final EntityAccess<T> entityAccess;
-    private final TimeUnit binPrecision;
-
-    @SuppressWarnings("unchecked")
-    public TimedBuffer(final TimeUnit binPrecision, final int numBins, final EntityAccess<T> accessor) {
-        this.binPrecision = binPrecision;
-        this.numBins = numBins + 1;
-        this.bins = new EntitySum[this.numBins];
-        for (int i = 0; i < this.numBins; i++) {
-            this.bins[i] = new EntitySum<>(binPrecision, numBins, accessor);
-        }
-        this.entityAccess = accessor;
-    }
-
-    public T add(final T entity) {
-        final int binIdx = (int) (binPrecision.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS) % numBins);
-        final EntitySum<T> sum = bins[binIdx];
-
-        return sum.addOrReset(entity);
-    }
-
-    public T getAggregateValue(final long sinceEpochMillis) {
-        final int startBinIdx = (int) (binPrecision.convert(sinceEpochMillis, TimeUnit.MILLISECONDS) % numBins);
-
-        T total = null;
-        for (int i = 0; i < numBins; i++) {
-            int binIdx = (startBinIdx + i) % numBins;
-            final EntitySum<T> bin = bins[binIdx];
-
-            if (!bin.isExpired()) {
-                total = entityAccess.aggregate(total, bin.getValue());
-            }
-        }
-
-        return total;
-    }
-
-    private static class EntitySum<S> {
-
-        private final EntityAccess<S> entityAccess;
-        private final AtomicReference<S> ref = new AtomicReference<>();
-        private final TimeUnit binPrecision;
-        private final int numConfiguredBins;
-
-        public EntitySum(final TimeUnit binPrecision, final int numConfiguredBins, final EntityAccess<S> aggregator) {
-            this.binPrecision = binPrecision;
-            this.entityAccess = aggregator;
-            this.numConfiguredBins = numConfiguredBins;
-        }
-
-        private S add(final S event) {
-            S newValue;
-            S value;
-            do {
-                value = ref.get();
-                newValue = entityAccess.aggregate(value, event);
-            } while (!ref.compareAndSet(value, newValue));
-
-            return newValue;
-        }
-
-        public S getValue() {
-            return ref.get();
-        }
-
-        public boolean isExpired() {
-            // entityAccess.getTimestamp(curValue) represents the time at which the current value
-            // was last updated. If the last value is less than current time - 1 binPrecision, then it
-            // means that we've rolled over and need to reset the value.
-            final long maxExpectedTimePeriod = System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(numConfiguredBins, binPrecision);
-
-            final S curValue = ref.get();
-            return (entityAccess.getTimestamp(curValue) < maxExpectedTimePeriod);
-        }
-
-        public S addOrReset(final S event) {
-            // entityAccess.getTimestamp(curValue) represents the time at which the current value
-            // was last updated. If the last value is less than current time - 1 binPrecision, then it
-            // means that we've rolled over and need to reset the value.
-            final long maxExpectedTimePeriod = System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(1, binPrecision);
-
-            final S curValue = ref.get();
-            if (entityAccess.getTimestamp(curValue) < maxExpectedTimePeriod) {
-                ref.compareAndSet(curValue, entityAccess.createNew());
-            }
-            return add(event);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java
deleted file mode 100644
index 07d31ea..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.timebuffer;
-
-public class TimestampedLong {
-
-    private final Long value;
-    private final long timestamp = System.currentTimeMillis();
-
-    public TimestampedLong(final Long value) {
-        this.value = value;
-    }
-
-    public Long getValue() {
-        return value;
-    }
-
-    public long getTimestamp() {
-        return timestamp;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java b/nifi/commons/nifi-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java
deleted file mode 100644
index bd30a96..0000000
--- a/nifi/commons/nifi-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Arrays;
-
-import org.apache.nifi.remote.io.CompressionInputStream;
-import org.apache.nifi.remote.io.CompressionOutputStream;
-
-import org.junit.Test;
-
-public class TestCompressionInputOutputStreams {
-
-    @Test
-    public void testSimple() throws IOException {
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
-        final byte[] data = "Hello, World!".getBytes("UTF-8");
-
-        final CompressionOutputStream cos = new CompressionOutputStream(baos);
-        cos.write(data);
-        cos.flush();
-        cos.close();
-
-        final byte[] compressedBytes = baos.toByteArray();
-        final CompressionInputStream cis = new CompressionInputStream(new ByteArrayInputStream(compressedBytes));
-        final byte[] decompressed = readFully(cis);
-
-        assertTrue(Arrays.equals(data, decompressed));
-    }
-
-    @Test
-    public void testDataLargerThanBuffer() throws IOException {
-        final String str = "The quick brown fox jumps over the lazy dog\r\n\n\n\r";
-
-        final StringBuilder sb = new StringBuilder();
-        for (int i = 0; i < 100; i++) {
-            sb.append(str);
-        }
-        final byte[] data = sb.toString().getBytes("UTF-8");
-
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
-        final CompressionOutputStream cos = new CompressionOutputStream(baos, 8192);
-        cos.write(data);
-        cos.flush();
-        cos.close();
-
-        final byte[] compressedBytes = baos.toByteArray();
-        final CompressionInputStream cis = new CompressionInputStream(new ByteArrayInputStream(compressedBytes));
-        final byte[] decompressed = readFully(cis);
-
-        assertTrue(Arrays.equals(data, decompressed));
-    }
-
-    @Test
-    public void testDataLargerThanBufferWhileFlushing() throws IOException {
-        final String str = "The quick brown fox jumps over the lazy dog\r\n\n\n\r";
-        final byte[] data = str.getBytes("UTF-8");
-
-        final StringBuilder sb = new StringBuilder();
-        final byte[] data1024;
-
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
-        final CompressionOutputStream cos = new CompressionOutputStream(baos, 8192);
-        for (int i = 0; i < 1024; i++) {
-            cos.write(data);
-            cos.flush();
-            sb.append(str);
-        }
-        cos.close();
-        data1024 = sb.toString().getBytes("UTF-8");
-
-        final byte[] compressedBytes = baos.toByteArray();
-        final CompressionInputStream cis = new CompressionInputStream(new ByteArrayInputStream(compressedBytes));
-        final byte[] decompressed = readFully(cis);
-
-        assertTrue(Arrays.equals(data1024, decompressed));
-    }
-
-    @Test
-    public void testSendingMultipleFilesBackToBackOnSameStream() throws IOException {
-        final String str = "The quick brown fox jumps over the lazy dog\r\n\n\n\r";
-        final byte[] data = str.getBytes("UTF-8");
-
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
-        final CompressionOutputStream cos = new CompressionOutputStream(baos, 8192);
-        for (int i = 0; i < 512; i++) {
-            cos.write(data);
-            cos.flush();
-        }
-        cos.close();
-
-        final CompressionOutputStream cos2 = new CompressionOutputStream(baos, 8192);
-        for (int i = 0; i < 512; i++) {
-            cos2.write(data);
-            cos2.flush();
-        }
-        cos2.close();
-
-        final byte[] data512;
-        final StringBuilder sb = new StringBuilder();
-        for (int i = 0; i < 512; i++) {
-            sb.append(str);
-        }
-        data512 = sb.toString().getBytes("UTF-8");
-
-        final byte[] compressedBytes = baos.toByteArray();
-        final ByteArrayInputStream bais = new ByteArrayInputStream(compressedBytes);
-
-        final CompressionInputStream cis = new CompressionInputStream(bais);
-        final byte[] decompressed = readFully(cis);
-        assertTrue(Arrays.equals(data512, decompressed));
-
-        final CompressionInputStream cis2 = new CompressionInputStream(bais);
-        final byte[] decompressed2 = readFully(cis2);
-        assertTrue(Arrays.equals(data512, decompressed2));
-    }
-
-    private byte[] readFully(final InputStream in) throws IOException {
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
-        final byte[] buffer = new byte[65536];
-        int len;
-        while ((len = in.read(buffer)) >= 0) {
-            baos.write(buffer, 0, len);
-        }
-
-        return baos.toByteArray();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java b/nifi/commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java
deleted file mode 100644
index 52bd8de..0000000
--- a/nifi/commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import org.apache.nifi.stream.io.ByteArrayInputStream;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
-import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-@Ignore("Tests are time-based")
-public class TestLeakyBucketThrottler {
-
-    @Test(timeout = 10000)
-    public void testOutputStreamInterface() throws IOException {
-        // throttle rate at 1 MB/sec
-        final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024);
-
-        final byte[] data = new byte[1024 * 1024 * 4];
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        final OutputStream throttledOut = throttler.newThrottledOutputStream(baos);
-
-        final long start = System.currentTimeMillis();
-        throttledOut.write(data);
-        throttler.close();
-        final long millis = System.currentTimeMillis() - start;
-        // should take 4 sec give or take
-        assertTrue(millis > 3000);
-        assertTrue(millis < 6000);
-    }
-
-    @Test(timeout = 10000)
-    public void testInputStreamInterface() throws IOException {
-        // throttle rate at 1 MB/sec
-        final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024);
-
-        final byte[] data = new byte[1024 * 1024 * 4];
-        final ByteArrayInputStream bais = new ByteArrayInputStream(data);
-        final InputStream throttledIn = throttler.newThrottledInputStream(bais);
-
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
-        final byte[] buffer = new byte[4096];
-        final long start = System.currentTimeMillis();
-        int len;
-        while ((len = throttledIn.read(buffer)) > 0) {
-            baos.write(buffer, 0, len);
-        }
-        throttler.close();
-        final long millis = System.currentTimeMillis() - start;
-        // should take 4 sec give or take
-        assertTrue(millis > 3000);
-        assertTrue(millis < 6000);
-        baos.close();
-    }
-
-    @Test(timeout = 10000)
-    public void testDirectInterface() throws IOException, InterruptedException {
-        // throttle rate at 1 MB/sec
-        final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024);
-
-        // create 3 threads, each sending ~2 MB
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        final List<Thread> threads = new ArrayList<Thread>();
-        for (int i = 0; i < 3; i++) {
-            final Thread t = new WriterThread(i, throttler, baos);
-            threads.add(t);
-        }
-
-        final long start = System.currentTimeMillis();
-        for (final Thread t : threads) {
-            t.start();
-        }
-
-        for (final Thread t : threads) {
-            t.join();
-        }
-        final long elapsed = System.currentTimeMillis() - start;
-
-        throttler.close();
-
-        // To send 15 MB, it should have taken at least 5 seconds and no more than 7 seconds, to
-        // allow for busy-ness and the fact that we could write a tiny bit more than the limit.
-        assertTrue(elapsed > 5000);
-        assertTrue(elapsed < 7000);
-
-        // ensure bytes were copied out appropriately
-        assertEquals(3 * (2 * 1024 * 1024 + 1), baos.getBufferLength());
-        assertEquals((byte) 'A', baos.getUnderlyingBuffer()[baos.getBufferLength() - 1]);
-    }
-
-    private static class WriterThread extends Thread {
-
-        private final int idx;
-        private final byte[] data = new byte[1024 * 1024 * 2 + 1];
-        private final LeakyBucketStreamThrottler throttler;
-        private final OutputStream out;
-
-        public WriterThread(final int idx, final LeakyBucketStreamThrottler throttler, final OutputStream out) {
-            this.idx = idx;
-            this.throttler = throttler;
-            this.out = out;
-            this.data[this.data.length - 1] = (byte) 'A';
-        }
-
-        @Override
-        public void run() {
-            long startMillis = System.currentTimeMillis();
-            long bytesWritten = 0L;
-            try {
-                throttler.copy(new ByteArrayInputStream(data), out);
-            } catch (IOException e) {
-                e.printStackTrace();
-                return;
-            }
-            long now = System.currentTimeMillis();
-            long millisElapsed = now - startMillis;
-            bytesWritten += data.length;
-            float bytesPerSec = (float) bytesWritten / (float) millisElapsed * 1000F;
-            System.out.println(idx + " : copied data at a rate of " + bytesPerSec + " bytes/sec");
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/test/java/org/apache/nifi/util/TestNaiveSearchRingBuffer.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/test/java/org/apache/nifi/util/TestNaiveSearchRingBuffer.java b/nifi/commons/nifi-utils/src/test/java/org/apache/nifi/util/TestNaiveSearchRingBuffer.java
deleted file mode 100644
index 0838e96..0000000
--- a/nifi/commons/nifi-utils/src/test/java/org/apache/nifi/util/TestNaiveSearchRingBuffer.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.junit.Test;
-
-public class TestNaiveSearchRingBuffer {
-
-    @Test
-    public void testAddAndCompare() {
-        final byte[] pattern = new byte[]{
-            '\r', '0', 38, 48
-        };
-
-        final byte[] search = new byte[]{
-            '\r', '0', 38, 58, 58, 83, 78, '\r', '0', 38, 48, 83, 92, 78, 4, 38
-        };
-
-        final NaiveSearchRingBuffer circ = new NaiveSearchRingBuffer(pattern);
-        int counter = -1;
-        for (final byte b : search) {
-            counter++;
-            final boolean matched = circ.addAndCompare(b);
-            if (counter == 10) {
-                assertTrue(matched);
-            } else {
-                assertFalse(matched);
-            }
-        }
-    }
-
-    @Test
-    public void testGetOldestByte() {
-        final byte[] pattern = new byte[]{
-            '\r', '0', 38, 48
-        };
-
-        final byte[] search = new byte[]{
-            '\r', '0', 38, 58, 58, 83, 78, (byte) 223, (byte) 227, (byte) 250, '\r', '0', 38, 48, 83, 92, 78, 4, 38
-        };
-
-        final NaiveSearchRingBuffer circ = new NaiveSearchRingBuffer(pattern);
-        int counter = -1;
-        for (final byte b : search) {
-            counter++;
-            final boolean matched = circ.addAndCompare(b);
-            if (counter == 13) {
-                assertTrue(matched);
-            } else {
-                assertFalse(matched);
-            }
-        }
-    }
-
-}