You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/08 21:30:02 UTC

[39/51] [partial] incubator-nifi git commit: Initial code contribution

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-stream-utils/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/commons/nifi-stream-utils/src/test/resources/logback-test.xml b/commons/nifi-stream-utils/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..0f3f60c
--- /dev/null
+++ b/commons/nifi-stream-utils/src/test/resources/logback-test.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<configuration scan="true" scanPeriod="30 seconds">
+    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+            <pattern>%-4r [%t] %-5p %c - %m%n</pattern>
+        </encoder>
+    </appender>
+    
+    <!-- valid logging levels: TRACE, DEBUG, INFO, WARN, ERROR -->
+    <logger name="org.apache.nifi" level="DEBUG"/>
+    
+    <!-- Logger for managing logging statements for nifi clusters. -->
+    <logger name="org.apache.nifi.cluster" level="INFO"/>
+
+    <!-- 
+        Logger for logging HTTP requests received by the web server.  Setting
+        log level to 'debug' activates HTTP request logging.
+    -->
+    <logger name="org.apache.nifi.server.JettyServer" level="INFO"/>
+
+    <!-- Logger for managing logging statements for jetty -->
+    <logger name="org.mortbay" level="INFO"/>
+
+    <!-- Suppress non-error messages due to excessive logging by class -->
+    <logger name="com.sun.jersey.spi.container.servlet.WebComponent" level="ERROR"/>
+
+    <logger name="org.apache.nifi.processors.standard" level="DEBUG"/>
+
+    <root level="INFO">
+        <appender-ref ref="CONSOLE"/>
+    </root>
+    
+</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/.gitignore
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/.gitignore b/commons/nifi-utils/.gitignore
new file mode 100755
index 0000000..12c5231
--- /dev/null
+++ b/commons/nifi-utils/.gitignore
@@ -0,0 +1,8 @@
+/target
+/target
+/target
+/target
+/target
+/target
+/target
+/target

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/pom.xml
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/pom.xml b/commons/nifi-utils/pom.xml
new file mode 100644
index 0000000..8aeccd7
--- /dev/null
+++ b/commons/nifi-utils/pom.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-utils</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <packaging>jar</packaging>
+    <name>NiFi Utils</name>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/io/CompoundUpdateMonitor.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/io/CompoundUpdateMonitor.java b/commons/nifi-utils/src/main/java/org/apache/nifi/io/CompoundUpdateMonitor.java
new file mode 100644
index 0000000..e22032b
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/io/CompoundUpdateMonitor.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An {@link UpdateMonitor} that combines multiple <code>UpdateMonitor</code>s
+ * such that it will indicate a change in a file only if ALL sub-monitors
+ * indicate a change. The sub-monitors will be applied in the order given and if
+ * any indicates that the state has not changed, the subsequent sub-monitors may
+ * not be given a chance to run
+ */
+public class CompoundUpdateMonitor implements UpdateMonitor {
+
+    private final List<UpdateMonitor> monitors;
+
+    public CompoundUpdateMonitor(final UpdateMonitor first, final UpdateMonitor... others) {
+        monitors = new ArrayList<>();
+        monitors.add(first);
+        for (final UpdateMonitor monitor : others) {
+            monitors.add(monitor);
+        }
+    }
+
+    @Override
+    public Object getCurrentState(final Path path) throws IOException {
+        return new DeferredMonitorAction(monitors, path);
+    }
+
+    private static class DeferredMonitorAction {
+
+        private static final Object NON_COMPUTED_VALUE = new Object();
+
+        private final List<UpdateMonitor> monitors;
+        private final Path path;
+
+        private final Object[] preCalculated;
+
+        public DeferredMonitorAction(final List<UpdateMonitor> monitors, final Path path) {
+            this.monitors = monitors;
+            this.path = path;
+            preCalculated = new Object[monitors.size()];
+
+            for (int i = 0; i < preCalculated.length; i++) {
+                preCalculated[i] = NON_COMPUTED_VALUE;
+            }
+        }
+
+        private Object getCalculatedValue(final int i) throws IOException {
+            if (preCalculated[i] == NON_COMPUTED_VALUE) {
+                preCalculated[i] = monitors.get(i).getCurrentState(path);
+            }
+
+            return preCalculated[i];
+        }
+
+        @Override
+        public boolean equals(final Object obj) {
+            // must return true unless ALL DeferredMonitorAction's indicate that they are different
+            if (obj == null) {
+                return false;
+            }
+
+            if (!(obj instanceof DeferredMonitorAction)) {
+                return false;
+            }
+
+            final DeferredMonitorAction other = (DeferredMonitorAction) obj;
+            try {
+                // Go through each UpdateMonitor's value and check if the value has changed.
+                for (int i = 0; i < preCalculated.length; i++) {
+                    final Object mine = getCalculatedValue(i);
+                    final Object theirs = other.getCalculatedValue(i);
+
+                    if (mine == theirs) {
+                        // same
+                        return true;
+                    }
+
+                    if (mine == null && theirs == null) {
+                        // same
+                        return true;
+                    }
+
+                    if (mine.equals(theirs)) {
+                        return true;
+                    }
+                }
+            } catch (final IOException e) {
+                return false;
+            }
+
+            // No DeferredMonitorAction was the same as last time. Therefore, it's not equal
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/io/LastModifiedMonitor.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/io/LastModifiedMonitor.java b/commons/nifi-utils/src/main/java/org/apache/nifi/io/LastModifiedMonitor.java
new file mode 100644
index 0000000..f446465
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/io/LastModifiedMonitor.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+public class LastModifiedMonitor implements UpdateMonitor {
+
+    @Override
+    public Object getCurrentState(final Path path) throws IOException {
+        return Files.getLastModifiedTime(path);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/io/MD5SumMonitor.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/io/MD5SumMonitor.java b/commons/nifi-utils/src/main/java/org/apache/nifi/io/MD5SumMonitor.java
new file mode 100644
index 0000000..1326c2a
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/io/MD5SumMonitor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+public class MD5SumMonitor implements UpdateMonitor {
+
+    @Override
+    public Object getCurrentState(final Path path) throws IOException {
+        final MessageDigest digest;
+        try {
+            digest = MessageDigest.getInstance("MD5");
+        } catch (final NoSuchAlgorithmException nsae) {
+            throw new AssertionError(nsae);
+        }
+
+        try (final FileInputStream fis = new FileInputStream(path.toFile())) {
+            int len;
+            final byte[] buffer = new byte[8192];
+            while ((len = fis.read(buffer)) > 0) {
+                digest.update(buffer, 0, len);
+            }
+        }
+
+        // Return a ByteBuffer instead of byte[] because we want equals() to do a deep equality
+        return ByteBuffer.wrap(digest.digest());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/io/SynchronousFileWatcher.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/io/SynchronousFileWatcher.java b/commons/nifi-utils/src/main/java/org/apache/nifi/io/SynchronousFileWatcher.java
new file mode 100644
index 0000000..785f1ac
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/io/SynchronousFileWatcher.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Allows the user to configure a {@link java.nio.file.Path Path} to watch for
+ * modifications and periodically poll to check if the file has been modified
+ */
+public class SynchronousFileWatcher {
+
+    private final Path path;
+    private final long checkUpdateMillis;
+    private final UpdateMonitor monitor;
+    private final AtomicReference<StateWrapper> lastState;
+    private final Lock resourceLock = new ReentrantLock();
+
+    public SynchronousFileWatcher(final Path path, final UpdateMonitor monitor) {
+        this(path, monitor, 0L);
+    }
+
+    public SynchronousFileWatcher(final Path path, final UpdateMonitor monitor, final long checkMillis) {
+        if (checkMillis < 0) {
+            throw new IllegalArgumentException();
+        }
+
+        this.path = path;
+        checkUpdateMillis = checkMillis;
+        this.monitor = monitor;
+
+        Object currentState;
+        try {
+            currentState = monitor.getCurrentState(path);
+        } catch (final IOException e) {
+            currentState = null;
+        }
+
+        this.lastState = new AtomicReference<>(new StateWrapper(currentState));
+    }
+
+    /**
+     * Checks if the file has been updated according to the configured
+     * {@link UpdateMonitor} and resets the state
+     *
+     * @return
+     * @throws IOException
+     */
+    public boolean checkAndReset() throws IOException {
+        if (checkUpdateMillis <= 0) { // if checkUpdateMillis <= 0, always check
+            return checkForUpdate();
+        } else {
+            final StateWrapper stateWrapper = lastState.get();
+            if (stateWrapper.getTimestamp() < System.currentTimeMillis() - checkUpdateMillis) {
+                return checkForUpdate();
+            }
+            return false;
+        }
+    }
+
+    private boolean checkForUpdate() throws IOException {
+        if (resourceLock.tryLock()) {
+            try {
+                final StateWrapper wrapper = lastState.get();
+                final Object newState = monitor.getCurrentState(path);
+                if (newState == null && wrapper.getState() == null) {
+                    return false;
+                }
+                if (newState == null || wrapper.getState() == null) {
+                    lastState.set(new StateWrapper(newState));
+                    return true;
+                }
+
+                final boolean unmodified = newState.equals(wrapper.getState());
+                if (!unmodified) {
+                    lastState.set(new StateWrapper(newState));
+                }
+                return !unmodified;
+            } finally {
+                resourceLock.unlock();
+            }
+        } else {
+            return false;
+        }
+    }
+
+    private static class StateWrapper {
+
+        private final Object state;
+        private final long timestamp;
+
+        public StateWrapper(final Object state) {
+            this.state = state;
+            this.timestamp = System.currentTimeMillis();
+        }
+
+        public Object getState() {
+            return state;
+        }
+
+        public long getTimestamp() {
+            return timestamp;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/io/UpdateMonitor.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/io/UpdateMonitor.java b/commons/nifi-utils/src/main/java/org/apache/nifi/io/UpdateMonitor.java
new file mode 100644
index 0000000..33fb444
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/io/UpdateMonitor.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+public interface UpdateMonitor {
+
+    Object getCurrentState(Path path) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/BooleanHolder.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/BooleanHolder.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/BooleanHolder.java
new file mode 100644
index 0000000..92061e0
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/BooleanHolder.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+public class BooleanHolder extends ObjectHolder<Boolean> {
+
+    public BooleanHolder(final boolean initialValue) {
+        super(initialValue);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java
new file mode 100644
index 0000000..9954bfb
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class FormatUtils {
+
+    private static final String UNION = "|";
+
+    // for Data Sizes
+    private static final double BYTES_IN_KILOBYTE = 1024;
+    private static final double BYTES_IN_MEGABYTE = BYTES_IN_KILOBYTE * 1024;
+    private static final double BYTES_IN_GIGABYTE = BYTES_IN_MEGABYTE * 1024;
+    private static final double BYTES_IN_TERABYTE = BYTES_IN_GIGABYTE * 1024;
+
+    // for Time Durations
+    private static final String NANOS = join(UNION, "ns", "nano", "nanos", "nanoseconds");
+    private static final String MILLIS = join(UNION, "ms", "milli", "millis", "milliseconds");
+    private static final String SECS = join(UNION, "s", "sec", "secs", "second", "seconds");
+    private static final String MINS = join(UNION, "m", "min", "mins", "minute", "minutes");
+    private static final String HOURS = join(UNION, "h", "hr", "hrs", "hour", "hours");
+    private static final String DAYS = join(UNION, "d", "day", "days");
+
+    private static final String VALID_TIME_UNITS = join(UNION, NANOS, MILLIS, SECS, MINS, HOURS, DAYS);
+    public static final String TIME_DURATION_REGEX = "(\\d+)\\s*(" + VALID_TIME_UNITS + ")";
+    public static final Pattern TIME_DURATION_PATTERN = Pattern.compile(TIME_DURATION_REGEX);
+
+    /**
+     * Formats the specified count by adding commas.
+     *
+     * @param count
+     * @return
+     */
+    public static String formatCount(final long count) {
+        return NumberFormat.getIntegerInstance().format(count);
+    }
+
+    /**
+     * Formats the specified duration in 'mm:ss.SSS' format.
+     *
+     * @param sourceDuration
+     * @param sourceUnit
+     * @return
+     */
+    public static String formatMinutesSeconds(final long sourceDuration, final TimeUnit sourceUnit) {
+        final long millis = TimeUnit.MILLISECONDS.convert(sourceDuration, sourceUnit);
+        final SimpleDateFormat formatter = new SimpleDateFormat("mm:ss.SSS");
+        return formatter.format(new Date(millis));
+    }
+
+    /**
+     * Formats the specified duration in 'HH:mm:ss.SSS' format.
+     *
+     * @param sourceDuration
+     * @param sourceUnit
+     * @return
+     */
+    public static String formatHoursMinutesSeconds(final long sourceDuration, final TimeUnit sourceUnit) {
+        final long millis = TimeUnit.MILLISECONDS.convert(sourceDuration, sourceUnit);
+        final long millisInHour = TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);
+        final int hours = (int) (millis / millisInHour);
+        final long whatsLeft = millis - hours * millisInHour;
+
+        return pad(hours) + ":" + new SimpleDateFormat("mm:ss.SSS").format(new Date(whatsLeft));
+    }
+
+    private static String pad(final int val) {
+        return (val < 10) ? "0" + val : String.valueOf(val);
+    }
+
+    /**
+     * Formats the specified data size in human readable format.
+     *
+     * @param dataSize Data size in bytes
+     * @return Human readable format
+     */
+    public static String formatDataSize(final double dataSize) {
+        // initialize the formatter
+        final NumberFormat format = NumberFormat.getNumberInstance();
+        format.setMaximumFractionDigits(2);
+
+        // check terabytes
+        double dataSizeToFormat = dataSize / BYTES_IN_TERABYTE;
+        if (dataSizeToFormat > 1) {
+            return format.format(dataSizeToFormat) + " TB";
+        }
+
+        // check gigabytes
+        dataSizeToFormat = dataSize / BYTES_IN_GIGABYTE;
+        if (dataSizeToFormat > 1) {
+            return format.format(dataSizeToFormat) + " GB";
+        }
+
+        // check megabytes
+        dataSizeToFormat = dataSize / BYTES_IN_MEGABYTE;
+        if (dataSizeToFormat > 1) {
+            return format.format(dataSizeToFormat) + " MB";
+        }
+
+        // check kilobytes
+        dataSizeToFormat = dataSize / BYTES_IN_KILOBYTE;
+        if (dataSizeToFormat > 1) {
+            return format.format(dataSizeToFormat) + " KB";
+        }
+
+        // default to bytes
+        return format.format(dataSize) + " bytes";
+    }
+
+    public static long getTimeDuration(final String value, final TimeUnit desiredUnit) {
+        final Matcher matcher = TIME_DURATION_PATTERN.matcher(value.toLowerCase());
+        if (!matcher.matches()) {
+            throw new IllegalArgumentException("Value '" + value + "' is not a valid Time Duration");
+        }
+
+        final String duration = matcher.group(1);
+        final String units = matcher.group(2);
+        TimeUnit specifiedTimeUnit = null;
+        switch (units.toLowerCase()) {
+            case "ns":
+            case "nano":
+            case "nanos":
+            case "nanoseconds":
+                specifiedTimeUnit = TimeUnit.NANOSECONDS;
+                break;
+            case "ms":
+            case "milli":
+            case "millis":
+            case "milliseconds":
+                specifiedTimeUnit = TimeUnit.MILLISECONDS;
+                break;
+            case "s":
+            case "sec":
+            case "secs":
+            case "second":
+            case "seconds":
+                specifiedTimeUnit = TimeUnit.SECONDS;
+                break;
+            case "m":
+            case "min":
+            case "mins":
+            case "minute":
+            case "minutes":
+                specifiedTimeUnit = TimeUnit.MINUTES;
+                break;
+            case "h":
+            case "hr":
+            case "hrs":
+            case "hour":
+            case "hours":
+                specifiedTimeUnit = TimeUnit.HOURS;
+                break;
+            case "d":
+            case "day":
+            case "days":
+                specifiedTimeUnit = TimeUnit.DAYS;
+                break;
+        }
+
+        final long durationVal = Long.parseLong(duration);
+        return desiredUnit.convert(durationVal, specifiedTimeUnit);
+    }
+
+    public static String formatUtilization(final double utilization) {
+        return utilization + "%";
+    }
+
+    private static String join(final String delimiter, final String... values) {
+        if (values.length == 0) {
+            return "";
+        } else if (values.length == 1) {
+            return values[0];
+        }
+
+        final StringBuilder sb = new StringBuilder();
+        sb.append(values[0]);
+        for (int i = 1; i < values.length; i++) {
+            sb.append(delimiter).append(values[i]);
+        }
+
+        return sb.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/IntegerHolder.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/IntegerHolder.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/IntegerHolder.java
new file mode 100644
index 0000000..213bbc0
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/IntegerHolder.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+public class IntegerHolder extends ObjectHolder<Integer> {
+
+    public IntegerHolder(final int initialValue) {
+        super(initialValue);
+    }
+
+    public int addAndGet(final int delta) {
+        final int curValue = get();
+        final int newValue = curValue + delta;
+        set(newValue);
+        return newValue;
+    }
+
+    public int getAndAdd(final int delta) {
+        final int curValue = get();
+        final int newValue = curValue + delta;
+        set(newValue);
+        return curValue;
+    }
+
+    public int incrementAndGet() {
+        return addAndGet(1);
+    }
+
+    public int getAndIncrement() {
+        return getAndAdd(1);
+    }
+
+    public int decrementAndGet() {
+        return addAndGet(-1);
+    }
+
+    public int getAndDecrement() {
+        return getAndAdd(-1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/LongHolder.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/LongHolder.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/LongHolder.java
new file mode 100644
index 0000000..ef70ce8
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/LongHolder.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+/**
+ * Wraps a Long value so that it can be declared <code>final</code> and still be
+ * accessed from which inner classes; the functionality is similar to that of an
+ * AtomicLong, but operations on this class are not atomic. This results in
+ * greater performance when the atomicity is not needed.
+ */
+public class LongHolder extends ObjectHolder<Long> {
+
+    public LongHolder(final long initialValue) {
+        super(initialValue);
+    }
+
+    public long addAndGet(final long delta) {
+        final long curValue = get();
+        final long newValue = curValue + delta;
+        set(newValue);
+        return newValue;
+    }
+
+    public long getAndAdd(final long delta) {
+        final long curValue = get();
+        final long newValue = curValue + delta;
+        set(newValue);
+        return curValue;
+    }
+
+    public long incrementAndGet() {
+        return addAndGet(1);
+    }
+
+    public long getAndIncrement() {
+        return getAndAdd(1);
+    }
+
+    public long decrementAndGet() {
+        return addAndGet(-1L);
+    }
+
+    public long getAndDecrement() {
+        return getAndAdd(-1L);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/ObjectHolder.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/ObjectHolder.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/ObjectHolder.java
new file mode 100644
index 0000000..a58ec6a
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/ObjectHolder.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+/**
+ * A bean that holds a single value of type T.
+ *
+ * @param <T>
+ */
+public class ObjectHolder<T> {
+
+    private T value;
+
+    public ObjectHolder(final T initialValue) {
+        this.value = initialValue;
+    }
+
+    public T get() {
+        return value;
+    }
+
+    public void set(T value) {
+        this.value = value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/RingBuffer.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/RingBuffer.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/RingBuffer.java
new file mode 100644
index 0000000..c0bb830
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/RingBuffer.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Thread-safe implementation of a RingBuffer
+ *
+ * @param <T>
+ */
+public class RingBuffer<T> {
+
+    private final Object[] buffer;
+    private int insertionPointer = 0;
+    private boolean filled = false;
+
+    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+    private final Lock readLock = rwLock.readLock();
+    private final Lock writeLock = rwLock.writeLock();
+
+    public RingBuffer(final int size) {
+        buffer = new Object[size];
+    }
+
+    /**
+     * Adds the given value to the RingBuffer and returns the value that was
+     * removed in order to make room.
+     *
+     * @param value
+     * @return
+     */
+    @SuppressWarnings("unchecked")
+    public T add(final T value) {
+        Objects.requireNonNull(value);
+
+        writeLock.lock();
+        try {
+            final Object removed = buffer[insertionPointer];
+
+            buffer[insertionPointer] = value;
+
+            if (insertionPointer == buffer.length - 1) {
+                filled = true;
+            }
+
+            insertionPointer = (insertionPointer + 1) % buffer.length;
+            return (T) removed;
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    public int getSize() {
+        readLock.lock();
+        try {
+            return filled ? buffer.length : insertionPointer;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    public List<T> getSelectedElements(final Filter<T> filter) {
+        return getSelectedElements(filter, Integer.MAX_VALUE);
+    }
+
+    public List<T> getSelectedElements(final Filter<T> filter, final int maxElements) {
+        final List<T> selected = new ArrayList<>(1000);
+        int numSelected = 0;
+        readLock.lock();
+        try {
+            for (int i = 0; i < buffer.length && numSelected < maxElements; i++) {
+                final int idx = (insertionPointer + i) % buffer.length;
+                final Object val = buffer[idx];
+                if (val == null) {
+                    continue;
+                }
+
+                @SuppressWarnings("unchecked")
+                final T element = (T) val;
+                if (filter.select(element)) {
+                    selected.add(element);
+                    numSelected++;
+                }
+            }
+        } finally {
+            readLock.unlock();
+        }
+        return selected;
+    }
+
+    public int countSelectedElements(final Filter<T> filter) {
+        int numSelected = 0;
+        readLock.lock();
+        try {
+            for (int i = 0; i < buffer.length; i++) {
+                final int idx = (insertionPointer + i) % buffer.length;
+                final Object val = buffer[idx];
+                if (val == null) {
+                    continue;
+                }
+
+                @SuppressWarnings("unchecked")
+                final T element = (T) val;
+                if (filter.select(element)) {
+                    numSelected++;
+                }
+            }
+        } finally {
+            readLock.unlock();
+        }
+
+        return numSelected;
+    }
+
+    /**
+     * Removes all elements from the RingBuffer that match the given filter
+     *
+     * @param filter
+     * @return
+     */
+    public int removeSelectedElements(final Filter<T> filter) {
+        int count = 0;
+
+        writeLock.lock();
+        try {
+            for (int i = 0; i < buffer.length; i++) {
+                final int idx = (insertionPointer + i + 1) % buffer.length;
+                final Object val = buffer[idx];
+                if (val == null) {
+                    continue;
+                }
+
+                @SuppressWarnings("unchecked")
+                final T element = (T) val;
+
+                if (filter.select(element)) {
+                    buffer[idx] = null;
+                }
+            }
+        } finally {
+            writeLock.unlock();
+        }
+
+        return count;
+    }
+
+    public List<T> asList() {
+        return getSelectedElements(new Filter<T>() {
+            @Override
+            public boolean select(final T value) {
+                return true;
+            }
+        });
+    }
+
+    public T getOldestElement() {
+        readLock.lock();
+        try {
+            return getElementData(insertionPointer);
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    public T getNewestElement() {
+        readLock.lock();
+        try {
+            int index = (insertionPointer == 0) ? buffer.length : insertionPointer - 1;
+            return getElementData(index);
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private T getElementData(final int index) {
+        readLock.lock();
+        try {
+            return (T) buffer[index];
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * Iterates over each element in the RingBuffer, calling the
+     * {@link ForEachEvaluator#evaluate(Object) evaluate} method on each element
+     * in the RingBuffer. If the Evaluator returns {@code false}, the method
+     * will skip all remaining elements in the RingBuffer; otherwise, the next
+     * element will be evaluated until all elements have been evaluated.
+     *
+     * @param evaluator
+     */
+    public void forEach(final ForEachEvaluator<T> evaluator) {
+        forEach(evaluator, IterationDirection.FORWARD);
+    }
+
+    /**
+     * Iterates over each element in the RingBuffer, calling the
+     * {@link ForEachEvaluator#evaluate(Object) evaluate} method on each element
+     * in the RingBuffer. If the Evaluator returns {@code false}, the method
+     * will skip all remaining elements in the RingBuffer; otherwise, the next
+     * element will be evaluated until all elements have been evaluated.
+     *
+     * @param evaluator
+     * @param iterationDirection the order in which to iterate over the elements
+     * in the RingBuffer
+     */
+    public void forEach(final ForEachEvaluator<T> evaluator, final IterationDirection iterationDirection) {
+        readLock.lock();
+        try {
+            final int startIndex;
+            final int endIndex;
+            final int increment;
+
+            if (iterationDirection == IterationDirection.FORWARD) {
+                startIndex = 0;
+                endIndex = buffer.length - 1;
+                increment = 1;
+            } else {
+                startIndex = buffer.length - 1;
+                endIndex = 0;
+                increment = -1;
+            }
+
+            for (int i = startIndex; (iterationDirection == IterationDirection.FORWARD ? i <= endIndex : i >= endIndex); i += increment) {
+                final int idx = (insertionPointer + i) % buffer.length;
+                final Object val = buffer[idx];
+                if (val == null) {
+                    continue;
+                }
+
+                @SuppressWarnings("unchecked")
+                final T element = (T) val;
+                if (!evaluator.evaluate(element)) {
+                    return;
+                }
+            }
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    public static interface Filter<S> {
+
+        boolean select(S value);
+    }
+
+    /**
+     * Defines an interface that can be used to iterate over all of the elements
+     * in the RingBuffer via the {@link #forEach} method
+     *
+     * @param <S>
+     */
+    public static interface ForEachEvaluator<S> {
+
+        /**
+         * Evaluates the given element and returns {@code true} if the next
+         * element should be evaluated, {@code false} otherwise
+         *
+         * @param value
+         * @return
+         */
+        boolean evaluate(S value);
+    }
+
+    public static enum IterationDirection {
+
+        FORWARD,
+        BACKWARD;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/StopWatch.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/StopWatch.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/StopWatch.java
new file mode 100644
index 0000000..cd11930
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/StopWatch.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.util.concurrent.TimeUnit;
+
+public final class StopWatch {
+
+    private long startNanos = -1L;
+    private long duration = -1L;
+
+    /**
+     * Creates a StopWatch but does not start it
+     */
+    public StopWatch() {
+        this(false);
+    }
+
+    /**
+     * @param autoStart whether or not the timer should be started automatically
+     */
+    public StopWatch(final boolean autoStart) {
+        if (autoStart) {
+            start();
+        }
+    }
+
+    public void start() {
+        this.startNanos = System.nanoTime();
+        this.duration = -1L;
+    }
+
+    public void stop() {
+        if (startNanos < 0) {
+            throw new IllegalStateException("StopWatch has not been started");
+        }
+        this.duration = System.nanoTime() - startNanos;
+        this.startNanos = -1L;
+    }
+
+    /**
+     * Returns the amount of time that the StopWatch was running.
+     *
+     * @param timeUnit
+     * @return
+     *
+     * @throws IllegalStateException if the StopWatch has not been stopped via
+     * {@link #stop()}
+     */
+    public long getDuration(final TimeUnit timeUnit) {
+        if (duration < 0) {
+            throw new IllegalStateException("Cannot get duration until StopWatch has been stopped");
+        }
+        return timeUnit.convert(duration, TimeUnit.NANOSECONDS);
+    }
+
+    /**
+     * Returns the amount of time that has elapsed since the timer was started.
+     *
+     * @param timeUnit
+     * @return
+     */
+    public long getElapsed(final TimeUnit timeUnit) {
+        return timeUnit.convert(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
+    }
+
+    public String calculateDataRate(final long bytes) {
+        final double seconds = (double) duration / 1000000000.0D;
+        final long dataSize = (long) (bytes / seconds);
+        return FormatUtils.formatDataSize(dataSize) + "/sec";
+    }
+
+    public String getDuration() {
+        final StringBuilder sb = new StringBuilder();
+
+        long duration = this.duration;
+        final long minutes = (duration > 60000000000L) ? (duration / 60000000000L) : 0L;
+        duration -= TimeUnit.NANOSECONDS.convert(minutes, TimeUnit.MINUTES);
+
+        final long seconds = (duration > 1000000000L) ? (duration / 1000000000L) : 0L;
+        duration -= TimeUnit.NANOSECONDS.convert(seconds, TimeUnit.SECONDS);
+
+        final long millis = (duration > 1000000L) ? (duration / 1000000L) : 0L;
+        duration -= TimeUnit.NANOSECONDS.convert(millis, TimeUnit.MILLISECONDS);
+
+        final long nanos = duration % 1000000L;
+
+        if (minutes > 0) {
+            sb.append(minutes).append(" minutes");
+        }
+
+        if (seconds > 0) {
+            if (minutes > 0) {
+                sb.append(", ");
+            }
+
+            sb.append(seconds).append(" seconds");
+        }
+
+        if (millis > 0) {
+            if (seconds > 0) {
+                sb.append(", ");
+            }
+
+            sb.append(millis).append(" millis");
+        }
+        if (seconds == 0 && millis == 0) {
+            sb.append(nanos).append(" nanos");
+        }
+
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/Tuple.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/Tuple.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/Tuple.java
new file mode 100644
index 0000000..63736ed
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/Tuple.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+/**
+ *
+ * @author unattrib
+ * @param <A>
+ * @param <B>
+ */
+public class Tuple<A, B> {
+
+    final A key;
+    final B value;
+
+    public Tuple(A key, B value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    public A getKey() {
+        return key;
+    }
+
+    public B getValue() {
+        return value;
+    }
+
+    @Override
+    public boolean equals(final Object other) {
+        if (other == null) {
+            return false;
+        }
+        if (other == this) {
+            return true;
+        }
+        if (!(other instanceof Tuple)) {
+            return false;
+        }
+
+        final Tuple<?, ?> tuple = (Tuple<?, ?>) other;
+        if (key == null) {
+            if (tuple.key != null) {
+                return false;
+            }
+        } else {
+            if (!key.equals(tuple.key)) {
+                return false;
+            }
+        }
+
+        if (value == null) {
+            if (tuple.value != null) {
+                return false;
+            }
+        } else {
+            if (!value.equals(tuple.value)) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        return 581 + (this.key == null ? 0 : this.key.hashCode()) + (this.value == null ? 0 : this.value.hashCode());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugDisabledTimedLock.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugDisabledTimedLock.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugDisabledTimedLock.java
new file mode 100644
index 0000000..a8d7e82
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugDisabledTimedLock.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.concurrency;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+public class DebugDisabledTimedLock implements DebuggableTimedLock {
+
+    private final Lock lock;
+
+    public DebugDisabledTimedLock(final Lock lock) {
+        this.lock = lock;
+    }
+
+    /**
+     *
+     * @return
+     */
+    @Override
+    public boolean tryLock() {
+        return lock.tryLock();
+    }
+
+    /**
+     *
+     * @param timeout
+     * @param timeUnit
+     * @return
+     */
+    @Override
+    public boolean tryLock(final long timeout, final TimeUnit timeUnit) {
+        try {
+            return lock.tryLock(timeout, timeUnit);
+        } catch (InterruptedException e) {
+            return false;
+        }
+    }
+
+    /**
+     *
+     */
+    @Override
+    public void lock() {
+        lock.lock();
+    }
+
+    @Override
+    public void unlock(final String task) {
+        lock.unlock();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugEnabledTimedLock.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugEnabledTimedLock.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugEnabledTimedLock.java
new file mode 100644
index 0000000..f082168
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugEnabledTimedLock.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.concurrency;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DebugEnabledTimedLock implements DebuggableTimedLock {
+
+    private final Lock lock;
+    private final Logger logger;
+    private long lockTime = 0L;
+
+    private final Map<String, Long> lockIterations = new HashMap<>();
+    private final Map<String, Long> lockNanos = new HashMap<>();
+
+    private final String name;
+    private final int iterationFrequency;
+
+    public DebugEnabledTimedLock(final Lock lock, final String name, final int iterationFrequency) {
+        this.lock = lock;
+        this.name = name;
+        this.iterationFrequency = iterationFrequency;
+        logger = LoggerFactory.getLogger(TimedLock.class.getName() + "." + name);
+    }
+
+    /**
+     *
+     * @return
+     */
+    @Override
+    public boolean tryLock() {
+        logger.trace("Trying to obtain Lock: {}", name);
+        final boolean success = lock.tryLock();
+        if (!success) {
+            logger.trace("TryLock failed for Lock: {}", name);
+            return false;
+        }
+        logger.trace("TryLock successful");
+
+        return true;
+    }
+
+    /**
+     *
+     * @param timeout
+     * @param timeUnit
+     * @return
+     */
+    @Override
+    public boolean tryLock(final long timeout, final TimeUnit timeUnit) {
+        logger.trace("Trying to obtain Lock {} with a timeout of {} {}", name, timeout, timeUnit);
+        final boolean success;
+        try {
+            success = lock.tryLock(timeout, timeUnit);
+        } catch (final InterruptedException ie) {
+            return false;
+        }
+
+        if (!success) {
+            logger.trace("TryLock failed for Lock {} with a timeout of {} {}", name, timeout, timeUnit);
+            return false;
+        }
+        logger.trace("TryLock successful");
+        return true;
+    }
+
+    /**
+     *
+     */
+    @Override
+    public void lock() {
+        logger.trace("Obtaining Lock {}", name);
+        lock.lock();
+        lockTime = System.nanoTime();
+        logger.trace("Obtained Lock {}", name);
+    }
+
+    /**
+     *
+     * @param task
+     */
+    @Override
+    public void unlock(final String task) {
+        if (lockTime <= 0L) {
+            lock.unlock();
+            return;
+        }
+
+        logger.trace("Releasing Lock {}", name);
+        final long nanosLocked = System.nanoTime() - lockTime;
+
+        Long startIterations = lockIterations.get(task);
+        if (startIterations == null) {
+            startIterations = 0L;
+        }
+        final long iterations = startIterations + 1L;
+        lockIterations.put(task, iterations);
+
+        Long startNanos = lockNanos.get(task);
+        if (startNanos == null) {
+            startNanos = 0L;
+        }
+        final long totalNanos = startNanos + nanosLocked;
+        lockNanos.put(task, totalNanos);
+
+        lockTime = -1L;
+
+        lock.unlock();
+        logger.trace("Released Lock {}", name);
+
+        if (iterations % iterationFrequency == 0) {
+            logger.debug("Lock {} held for {} nanos for task: {}; total lock iterations: {}; total lock nanos: {}", name, nanosLocked, task, iterations, totalNanos);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebuggableTimedLock.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebuggableTimedLock.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebuggableTimedLock.java
new file mode 100644
index 0000000..69da6e8
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebuggableTimedLock.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.concurrency;
+
+import java.util.concurrent.TimeUnit;
+
+public interface DebuggableTimedLock {
+
+    void lock();
+
+    boolean tryLock(long timePeriod, TimeUnit timeUnit);
+
+    boolean tryLock();
+
+    void unlock(String task);
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/TimedLock.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/TimedLock.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/TimedLock.java
new file mode 100644
index 0000000..532d3c3
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/TimedLock.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.concurrency;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TimedLock {
+
+    private final DebugEnabledTimedLock enabled;
+    private final DebugDisabledTimedLock disabled;
+
+    private final Logger logger;
+
+    public TimedLock(final Lock lock, final String name, final int iterationFrequency) {
+        this.enabled = new DebugEnabledTimedLock(lock, name, iterationFrequency);
+        this.disabled = new DebugDisabledTimedLock(lock);
+
+        logger = LoggerFactory.getLogger(TimedLock.class.getName() + "." + name);
+    }
+
+    private DebuggableTimedLock getLock() {
+        return logger.isDebugEnabled() ? enabled : disabled;
+    }
+
+    public boolean tryLock() {
+        return getLock().tryLock();
+    }
+
+    public boolean tryLock(final long timeout, final TimeUnit timeUnit) {
+        return getLock().tryLock(timeout, timeUnit);
+    }
+
+    public void lock() {
+        getLock().lock();
+    }
+
+    public void unlock(final String task) {
+        getLock().unlock(task);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java
new file mode 100644
index 0000000..2b95897
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.timebuffer;
+
+public interface EntityAccess<T> {
+
+    T aggregate(T oldValue, T toAdd);
+
+    T createNew();
+
+    long getTimestamp(T entity);
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java
new file mode 100644
index 0000000..193abc6
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.timebuffer;
+
+public class LongEntityAccess implements EntityAccess<TimestampedLong> {
+
+    @Override
+    public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong toAdd) {
+        if (oldValue == null && toAdd == null) {
+            return new TimestampedLong(0L);
+        } else if (oldValue == null) {
+            return toAdd;
+        } else if (toAdd == null) {
+            return oldValue;
+        }
+
+        return new TimestampedLong(oldValue.getValue() + toAdd.getValue());
+    }
+
+    @Override
+    public TimestampedLong createNew() {
+        return new TimestampedLong(0L);
+    }
+
+    @Override
+    public long getTimestamp(TimestampedLong entity) {
+        return entity == null ? 0L : entity.getTimestamp();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java
new file mode 100644
index 0000000..dd8e523
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.timebuffer;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class TimedBuffer<T> {
+
+    private final int numBins;
+    private final EntitySum<T>[] bins;
+    private final EntityAccess<T> entityAccess;
+    private final TimeUnit binPrecision;
+
+    @SuppressWarnings("unchecked")
+    public TimedBuffer(final TimeUnit binPrecision, final int numBins, final EntityAccess<T> accessor) {
+        this.binPrecision = binPrecision;
+        this.numBins = numBins + 1;
+        this.bins = new EntitySum[this.numBins];
+        for (int i = 0; i < this.numBins; i++) {
+            this.bins[i] = new EntitySum<>(binPrecision, numBins, accessor);
+        }
+        this.entityAccess = accessor;
+    }
+
+    public T add(final T entity) {
+        final int binIdx = (int) (binPrecision.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS) % numBins);
+        final EntitySum<T> sum = bins[binIdx];
+
+        return sum.addOrReset(entity);
+    }
+
+    public T getAggregateValue(final long sinceEpochMillis) {
+        final int startBinIdx = (int) (binPrecision.convert(sinceEpochMillis, TimeUnit.MILLISECONDS) % numBins);
+
+        T total = null;
+        for (int i = 0; i < numBins; i++) {
+            int binIdx = (startBinIdx + i) % numBins;
+            final EntitySum<T> bin = bins[binIdx];
+
+            if (!bin.isExpired()) {
+                total = entityAccess.aggregate(total, bin.getValue());
+            }
+        }
+
+        return total;
+    }
+
+    private static class EntitySum<S> {
+
+        private final EntityAccess<S> entityAccess;
+        private final AtomicReference<S> ref = new AtomicReference<>();
+        private final TimeUnit binPrecision;
+        private final int numConfiguredBins;
+
+        public EntitySum(final TimeUnit binPrecision, final int numConfiguredBins, final EntityAccess<S> aggregator) {
+            this.binPrecision = binPrecision;
+            this.entityAccess = aggregator;
+            this.numConfiguredBins = numConfiguredBins;
+        }
+
+        private S add(final S event) {
+            S newValue;
+            S value;
+            do {
+                value = ref.get();
+                newValue = entityAccess.aggregate(value, event);
+            } while (!ref.compareAndSet(value, newValue));
+
+            return newValue;
+        }
+
+        public S getValue() {
+            return ref.get();
+        }
+
+        public boolean isExpired() {
+            // entityAccess.getTimestamp(curValue) represents the time at which the current value
+            // was last updated. If the last value is less than current time - 1 binPrecision, then it
+            // means that we've rolled over and need to reset the value.
+            final long maxExpectedTimePeriod = System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(numConfiguredBins, binPrecision);
+
+            final S curValue = ref.get();
+            return (entityAccess.getTimestamp(curValue) < maxExpectedTimePeriod);
+        }
+
+        public S addOrReset(final S event) {
+            // entityAccess.getTimestamp(curValue) represents the time at which the current value
+            // was last updated. If the last value is less than current time - 1 binPrecision, then it
+            // means that we've rolled over and need to reset the value.
+            final long maxExpectedTimePeriod = System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(1, binPrecision);
+
+            final S curValue = ref.get();
+            if (entityAccess.getTimestamp(curValue) < maxExpectedTimePeriod) {
+                ref.compareAndSet(curValue, entityAccess.createNew());
+            }
+            return add(event);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java
new file mode 100644
index 0000000..07d31ea
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.timebuffer;
+
+public class TimestampedLong {
+
+    private final Long value;
+    private final long timestamp = System.currentTimeMillis();
+
+    public TimestampedLong(final Long value) {
+        this.value = value;
+    }
+
+    public Long getValue() {
+        return value;
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestCompoundUpdateMonitor.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestCompoundUpdateMonitor.java b/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestCompoundUpdateMonitor.java
new file mode 100644
index 0000000..c796a96
--- /dev/null
+++ b/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestCompoundUpdateMonitor.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.timebuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.util.UUID;
+
+import org.apache.nifi.io.CompoundUpdateMonitor;
+import org.apache.nifi.io.LastModifiedMonitor;
+import org.apache.nifi.io.MD5SumMonitor;
+import org.apache.nifi.io.UpdateMonitor;
+
+import org.junit.Test;
+
+public class TestCompoundUpdateMonitor {
+
+    @Test
+    public void test() throws IOException {
+        final UpdateMonitor lastModified = new LastModifiedMonitor();
+        final MD5SumMonitor md5 = new MD5SumMonitor();
+        final CompoundUpdateMonitor compound = new CompoundUpdateMonitor(lastModified, md5);
+
+        final File file = new File("target/" + UUID.randomUUID().toString());
+        if (file.exists()) {
+            assertTrue(file.delete());
+        }
+        assertTrue(file.createNewFile());
+
+        final Path path = file.toPath();
+
+        final Object curState = compound.getCurrentState(path);
+        final Object state2 = compound.getCurrentState(path);
+
+        assertEquals(curState, state2);
+        file.setLastModified(System.currentTimeMillis() + 1000L);
+        final Object state3 = compound.getCurrentState(path);
+        assertEquals(state2, state3);
+
+        final Object state4 = compound.getCurrentState(path);
+        assertEquals(state3, state4);
+
+        final long lastModifiedDate = file.lastModified();
+        try (final OutputStream out = new FileOutputStream(file)) {
+            out.write("Hello".getBytes("UTF-8"));
+        }
+
+        file.setLastModified(lastModifiedDate);
+
+        final Object state5 = compound.getCurrentState(path);
+        assertNotSame(state4, state5);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestRingBuffer.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestRingBuffer.java b/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestRingBuffer.java
new file mode 100644
index 0000000..fafffdd
--- /dev/null
+++ b/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestRingBuffer.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.timebuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.nifi.util.RingBuffer;
+import org.apache.nifi.util.RingBuffer.ForEachEvaluator;
+import org.apache.nifi.util.RingBuffer.IterationDirection;
+
+import org.junit.Test;
+
+/**
+ *
+ */
+public class TestRingBuffer {
+
+    @Test
+    public void testAsList() {
+        final RingBuffer<Integer> ringBuffer = new RingBuffer<>(10);
+
+        final List<Integer> emptyList = ringBuffer.asList();
+        assertTrue(emptyList.isEmpty());
+
+        for (int i = 0; i < 3; i++) {
+            ringBuffer.add(i);
+        }
+
+        List<Integer> list = ringBuffer.asList();
+        assertEquals(3, list.size());
+        for (int i = 0; i < 3; i++) {
+            assertEquals(Integer.valueOf(i), list.get(i));
+        }
+
+        for (int i = 3; i < 10; i++) {
+            ringBuffer.add(i);
+        }
+
+        list = ringBuffer.asList();
+        assertEquals(10, list.size());
+        for (int i = 0; i < 10; i++) {
+            assertEquals(Integer.valueOf(i), list.get(i));
+        }
+    }
+
+    @Test
+    public void testIterateForwards() {
+        final RingBuffer<Integer> ringBuffer = new RingBuffer<>(10);
+
+        final int[] values = new int[]{3, 5, 20, 7};
+        for (final int v : values) {
+            ringBuffer.add(v);
+        }
+
+        final AtomicInteger countHolder = new AtomicInteger(0);
+        ringBuffer.forEach(new ForEachEvaluator<Integer>() {
+            int counter = 0;
+
+            @Override
+            public boolean evaluate(final Integer value) {
+                final int expected = values[counter++];
+                countHolder.incrementAndGet();
+                assertEquals(expected, value.intValue());
+                return true;
+            }
+
+        }, IterationDirection.FORWARD);
+
+        assertEquals(4, countHolder.get());
+    }
+
+    @Test
+    public void testIterateForwardsAfterFull() {
+        final RingBuffer<Integer> ringBuffer = new RingBuffer<>(10);
+
+        for (int i = 0; i < 12; i++) {
+            ringBuffer.add(i);
+        }
+
+        final int[] values = new int[]{3, 5, 20, 7};
+        for (final int v : values) {
+            ringBuffer.add(v);
+        }
+
+        ringBuffer.forEach(new ForEachEvaluator<Integer>() {
+            int counter = 0;
+
+            @Override
+            public boolean evaluate(final Integer value) {
+                if (counter < 6) {
+                    assertEquals(counter + 6, value.intValue());
+                } else {
+                    final int expected = values[counter - 6];
+                    assertEquals(expected, value.intValue());
+                }
+
+                counter++;
+                return true;
+            }
+
+        }, IterationDirection.FORWARD);
+    }
+
+    @Test
+    public void testIterateBackwards() {
+        final RingBuffer<Integer> ringBuffer = new RingBuffer<>(10);
+
+        final int[] values = new int[]{3, 5, 20, 7};
+        for (final int v : values) {
+            ringBuffer.add(v);
+        }
+
+        final AtomicInteger countHolder = new AtomicInteger(0);
+        ringBuffer.forEach(new ForEachEvaluator<Integer>() {
+            int counter = 0;
+
+            @Override
+            public boolean evaluate(final Integer value) {
+                final int index = values.length - 1 - counter;
+                final int expected = values[index];
+                countHolder.incrementAndGet();
+
+                assertEquals(expected, value.intValue());
+                counter++;
+                return true;
+            }
+
+        }, IterationDirection.BACKWARD);
+
+        assertEquals(4, countHolder.get());
+    }
+
+    @Test
+    public void testIterateBackwardsAfterFull() {
+        final RingBuffer<Integer> ringBuffer = new RingBuffer<>(10);
+
+        for (int i = 0; i < 12; i++) {
+            ringBuffer.add(i);
+        }
+
+        final int[] values = new int[]{3, 5, 20, 7};
+        for (final int v : values) {
+            ringBuffer.add(v);
+        }
+
+        ringBuffer.forEach(new ForEachEvaluator<Integer>() {
+            int counter = 0;
+
+            @Override
+            public boolean evaluate(final Integer value) {
+                if (counter < values.length) {
+                    final int index = values.length - 1 - counter;
+                    final int expected = values[index];
+
+                    assertEquals(expected, value.intValue());
+                    counter++;
+                }
+
+                return true;
+            }
+
+        }, IterationDirection.BACKWARD);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestSynchronousFileWatcher.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestSynchronousFileWatcher.java b/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestSynchronousFileWatcher.java
new file mode 100644
index 0000000..4b2c0d5
--- /dev/null
+++ b/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestSynchronousFileWatcher.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.timebuffer;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+
+import org.junit.Test;
+
+import org.apache.nifi.io.MD5SumMonitor;
+import org.apache.nifi.io.SynchronousFileWatcher;
+import org.apache.nifi.io.UpdateMonitor;
+
+public class TestSynchronousFileWatcher {
+
+    @Test
+    public void testIt() throws UnsupportedEncodingException, IOException, InterruptedException {
+        final Path path = Paths.get("target/1.txt");
+        Files.copy(new ByteArrayInputStream("Hello, World!".getBytes("UTF-8")), path, StandardCopyOption.REPLACE_EXISTING);
+        final UpdateMonitor monitor = new MD5SumMonitor();
+
+        final SynchronousFileWatcher watcher = new SynchronousFileWatcher(path, monitor, 10L);
+        assertFalse(watcher.checkAndReset());
+        Thread.sleep(30L);
+        assertFalse(watcher.checkAndReset());
+
+        final FileOutputStream fos = new FileOutputStream(path.toFile());
+        try {
+            fos.write("Good-bye, World!".getBytes("UTF-8"));
+            fos.getFD().sync();
+        } finally {
+            fos.close();
+        }
+
+        assertTrue(watcher.checkAndReset());
+        assertFalse(watcher.checkAndReset());
+
+        Thread.sleep(30L);
+        assertFalse(watcher.checkAndReset());
+    }
+}