You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/01/19 19:16:11 UTC
[37/59] [abbrv] [partial] incubator-nifi git commit: Reworked overall
directory structure to make releasing nifi vs maven plugis easier
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java
deleted file mode 100644
index 41a0557..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java
+++ /dev/null
@@ -1,623 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.file;
-
-import java.io.BufferedInputStream;
-import java.io.Closeable;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Random;
-
-import org.slf4j.Logger;
-
-/**
- * A utility class containing a few useful static methods to do typical IO
- * operations.
- *
- * @author unattributed
- */
-public class FileUtils {
-
- public static final long TRANSFER_CHUNK_SIZE_BYTES = 1024 * 1024 * 8; //8 MB chunks
- public static final long MILLIS_BETWEEN_ATTEMPTS = 50L;
-
- /**
- * Closes the given closeable quietly - no logging, no exceptions...
- *
- * @param closeable
- */
- public static void closeQuietly(final Closeable closeable) {
- if (null != closeable) {
- try {
- closeable.close();
- } catch (final IOException io) {/*IGNORE*/
-
- }
- }
- }
-
- /**
- * Releases the given lock quietly - no logging, no exception
- *
- * @param lock
- */
- public static void releaseQuietly(final FileLock lock) {
- if (null != lock) {
- try {
- lock.release();
- } catch (final IOException io) {
- /*IGNORE*/
- }
- }
- }
-
- public static void ensureDirectoryExistAndCanAccess(final File dir) throws IOException {
- if (dir.exists() && !dir.isDirectory()) {
- throw new IOException(dir.getAbsolutePath() + " is not a directory");
- } else if (!dir.exists()) {
- final boolean made = dir.mkdirs();
- if (!made) {
- throw new IOException(dir.getAbsolutePath() + " could not be created");
- }
- }
- if (!(dir.canRead() && dir.canWrite())) {
- throw new IOException(dir.getAbsolutePath() + " directory does not have read/write privilege");
- }
- }
-
- /**
- * Deletes the given file. If the given file exists but could not be deleted
- * this will be printed as a warning to the given logger
- *
- * @param file
- * @param logger
- * @return
- */
- public static boolean deleteFile(final File file, final Logger logger) {
- return FileUtils.deleteFile(file, logger, 1);
- }
-
- /**
- * Deletes the given file. If the given file exists but could not be deleted
- * this will be printed as a warning to the given logger
- *
- * @param file
- * @param logger
- * @param attempts indicates how many times an attempt to delete should be
- * made
- * @return true if given file no longer exists
- */
- public static boolean deleteFile(final File file, final Logger logger, final int attempts) {
- if (file == null) {
- return false;
- }
- boolean isGone = false;
- try {
- if (file.exists()) {
- final int effectiveAttempts = Math.max(1, attempts);
- for (int i = 0; i < effectiveAttempts && !isGone; i++) {
- isGone = file.delete() || !file.exists();
- if (!isGone && (effectiveAttempts - i) > 1) {
- FileUtils.sleepQuietly(MILLIS_BETWEEN_ATTEMPTS);
- }
- }
- if (!isGone && logger != null) {
- logger.warn("File appears to exist but unable to delete file: " + file.getAbsolutePath());
- }
- }
- } catch (final Throwable t) {
- if (logger != null) {
- logger.warn("Unable to delete file: '" + file.getAbsolutePath() + "' due to " + t);
- }
- }
- return isGone;
- }
-
- /**
- * Deletes all of the given files. If any exist and cannot be deleted that
- * will be printed at warn to the given logger.
- *
- * @param files can be null
- * @param logger can be null
- */
- public static void deleteFile(final List<File> files, final Logger logger) {
- FileUtils.deleteFile(files, logger, 1);
- }
-
- /**
- * Deletes all of the given files. If any exist and cannot be deleted that
- * will be printed at warn to the given logger.
- *
- * @param files can be null
- * @param logger can be null
- * @param attempts indicates how many times an attempt should be made to
- * delete each file
- */
- public static void deleteFile(final List<File> files, final Logger logger, final int attempts) {
- if (null == files || files.isEmpty()) {
- return;
- }
- final int effectiveAttempts = Math.max(1, attempts);
- for (final File file : files) {
- try {
- boolean isGone = false;
- for (int i = 0; i < effectiveAttempts && !isGone; i++) {
- isGone = file.delete() || !file.exists();
- if (!isGone && (effectiveAttempts - i) > 1) {
- FileUtils.sleepQuietly(MILLIS_BETWEEN_ATTEMPTS);
- }
- }
- if (!isGone && logger != null) {
- logger.warn("File appears to exist but unable to delete file: " + file.getAbsolutePath());
- }
- } catch (final Throwable t) {
- if (null != logger) {
- logger.warn("Unable to delete file given from path: '" + file.getPath() + "' due to " + t);
- }
- }
- }
- }
-
- /**
- * Deletes all files (not directories..) in the given directory (non
- * recursive) that match the given filename filter. If any file cannot be
- * deleted then this is printed at warn to the given logger.
- *
- * @param directory
- * @param filter if null then no filter is used
- * @param logger
- */
- public static void deleteFilesInDir(final File directory, final FilenameFilter filter, final Logger logger) {
- FileUtils.deleteFilesInDir(directory, filter, logger, false);
- }
-
- /**
- * Deletes all files (not directories) in the given directory (recursive)
- * that match the given filename filter. If any file cannot be deleted then
- * this is printed at warn to the given logger.
- *
- * @param directory
- * @param filter if null then no filter is used
- * @param logger
- * @param recurse
- */
- public static void deleteFilesInDir(final File directory, final FilenameFilter filter, final Logger logger, final boolean recurse) {
- FileUtils.deleteFilesInDir(directory, filter, logger, recurse, false);
- }
-
- /**
- * Deletes all files (not directories) in the given directory (recursive)
- * that match the given filename filter. If any file cannot be deleted then
- * this is printed at warn to the given logger.
- *
- * @param directory
- * @param filter if null then no filter is used
- * @param logger
- * @param recurse
- * @param deleteEmptyDirectories default is false; if true will delete
- * directories found that are empty
- */
- public static void deleteFilesInDir(final File directory, final FilenameFilter filter, final Logger logger, final boolean recurse, final boolean deleteEmptyDirectories) {
- // ensure the specified directory is actually a directory and that it exists
- if (null != directory && directory.isDirectory()) {
- final File ingestFiles[] = directory.listFiles();
- for (File ingestFile : ingestFiles) {
- boolean process = (filter == null) ? true : filter.accept(directory, ingestFile.getName());
- if (ingestFile.isFile() && process) {
- FileUtils.deleteFile(ingestFile, logger, 3);
- }
- if (ingestFile.isDirectory() && recurse) {
- FileUtils.deleteFilesInDir(ingestFile, filter, logger, recurse, deleteEmptyDirectories);
- if (deleteEmptyDirectories && ingestFile.list().length == 0) {
- FileUtils.deleteFile(ingestFile, logger, 3);
- }
- }
- }
- }
- }
-
- /**
- * Deletes given files.
- *
- * @param files
- * @param recurse will recurse
- * @throws IOException
- */
- public static void deleteFiles(final Collection<File> files, final boolean recurse) throws IOException {
- for (final File file : files) {
- FileUtils.deleteFile(file, recurse);
- }
- }
-
- public static void deleteFile(final File file, final boolean recurse) throws IOException {
- if (file.isDirectory() && recurse) {
- FileUtils.deleteFiles(Arrays.asList(file.listFiles()), recurse);
- }
- //now delete the file itself regardless of whether it is plain file or a directory
- if (!FileUtils.deleteFile(file, null, 5)) {
- throw new IOException("Unable to delete " + file.getAbsolutePath());
- }
- }
-
- /**
- * Randomly generates a sequence of bytes and overwrites the contents of the
- * file a number of times. The file is then deleted.
- *
- * @param file File to be overwritten a number of times and, ultimately,
- * deleted
- * @param passes Number of times file should be overwritten
- * @throws IOException if something makes shredding or deleting a problem
- */
- public static void shredFile(final File file, final int passes)
- throws IOException {
- final Random generator = new Random();
- final long fileLength = file.length();
- final int byteArraySize = (int) Math.min(fileLength, 1048576); // 1MB
- final byte[] b = new byte[byteArraySize];
- final long numOfRandomWrites = (fileLength / b.length) + 1;
- final FileOutputStream fos = new FileOutputStream(file);
- try {
- // Over write file contents (passes) times
- final FileChannel channel = fos.getChannel();
- for (int i = 0; i < passes; i++) {
- generator.nextBytes(b);
- for (int j = 0; j <= numOfRandomWrites; j++) {
- fos.write(b);
- }
- fos.flush();
- channel.position(0);
- }
- // Write out "0" for each byte in the file
- Arrays.fill(b, (byte) 0);
- for (int j = 0; j < numOfRandomWrites; j++) {
- fos.write(b);
- }
- fos.flush();
- fos.close();
- // Try to delete the file a few times
- if (!FileUtils.deleteFile(file, null, 5)) {
- throw new IOException("Failed to delete file after shredding");
- }
-
- } finally {
- FileUtils.closeQuietly(fos);
- }
- }
-
- public static long copy(final InputStream in, final OutputStream out) throws IOException {
- final byte[] buffer = new byte[65536];
- long copied = 0L;
- int len;
- while ((len = in.read(buffer)) > 0) {
- out.write(buffer, 0, len);
- copied += len;
- }
-
- return copied;
- }
-
- public static long copyBytes(final byte[] bytes, final File destination, final boolean lockOutputFile) throws FileNotFoundException, IOException {
- FileOutputStream fos = null;
- FileLock outLock = null;
- long fileSize = 0L;
- try {
- fos = new FileOutputStream(destination);
- final FileChannel out = fos.getChannel();
- if (lockOutputFile) {
- outLock = out.tryLock(0, Long.MAX_VALUE, false);
- if (null == outLock) {
- throw new IOException("Unable to obtain exclusive file lock for: " + destination.getAbsolutePath());
- }
- }
- fos.write(bytes);
- fos.flush();
- fileSize = bytes.length;
- } finally {
- FileUtils.releaseQuietly(outLock);
- FileUtils.closeQuietly(fos);
- }
- return fileSize;
- }
-
- /**
- * Copies the given source file to the given destination file. The given
- * destination will be overwritten if it already exists.
- *
- * @param source
- * @param destination
- * @param lockInputFile if true will lock input file during copy; if false
- * will not
- * @param lockOutputFile if true will lock output file during copy; if false
- * will not
- * @param move if true will perform what is effectively a move operation
- * rather than a pure copy. This allows for potentially highly efficient
- * movement of the file but if not possible this will revert to a copy then
- * delete behavior. If false, then the file is copied and the source file is
- * retained. If a true rename/move occurs then no lock is held during that
- * time.
- * @param logger if failures occur, they will be logged to this logger if
- * possible. If this logger is null, an IOException will instead be thrown,
- * indicating the problem.
- * @return long number of bytes copied
- * @throws FileNotFoundException if the source file could not be found
- * @throws IOException
- * @throws SecurityException if a security manager denies the needed file
- * operations
- */
- public static long copyFile(final File source, final File destination, final boolean lockInputFile, final boolean lockOutputFile, final boolean move, final Logger logger) throws FileNotFoundException, IOException {
-
- FileInputStream fis = null;
- FileOutputStream fos = null;
- FileLock inLock = null;
- FileLock outLock = null;
- long fileSize = 0L;
- if (!source.canRead()) {
- throw new IOException("Must at least have read permission");
-
- }
- if (move && source.renameTo(destination)) {
- fileSize = destination.length();
- } else {
- try {
- fis = new FileInputStream(source);
- fos = new FileOutputStream(destination);
- final FileChannel in = fis.getChannel();
- final FileChannel out = fos.getChannel();
- if (lockInputFile) {
- inLock = in.tryLock(0, Long.MAX_VALUE, true);
- if (null == inLock) {
- throw new IOException("Unable to obtain shared file lock for: " + source.getAbsolutePath());
- }
- }
- if (lockOutputFile) {
- outLock = out.tryLock(0, Long.MAX_VALUE, false);
- if (null == outLock) {
- throw new IOException("Unable to obtain exclusive file lock for: " + destination.getAbsolutePath());
- }
- }
- long bytesWritten = 0;
- do {
- bytesWritten += out.transferFrom(in, bytesWritten, TRANSFER_CHUNK_SIZE_BYTES);
- fileSize = in.size();
- } while (bytesWritten < fileSize);
- out.force(false);
- FileUtils.closeQuietly(fos);
- FileUtils.closeQuietly(fis);
- fos = null;
- fis = null;
- if (move && !FileUtils.deleteFile(source, null, 5)) {
- if (logger == null) {
- FileUtils.deleteFile(destination, null, 5);
- throw new IOException("Could not remove file " + source.getAbsolutePath());
- } else {
- logger.warn("Configured to delete source file when renaming/move not successful. However, unable to delete file at: " + source.getAbsolutePath());
- }
- }
- } finally {
- FileUtils.releaseQuietly(inLock);
- FileUtils.releaseQuietly(outLock);
- FileUtils.closeQuietly(fos);
- FileUtils.closeQuietly(fis);
- }
- }
- return fileSize;
- }
-
- /**
- * Copies the given source file to the given destination file. The given
- * destination will be overwritten if it already exists.
- *
- * @param source
- * @param destination
- * @param lockInputFile if true will lock input file during copy; if false
- * will not
- * @param lockOutputFile if true will lock output file during copy; if false
- * will not
- * @param logger
- * @return long number of bytes copied
- * @throws FileNotFoundException if the source file could not be found
- * @throws IOException
- * @throws SecurityException if a security manager denies the needed file
- * operations
- */
- public static long copyFile(final File source, final File destination, final boolean lockInputFile, final boolean lockOutputFile, final Logger logger) throws FileNotFoundException, IOException {
- return FileUtils.copyFile(source, destination, lockInputFile, lockOutputFile, false, logger);
- }
-
- public static long copyFile(final File source, final OutputStream stream, final boolean closeOutputStream, final boolean lockInputFile) throws FileNotFoundException, IOException {
- FileInputStream fis = null;
- FileLock inLock = null;
- long fileSize = 0L;
- try {
- fis = new FileInputStream(source);
- final FileChannel in = fis.getChannel();
- if (lockInputFile) {
- inLock = in.tryLock(0, Long.MAX_VALUE, true);
- if (inLock == null) {
- throw new IOException("Unable to obtain exclusive file lock for: " + source.getAbsolutePath());
- }
-
- }
-
- byte[] buffer = new byte[1 << 18]; //256 KB
- int bytesRead = -1;
- while ((bytesRead = fis.read(buffer)) != -1) {
- stream.write(buffer, 0, bytesRead);
- }
- in.force(false);
- stream.flush();
- fileSize = in.size();
- } finally {
- FileUtils.releaseQuietly(inLock);
- FileUtils.closeQuietly(fis);
- if (closeOutputStream) {
- FileUtils.closeQuietly(stream);
- }
- }
- return fileSize;
- }
-
- public static long copyFile(final InputStream stream, final File destination, final boolean closeInputStream, final boolean lockOutputFile) throws FileNotFoundException, IOException {
- final Path destPath = destination.toPath();
- final long size = Files.copy(stream, destPath);
- if (closeInputStream) {
- stream.close();
- }
- return size;
- }
-
- /**
- * Renames the given file from the source path to the destination path. This
- * handles multiple attempts. This should only be used to rename within a
- * given directory. Renaming across directories might not work well. See the
- * <code>File.renameTo</code> for more information.
- *
- * @param source the file to rename
- * @param destination the file path to rename to
- * @param maxAttempts the max number of attempts to attempt the rename
- * @throws IOException if rename isn't successful
- */
- public static void renameFile(final File source, final File destination, final int maxAttempts) throws IOException {
- FileUtils.renameFile(source, destination, maxAttempts, false);
- }
-
- /**
- * Renames the given file from the source path to the destination path. This
- * handles multiple attempts. This should only be used to rename within a
- * given directory. Renaming across directories might not work well. See the
- * <code>File.renameTo</code> for more information.
- *
- * @param source the file to rename
- * @param destination the file path to rename to
- * @param maxAttempts the max number of attempts to attempt the rename
- * @param replace if true and a rename attempt fails will check if a file is
- * already at the destination path. If so it will delete that file and
- * attempt the rename according the remaining maxAttempts. If false, any
- * conflicting files will be left as they were and the rename attempts will
- * fail if conflicting.
- * @throws IOException if rename isn't successful
- */
- public static void renameFile(final File source, final File destination, final int maxAttempts, final boolean replace) throws IOException {
- final int attempts = (replace || maxAttempts < 1) ? Math.max(2, maxAttempts) : maxAttempts;
- boolean renamed = false;
- for (int i = 0; i < attempts; i++) {
- renamed = source.renameTo(destination);
- if (!renamed) {
- FileUtils.deleteFile(destination, null, 5);
- } else {
- break; //rename has succeeded
- }
- }
- if (!renamed) {
- throw new IOException("Attempted " + maxAttempts + " times but unable to rename from \'" + source.getPath() + "\' to \'" + destination.getPath() + "\'");
-
- }
- }
-
- public static void sleepQuietly(final long millis) {
- try {
- Thread.sleep(millis);
- } catch (final InterruptedException ex) {
- /* do nothing */
- }
- }
-
- /**
- * Syncs a primary copy of a file with the copy in the restore directory. If
- * the restore directory does not have a file and the primary has a file,
- * the the primary's file is copied to the restore directory. Else if the
- * restore directory has a file, but the primary does not, then the
- * restore's file is copied to the primary directory. Else if the primary
- * file is different than the restore file, then an IllegalStateException is
- * thrown. Otherwise, if neither file exists, then no syncing is performed.
- *
- * @param primaryFile the primary file
- * @param restoreFile the restore file
- * @param logger a logger
- * @throws IOException if an I/O problem was encountered during syncing
- * @throws IllegalStateException if the primary and restore copies exist but
- * are different
- */
- public static void syncWithRestore(final File primaryFile, final File restoreFile, final Logger logger)
- throws IOException {
-
- if (primaryFile.exists() && !restoreFile.exists()) {
- // copy primary file to restore
- copyFile(primaryFile, restoreFile, false, false, logger);
- } else if (restoreFile.exists() && !primaryFile.exists()) {
- // copy restore file to primary
- copyFile(restoreFile, primaryFile, false, false, logger);
- } else if (primaryFile.exists() && restoreFile.exists() && !isSame(primaryFile, restoreFile)) {
- throw new IllegalStateException(String.format("Primary file '%s' is different than restore file '%s'",
- primaryFile.getAbsoluteFile(), restoreFile.getAbsolutePath()));
- }
- }
-
- /**
- * Returns true if the given files are the same according to their MD5 hash.
- *
- * @param file1 a file
- * @param file2 a file
- * @return true if the files are the same; false otherwise
- * @throws IOException if the MD5 hash could not be computed
- */
- public static boolean isSame(final File file1, final File file2) throws IOException {
- return Arrays.equals(computeMd5Digest(file1), computeMd5Digest(file2));
- }
-
- /**
- * Returns the MD5 hash of the given file.
- *
- * @param file a file
- * @return the MD5 hash
- * @throws IOException if the MD5 hash could not be computed
- */
- public static byte[] computeMd5Digest(final File file) throws IOException {
- final MessageDigest digest;
- try {
- digest = MessageDigest.getInstance("MD5");
- } catch (final NoSuchAlgorithmException nsae) {
- throw new IOException(nsae);
- }
-
- try (final FileInputStream fis = new FileInputStream(file)) {
- int len;
- final byte[] buffer = new byte[8192];
- while ((len = fis.read(buffer)) > -1) {
- if (len > 0) {
- digest.update(buffer, 0, len);
- }
- }
- }
- return digest.digest();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/CompoundUpdateMonitor.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/CompoundUpdateMonitor.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/CompoundUpdateMonitor.java
deleted file mode 100644
index 6f9c616..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/CompoundUpdateMonitor.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.file.monitor;
-
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * An {@link UpdateMonitor} that combines multiple <code>UpdateMonitor</code>s
- * such that it will indicate a change in a file only if ALL sub-monitors
- * indicate a change. The sub-monitors will be applied in the order given and if
- * any indicates that the state has not changed, the subsequent sub-monitors may
- * not be given a chance to run
- */
-public class CompoundUpdateMonitor implements UpdateMonitor {
-
- private final List<UpdateMonitor> monitors;
-
- public CompoundUpdateMonitor(final UpdateMonitor first, final UpdateMonitor... others) {
- monitors = new ArrayList<>();
- monitors.add(first);
- for (final UpdateMonitor monitor : others) {
- monitors.add(monitor);
- }
- }
-
- @Override
- public Object getCurrentState(final Path path) throws IOException {
- return new DeferredMonitorAction(monitors, path);
- }
-
- private static class DeferredMonitorAction {
-
- private static final Object NON_COMPUTED_VALUE = new Object();
-
- private final List<UpdateMonitor> monitors;
- private final Path path;
-
- private final Object[] preCalculated;
-
- public DeferredMonitorAction(final List<UpdateMonitor> monitors, final Path path) {
- this.monitors = monitors;
- this.path = path;
- preCalculated = new Object[monitors.size()];
-
- for (int i = 0; i < preCalculated.length; i++) {
- preCalculated[i] = NON_COMPUTED_VALUE;
- }
- }
-
- private Object getCalculatedValue(final int i) throws IOException {
- if (preCalculated[i] == NON_COMPUTED_VALUE) {
- preCalculated[i] = monitors.get(i).getCurrentState(path);
- }
-
- return preCalculated[i];
- }
-
- @Override
- public boolean equals(final Object obj) {
- // must return true unless ALL DeferredMonitorAction's indicate that they are different
- if (obj == null) {
- return false;
- }
-
- if (!(obj instanceof DeferredMonitorAction)) {
- return false;
- }
-
- final DeferredMonitorAction other = (DeferredMonitorAction) obj;
- try {
- // Go through each UpdateMonitor's value and check if the value has changed.
- for (int i = 0; i < preCalculated.length; i++) {
- final Object mine = getCalculatedValue(i);
- final Object theirs = other.getCalculatedValue(i);
-
- if (mine == theirs) {
- // same
- return true;
- }
-
- if (mine == null && theirs == null) {
- // same
- return true;
- }
-
- if (mine.equals(theirs)) {
- return true;
- }
- }
- } catch (final IOException e) {
- return false;
- }
-
- // No DeferredMonitorAction was the same as last time. Therefore, it's not equal
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/LastModifiedMonitor.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/LastModifiedMonitor.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/LastModifiedMonitor.java
deleted file mode 100644
index e6be558..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/LastModifiedMonitor.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.file.monitor;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-
-public class LastModifiedMonitor implements UpdateMonitor {
-
- @Override
- public Object getCurrentState(final Path path) throws IOException {
- return Files.getLastModifiedTime(path);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/MD5SumMonitor.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/MD5SumMonitor.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/MD5SumMonitor.java
deleted file mode 100644
index 8dea4bf..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/MD5SumMonitor.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.file.monitor;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.file.Path;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-
-public class MD5SumMonitor implements UpdateMonitor {
-
- @Override
- public Object getCurrentState(final Path path) throws IOException {
- final MessageDigest digest;
- try {
- digest = MessageDigest.getInstance("MD5");
- } catch (final NoSuchAlgorithmException nsae) {
- throw new AssertionError(nsae);
- }
-
- try (final FileInputStream fis = new FileInputStream(path.toFile())) {
- int len;
- final byte[] buffer = new byte[8192];
- while ((len = fis.read(buffer)) > -1) {
- if (len > 0) {
- digest.update(buffer, 0, len);
- }
- }
- }
-
- // Return a ByteBuffer instead of byte[] because we want equals() to do a deep equality
- return ByteBuffer.wrap(digest.digest());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/SynchronousFileWatcher.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/SynchronousFileWatcher.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/SynchronousFileWatcher.java
deleted file mode 100644
index e0089c1..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/SynchronousFileWatcher.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.file.monitor;
-
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * Allows the user to configure a {@link java.nio.file.Path Path} to watch for
- * modifications and periodically poll to check if the file has been modified
- */
-public class SynchronousFileWatcher {
-
- private final Path path;
- private final long checkUpdateMillis;
- private final UpdateMonitor monitor;
- private final AtomicReference<StateWrapper> lastState;
- private final Lock resourceLock = new ReentrantLock();
-
- public SynchronousFileWatcher(final Path path, final UpdateMonitor monitor) {
- this(path, monitor, 0L);
- }
-
- public SynchronousFileWatcher(final Path path, final UpdateMonitor monitor, final long checkMillis) {
- if (checkMillis < 0) {
- throw new IllegalArgumentException();
- }
-
- this.path = path;
- checkUpdateMillis = checkMillis;
- this.monitor = monitor;
-
- Object currentState;
- try {
- currentState = monitor.getCurrentState(path);
- } catch (final IOException e) {
- currentState = null;
- }
-
- this.lastState = new AtomicReference<>(new StateWrapper(currentState));
- }
-
- /**
- * Checks if the file has been updated according to the configured
- * {@link UpdateMonitor} and resets the state
- *
- * @return
- * @throws IOException
- */
- public boolean checkAndReset() throws IOException {
- if (checkUpdateMillis <= 0) { // if checkUpdateMillis <= 0, always check
- return checkForUpdate();
- } else {
- final StateWrapper stateWrapper = lastState.get();
- if (stateWrapper.getTimestamp() < System.currentTimeMillis() - checkUpdateMillis) {
- return checkForUpdate();
- }
- return false;
- }
- }
-
- private boolean checkForUpdate() throws IOException {
- if (resourceLock.tryLock()) {
- try {
- final StateWrapper wrapper = lastState.get();
- final Object newState = monitor.getCurrentState(path);
- if (newState == null && wrapper.getState() == null) {
- return false;
- }
- if (newState == null || wrapper.getState() == null) {
- lastState.set(new StateWrapper(newState));
- return true;
- }
-
- final boolean unmodified = newState.equals(wrapper.getState());
- if (!unmodified) {
- lastState.set(new StateWrapper(newState));
- }
- return !unmodified;
- } finally {
- resourceLock.unlock();
- }
- } else {
- return false;
- }
- }
-
- private static class StateWrapper {
-
- private final Object state;
- private final long timestamp;
-
- public StateWrapper(final Object state) {
- this.state = state;
- this.timestamp = System.currentTimeMillis();
- }
-
- public Object getState() {
- return state;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/UpdateMonitor.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/UpdateMonitor.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/UpdateMonitor.java
deleted file mode 100644
index 20ed1dd..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/UpdateMonitor.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.file.monitor;
-
-import java.io.IOException;
-import java.nio.file.Path;
-
-public interface UpdateMonitor {
-
- Object getCurrentState(Path path) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/Search.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/Search.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/Search.java
deleted file mode 100644
index 59b444a..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/Search.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.search;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Set;
-
-import org.apache.nifi.util.search.ahocorasick.SearchState;
-
-/**
- * Defines an interface to search for content given a set of search terms. Any
- * implementation of search must be thread safe.
- *
- * @author
- * @param <T>
- */
-public interface Search<T> {
-
- /**
- * Establishes the dictionary of terms which will be searched in subsequent
- * search calls. This can be called only once
- *
- * @param terms
- */
- void initializeDictionary(Set<SearchTerm<T>> terms);
-
- /**
- * Searches the given input stream for matches between the already specified
- * dictionary and the contents scanned.
- *
- * @param haystack
- * @param findAll if true will find all matches if false will find only the
- * first match
- * @return SearchState containing results Map might be empty which indicates
- * no matches found but will not be null
- * @throws IOException Thrown for any exceptions occurring while searching.
- * @throws IllegalStateException if the dictionary has not yet been
- * initialized
- */
- SearchState<T> search(InputStream haystack, boolean findAll) throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/SearchTerm.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/SearchTerm.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/SearchTerm.java
deleted file mode 100644
index 62de964..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/SearchTerm.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.search;
-
-import java.nio.charset.Charset;
-import java.util.Arrays;
-
-/**
- * This is an immutable thread safe object representing a search term
- *
- * @author
- * @param <T>
- */
-public class SearchTerm<T> {
-
- private final byte[] bytes;
- private final int hashCode;
- private final T reference;
-
- /**
- * Constructs a SearchTerm. Defensively copies the given byte array
- *
- * @param bytes
- * @throws IllegalArgument exception if given bytes are null or 0 length
- */
- public SearchTerm(final byte[] bytes) {
- this(bytes, true, null);
- }
-
- /**
- * Constructs a search term. Optionally performs a defensive copy of the
- * given byte array. If the caller indicates a defensive copy is not
- * necessary then they must not change the given arrays state any longer
- *
- * @param bytes
- * @param defensiveCopy
- * @param reference
- */
- public SearchTerm(final byte[] bytes, final boolean defensiveCopy, final T reference) {
- if (bytes == null || bytes.length == 0) {
- throw new IllegalArgumentException();
- }
- if (defensiveCopy) {
- this.bytes = Arrays.copyOf(bytes, bytes.length);
- } else {
- this.bytes = bytes;
- }
- this.hashCode = Arrays.hashCode(this.bytes);
- this.reference = reference;
- }
-
- public int get(final int index) {
- return bytes[index] & 0xff;
- }
-
- /**
- * @return size in of search term in bytes
- */
- public int size() {
- return bytes.length;
- }
-
- /**
- * @return reference object for this given search term
- */
- public T getReference() {
- return reference;
- }
-
- /**
- * Determines if the given window starts with the same bytes as this term
- *
- * @param window Current window of bytes from the haystack being evaluated.
- * @param windowLength The length of the window to consider
- * @return true if this term starts with the same bytes of the given window
- */
- public boolean startsWith(byte[] window, int windowLength) {
- if (windowLength > window.length) {
- throw new IndexOutOfBoundsException();
- }
- if (bytes.length < windowLength) {
- return false;
- }
- for (int i = 0; i < bytes.length && i < windowLength; i++) {
- if (bytes[i] != window[i]) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * @return a defensive copy of the internal byte structure
- */
- public byte[] getBytes() {
- return Arrays.copyOf(bytes, bytes.length);
- }
-
- @Override
- public int hashCode() {
- return hashCode;
- }
-
- @Override
- public boolean equals(final Object obj) {
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- final SearchTerm other = (SearchTerm) obj;
- if (this.hashCode != other.hashCode) {
- return false;
- }
- return Arrays.equals(this.bytes, other.bytes);
- }
-
- @Override
- public String toString() {
- return new String(bytes);
- }
-
- public String toString(final Charset charset) {
- return new String(bytes, charset);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/AhoCorasick.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/AhoCorasick.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/AhoCorasick.java
deleted file mode 100644
index 3b8afaf..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/AhoCorasick.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.search.ahocorasick;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.Set;
-
-import org.apache.nifi.util.search.Search;
-import org.apache.nifi.util.search.SearchTerm;
-
-public class AhoCorasick<T> implements Search<T> {
-
- private Node root = null;
-
- /**
- * Constructs a new search object.
- *
- * @throws IllegalArgumentException if given terms are null or empty
- */
- public AhoCorasick() {
- }
-
- @Override
- public void initializeDictionary(final Set<SearchTerm<T>> terms) {
- if (root != null) {
- throw new IllegalStateException();
- }
- root = new Node();
- if (terms == null || terms.isEmpty()) {
- throw new IllegalArgumentException();
- }
- for (final SearchTerm<T> term : terms) {
- int i = 0;
- Node nextNode = root;
- while (true) {
- nextNode = addMatch(term, i, nextNode);
- if (nextNode == null) {
- break; //we're done
- }
- i++;
- }
- }
- initialize();
- }
-
- private Node addMatch(final SearchTerm<T> term, final int offset, final Node current) {
- final int index = term.get(offset);
- boolean atEnd = (offset == (term.size() - 1));
- if (current.getNeighbor(index) == null) {
- if (atEnd) {
- current.setNeighbor(new Node(term), index);
- return null;
- }
- current.setNeighbor(new Node(), index);
- } else if (atEnd) {
- current.getNeighbor(index).setMatchingTerm(term);
- return null;
- }
- return current.getNeighbor(index);
- }
-
- private void initialize() {
- //perform bgs to build failure links
- final Queue<Node> queue = new LinkedList<>();
- queue.add(root);
- root.setFailureNode(null);
- while (!queue.isEmpty()) {
- final Node current = queue.poll();
- for (int i = 0; i < 256; i++) {
- final Node next = current.getNeighbor(i);
- if (next != null) {
- //traverse failure to get state
- Node fail = current.getFailureNode();
- while ((fail != null) && fail.getNeighbor(i) == null) {
- fail = fail.getFailureNode();
- }
- if (fail != null) {
- next.setFailureNode(fail.getNeighbor(i));
- } else {
- next.setFailureNode(root);
- }
- queue.add(next);
- }
- }
- }
- }
-
- @Override
- public SearchState search(final InputStream stream, final boolean findAll) throws IOException {
- return search(stream, findAll, null);
- }
-
- private SearchState search(final InputStream stream, final boolean findAll, final SearchState state) throws IOException {
- if (root == null) {
- throw new IllegalStateException();
- }
- final SearchState<T> currentState = (state == null) ? new SearchState(root) : state;
- if (!findAll && currentState.foundMatch()) {
- throw new IllegalStateException("A match has already been found yet we're being asked to keep searching");
- }
- Node current = currentState.getCurrentNode();
- int currentChar;
- while ((currentChar = stream.read()) >= 0) {
- currentState.incrementBytesRead(1L);
- Node next = current.getNeighbor(currentChar);
- if (next == null) {
- next = current.getFailureNode();
- while ((next != null) && next.getNeighbor(currentChar) == null) {
- next = next.getFailureNode();
- }
- if (next != null) {
- next = next.getNeighbor(currentChar);
- } else {
- next = root;
- }
- }
- if (next == null) {
- throw new IllegalStateException("tree out of sync");
- }
- //Accept condition
- if (next.hasMatch()) {
- currentState.addResult(next.getMatchingTerm());
- }
- for (Node failNode = next.getFailureNode(); failNode != null; failNode = failNode.getFailureNode()) {
- if (failNode.hasMatch()) {
- currentState.addResult(failNode.getMatchingTerm());
- }
- }
- current = next;
- if (currentState.foundMatch() && !findAll) {
- break;//give up as soon as we have at least one match
- }
- }
- currentState.setCurrentNode(current);
- return currentState;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java
deleted file mode 100644
index 0ac325c..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.search.ahocorasick;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.nifi.util.search.SearchTerm;
-
-/**
- *
- * @author
- */
-public class Node {
-
- private final Map<Integer, Node> neighborMap;
- private Node failureNode;
- private SearchTerm<?> term;
-
- Node(final SearchTerm<?> term) {
- this();
- this.term = term;
- }
-
- Node() {
- neighborMap = new HashMap<>();
- term = null;
- }
-
- void setFailureNode(final Node fail) {
- failureNode = fail;
- }
-
- public Node getFailureNode() {
- return failureNode;
- }
-
- public boolean hasMatch() {
- return term != null;
- }
-
- void setMatchingTerm(final SearchTerm<?> term) {
- this.term = term;
- }
-
- public SearchTerm<?> getMatchingTerm() {
- return term;
- }
-
- public Node getNeighbor(final int index) {
- return neighborMap.get(index);
- }
-
- void setNeighbor(final Node neighbor, final int index) {
- neighborMap.put(index, neighbor);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java
deleted file mode 100644
index 6d36ad0..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.search.ahocorasick;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.nifi.util.search.SearchTerm;
-
-public class SearchState<T> {
-
- private Node currentNode;
- private final Map<SearchTerm<T>, List<Long>> resultMap;
- private long bytesRead;
-
- SearchState(final Node rootNode) {
- resultMap = new HashMap<>(5);
- currentNode = rootNode;
- bytesRead = 0L;
- }
-
- void incrementBytesRead(final long increment) {
- bytesRead += increment;
- }
-
- void setCurrentNode(final Node curr) {
- currentNode = curr;
- }
-
- public Node getCurrentNode() {
- return currentNode;
- }
-
- public Map<SearchTerm<T>, List<Long>> getResults() {
- return new HashMap<>(resultMap);
- }
-
- void addResult(final SearchTerm matchingTerm) {
- final List<Long> indexes = (resultMap.containsKey(matchingTerm)) ? resultMap.get(matchingTerm) : new ArrayList<Long>(5);
- indexes.add(bytesRead);
- resultMap.put(matchingTerm, indexes);
- }
-
- public boolean foundMatch() {
- return !resultMap.isEmpty();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java
deleted file mode 100644
index 2b95897..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.timebuffer;
-
-public interface EntityAccess<T> {
-
- T aggregate(T oldValue, T toAdd);
-
- T createNew();
-
- long getTimestamp(T entity);
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java
deleted file mode 100644
index 193abc6..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.timebuffer;
-
-public class LongEntityAccess implements EntityAccess<TimestampedLong> {
-
- @Override
- public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong toAdd) {
- if (oldValue == null && toAdd == null) {
- return new TimestampedLong(0L);
- } else if (oldValue == null) {
- return toAdd;
- } else if (toAdd == null) {
- return oldValue;
- }
-
- return new TimestampedLong(oldValue.getValue() + toAdd.getValue());
- }
-
- @Override
- public TimestampedLong createNew() {
- return new TimestampedLong(0L);
- }
-
- @Override
- public long getTimestamp(TimestampedLong entity) {
- return entity == null ? 0L : entity.getTimestamp();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java
deleted file mode 100644
index dd8e523..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.timebuffer;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class TimedBuffer<T> {
-
- private final int numBins;
- private final EntitySum<T>[] bins;
- private final EntityAccess<T> entityAccess;
- private final TimeUnit binPrecision;
-
- @SuppressWarnings("unchecked")
- public TimedBuffer(final TimeUnit binPrecision, final int numBins, final EntityAccess<T> accessor) {
- this.binPrecision = binPrecision;
- this.numBins = numBins + 1;
- this.bins = new EntitySum[this.numBins];
- for (int i = 0; i < this.numBins; i++) {
- this.bins[i] = new EntitySum<>(binPrecision, numBins, accessor);
- }
- this.entityAccess = accessor;
- }
-
- public T add(final T entity) {
- final int binIdx = (int) (binPrecision.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS) % numBins);
- final EntitySum<T> sum = bins[binIdx];
-
- return sum.addOrReset(entity);
- }
-
- public T getAggregateValue(final long sinceEpochMillis) {
- final int startBinIdx = (int) (binPrecision.convert(sinceEpochMillis, TimeUnit.MILLISECONDS) % numBins);
-
- T total = null;
- for (int i = 0; i < numBins; i++) {
- int binIdx = (startBinIdx + i) % numBins;
- final EntitySum<T> bin = bins[binIdx];
-
- if (!bin.isExpired()) {
- total = entityAccess.aggregate(total, bin.getValue());
- }
- }
-
- return total;
- }
-
- private static class EntitySum<S> {
-
- private final EntityAccess<S> entityAccess;
- private final AtomicReference<S> ref = new AtomicReference<>();
- private final TimeUnit binPrecision;
- private final int numConfiguredBins;
-
- public EntitySum(final TimeUnit binPrecision, final int numConfiguredBins, final EntityAccess<S> aggregator) {
- this.binPrecision = binPrecision;
- this.entityAccess = aggregator;
- this.numConfiguredBins = numConfiguredBins;
- }
-
- private S add(final S event) {
- S newValue;
- S value;
- do {
- value = ref.get();
- newValue = entityAccess.aggregate(value, event);
- } while (!ref.compareAndSet(value, newValue));
-
- return newValue;
- }
-
- public S getValue() {
- return ref.get();
- }
-
- public boolean isExpired() {
- // entityAccess.getTimestamp(curValue) represents the time at which the current value
- // was last updated. If the last value is less than current time - 1 binPrecision, then it
- // means that we've rolled over and need to reset the value.
- final long maxExpectedTimePeriod = System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(numConfiguredBins, binPrecision);
-
- final S curValue = ref.get();
- return (entityAccess.getTimestamp(curValue) < maxExpectedTimePeriod);
- }
-
- public S addOrReset(final S event) {
- // entityAccess.getTimestamp(curValue) represents the time at which the current value
- // was last updated. If the last value is less than current time - 1 binPrecision, then it
- // means that we've rolled over and need to reset the value.
- final long maxExpectedTimePeriod = System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(1, binPrecision);
-
- final S curValue = ref.get();
- if (entityAccess.getTimestamp(curValue) < maxExpectedTimePeriod) {
- ref.compareAndSet(curValue, entityAccess.createNew());
- }
- return add(event);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java
deleted file mode 100644
index 07d31ea..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.timebuffer;
-
-public class TimestampedLong {
-
- private final Long value;
- private final long timestamp = System.currentTimeMillis();
-
- public TimestampedLong(final Long value) {
- this.value = value;
- }
-
- public Long getValue() {
- return value;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java b/commons/nifi-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java
deleted file mode 100644
index bd30a96..0000000
--- a/commons/nifi-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Arrays;
-
-import org.apache.nifi.remote.io.CompressionInputStream;
-import org.apache.nifi.remote.io.CompressionOutputStream;
-
-import org.junit.Test;
-
-public class TestCompressionInputOutputStreams {
-
- @Test
- public void testSimple() throws IOException {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
- final byte[] data = "Hello, World!".getBytes("UTF-8");
-
- final CompressionOutputStream cos = new CompressionOutputStream(baos);
- cos.write(data);
- cos.flush();
- cos.close();
-
- final byte[] compressedBytes = baos.toByteArray();
- final CompressionInputStream cis = new CompressionInputStream(new ByteArrayInputStream(compressedBytes));
- final byte[] decompressed = readFully(cis);
-
- assertTrue(Arrays.equals(data, decompressed));
- }
-
- @Test
- public void testDataLargerThanBuffer() throws IOException {
- final String str = "The quick brown fox jumps over the lazy dog\r\n\n\n\r";
-
- final StringBuilder sb = new StringBuilder();
- for (int i = 0; i < 100; i++) {
- sb.append(str);
- }
- final byte[] data = sb.toString().getBytes("UTF-8");
-
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
- final CompressionOutputStream cos = new CompressionOutputStream(baos, 8192);
- cos.write(data);
- cos.flush();
- cos.close();
-
- final byte[] compressedBytes = baos.toByteArray();
- final CompressionInputStream cis = new CompressionInputStream(new ByteArrayInputStream(compressedBytes));
- final byte[] decompressed = readFully(cis);
-
- assertTrue(Arrays.equals(data, decompressed));
- }
-
- @Test
- public void testDataLargerThanBufferWhileFlushing() throws IOException {
- final String str = "The quick brown fox jumps over the lazy dog\r\n\n\n\r";
- final byte[] data = str.getBytes("UTF-8");
-
- final StringBuilder sb = new StringBuilder();
- final byte[] data1024;
-
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
- final CompressionOutputStream cos = new CompressionOutputStream(baos, 8192);
- for (int i = 0; i < 1024; i++) {
- cos.write(data);
- cos.flush();
- sb.append(str);
- }
- cos.close();
- data1024 = sb.toString().getBytes("UTF-8");
-
- final byte[] compressedBytes = baos.toByteArray();
- final CompressionInputStream cis = new CompressionInputStream(new ByteArrayInputStream(compressedBytes));
- final byte[] decompressed = readFully(cis);
-
- assertTrue(Arrays.equals(data1024, decompressed));
- }
-
- @Test
- public void testSendingMultipleFilesBackToBackOnSameStream() throws IOException {
- final String str = "The quick brown fox jumps over the lazy dog\r\n\n\n\r";
- final byte[] data = str.getBytes("UTF-8");
-
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
- final CompressionOutputStream cos = new CompressionOutputStream(baos, 8192);
- for (int i = 0; i < 512; i++) {
- cos.write(data);
- cos.flush();
- }
- cos.close();
-
- final CompressionOutputStream cos2 = new CompressionOutputStream(baos, 8192);
- for (int i = 0; i < 512; i++) {
- cos2.write(data);
- cos2.flush();
- }
- cos2.close();
-
- final byte[] data512;
- final StringBuilder sb = new StringBuilder();
- for (int i = 0; i < 512; i++) {
- sb.append(str);
- }
- data512 = sb.toString().getBytes("UTF-8");
-
- final byte[] compressedBytes = baos.toByteArray();
- final ByteArrayInputStream bais = new ByteArrayInputStream(compressedBytes);
-
- final CompressionInputStream cis = new CompressionInputStream(bais);
- final byte[] decompressed = readFully(cis);
- assertTrue(Arrays.equals(data512, decompressed));
-
- final CompressionInputStream cis2 = new CompressionInputStream(bais);
- final byte[] decompressed2 = readFully(cis2);
- assertTrue(Arrays.equals(data512, decompressed2));
- }
-
- private byte[] readFully(final InputStream in) throws IOException {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
- final byte[] buffer = new byte[65536];
- int len;
- while ((len = in.read(buffer)) >= 0) {
- baos.write(buffer, 0, len);
- }
-
- return baos.toByteArray();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java b/commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java
deleted file mode 100644
index 52bd8de..0000000
--- a/commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import org.apache.nifi.stream.io.ByteArrayInputStream;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
-import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-@Ignore("Tests are time-based")
-public class TestLeakyBucketThrottler {
-
- @Test(timeout = 10000)
- public void testOutputStreamInterface() throws IOException {
- // throttle rate at 1 MB/sec
- final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024);
-
- final byte[] data = new byte[1024 * 1024 * 4];
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- final OutputStream throttledOut = throttler.newThrottledOutputStream(baos);
-
- final long start = System.currentTimeMillis();
- throttledOut.write(data);
- throttler.close();
- final long millis = System.currentTimeMillis() - start;
- // should take 4 sec give or take
- assertTrue(millis > 3000);
- assertTrue(millis < 6000);
- }
-
- @Test(timeout = 10000)
- public void testInputStreamInterface() throws IOException {
- // throttle rate at 1 MB/sec
- final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024);
-
- final byte[] data = new byte[1024 * 1024 * 4];
- final ByteArrayInputStream bais = new ByteArrayInputStream(data);
- final InputStream throttledIn = throttler.newThrottledInputStream(bais);
-
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
- final byte[] buffer = new byte[4096];
- final long start = System.currentTimeMillis();
- int len;
- while ((len = throttledIn.read(buffer)) > 0) {
- baos.write(buffer, 0, len);
- }
- throttler.close();
- final long millis = System.currentTimeMillis() - start;
- // should take 4 sec give or take
- assertTrue(millis > 3000);
- assertTrue(millis < 6000);
- baos.close();
- }
-
- @Test(timeout = 10000)
- public void testDirectInterface() throws IOException, InterruptedException {
- // throttle rate at 1 MB/sec
- final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024);
-
- // create 3 threads, each sending ~2 MB
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- final List<Thread> threads = new ArrayList<Thread>();
- for (int i = 0; i < 3; i++) {
- final Thread t = new WriterThread(i, throttler, baos);
- threads.add(t);
- }
-
- final long start = System.currentTimeMillis();
- for (final Thread t : threads) {
- t.start();
- }
-
- for (final Thread t : threads) {
- t.join();
- }
- final long elapsed = System.currentTimeMillis() - start;
-
- throttler.close();
-
- // To send 15 MB, it should have taken at least 5 seconds and no more than 7 seconds, to
- // allow for busy-ness and the fact that we could write a tiny bit more than the limit.
- assertTrue(elapsed > 5000);
- assertTrue(elapsed < 7000);
-
- // ensure bytes were copied out appropriately
- assertEquals(3 * (2 * 1024 * 1024 + 1), baos.getBufferLength());
- assertEquals((byte) 'A', baos.getUnderlyingBuffer()[baos.getBufferLength() - 1]);
- }
-
- private static class WriterThread extends Thread {
-
- private final int idx;
- private final byte[] data = new byte[1024 * 1024 * 2 + 1];
- private final LeakyBucketStreamThrottler throttler;
- private final OutputStream out;
-
- public WriterThread(final int idx, final LeakyBucketStreamThrottler throttler, final OutputStream out) {
- this.idx = idx;
- this.throttler = throttler;
- this.out = out;
- this.data[this.data.length - 1] = (byte) 'A';
- }
-
- @Override
- public void run() {
- long startMillis = System.currentTimeMillis();
- long bytesWritten = 0L;
- try {
- throttler.copy(new ByteArrayInputStream(data), out);
- } catch (IOException e) {
- e.printStackTrace();
- return;
- }
- long now = System.currentTimeMillis();
- long millisElapsed = now - startMillis;
- bytesWritten += data.length;
- float bytesPerSec = (float) bytesWritten / (float) millisElapsed * 1000F;
- System.out.println(idx + " : copied data at a rate of " + bytesPerSec + " bytes/sec");
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/test/java/org/apache/nifi/util/TestNaiveSearchRingBuffer.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/test/java/org/apache/nifi/util/TestNaiveSearchRingBuffer.java b/commons/nifi-utils/src/test/java/org/apache/nifi/util/TestNaiveSearchRingBuffer.java
deleted file mode 100644
index 0838e96..0000000
--- a/commons/nifi-utils/src/test/java/org/apache/nifi/util/TestNaiveSearchRingBuffer.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.junit.Test;
-
-public class TestNaiveSearchRingBuffer {
-
- @Test
- public void testAddAndCompare() {
- final byte[] pattern = new byte[]{
- '\r', '0', 38, 48
- };
-
- final byte[] search = new byte[]{
- '\r', '0', 38, 58, 58, 83, 78, '\r', '0', 38, 48, 83, 92, 78, 4, 38
- };
-
- final NaiveSearchRingBuffer circ = new NaiveSearchRingBuffer(pattern);
- int counter = -1;
- for (final byte b : search) {
- counter++;
- final boolean matched = circ.addAndCompare(b);
- if (counter == 10) {
- assertTrue(matched);
- } else {
- assertFalse(matched);
- }
- }
- }
-
- @Test
- public void testGetOldestByte() {
- final byte[] pattern = new byte[]{
- '\r', '0', 38, 48
- };
-
- final byte[] search = new byte[]{
- '\r', '0', 38, 58, 58, 83, 78, (byte) 223, (byte) 227, (byte) 250, '\r', '0', 38, 48, 83, 92, 78, 4, 38
- };
-
- final NaiveSearchRingBuffer circ = new NaiveSearchRingBuffer(pattern);
- int counter = -1;
- for (final byte b : search) {
- counter++;
- final boolean matched = circ.addAndCompare(b);
- if (counter == 13) {
- assertTrue(matched);
- } else {
- assertFalse(matched);
- }
- }
- }
-
-}