You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/08 21:30:02 UTC
[39/51] [partial] incubator-nifi git commit: Initial code contribution
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-stream-utils/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/commons/nifi-stream-utils/src/test/resources/logback-test.xml b/commons/nifi-stream-utils/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..0f3f60c
--- /dev/null
+++ b/commons/nifi-stream-utils/src/test/resources/logback-test.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<configuration scan="true" scanPeriod="30 seconds">
+ <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+ <pattern>%-4r [%t] %-5p %c - %m%n</pattern>
+ </encoder>
+ </appender>
+
+ <!-- valid logging levels: TRACE, DEBUG, INFO, WARN, ERROR -->
+ <logger name="org.apache.nifi" level="DEBUG"/>
+
+ <!-- Logger for managing logging statements for nifi clusters. -->
+ <logger name="org.apache.nifi.cluster" level="INFO"/>
+
+ <!--
+ Logger for logging HTTP requests received by the web server. Setting
+ log level to 'debug' activates HTTP request logging.
+ -->
+ <logger name="org.apache.nifi.server.JettyServer" level="INFO"/>
+
+ <!-- Logger for managing logging statements for jetty -->
+ <logger name="org.mortbay" level="INFO"/>
+
+ <!-- Suppress non-error messages due to excessive logging by class -->
+ <logger name="com.sun.jersey.spi.container.servlet.WebComponent" level="ERROR"/>
+
+ <logger name="org.apache.nifi.processors.standard" level="DEBUG"/>
+
+ <root level="INFO">
+ <appender-ref ref="CONSOLE"/>
+ </root>
+
+</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/.gitignore
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/.gitignore b/commons/nifi-utils/.gitignore
new file mode 100755
index 0000000..12c5231
--- /dev/null
+++ b/commons/nifi-utils/.gitignore
@@ -0,0 +1,8 @@
+/target
+/target
+/target
+/target
+/target
+/target
+/target
+/target
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/pom.xml
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/pom.xml b/commons/nifi-utils/pom.xml
new file mode 100644
index 0000000..8aeccd7
--- /dev/null
+++ b/commons/nifi-utils/pom.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-parent</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>nifi-utils</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <packaging>jar</packaging>
+ <name>NiFi Utils</name>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/io/CompoundUpdateMonitor.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/io/CompoundUpdateMonitor.java b/commons/nifi-utils/src/main/java/org/apache/nifi/io/CompoundUpdateMonitor.java
new file mode 100644
index 0000000..e22032b
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/io/CompoundUpdateMonitor.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An {@link UpdateMonitor} that combines multiple <code>UpdateMonitor</code>s
+ * such that it will indicate a change in a file only if ALL sub-monitors
+ * indicate a change. The sub-monitors will be applied in the order given and if
+ * any indicates that the state has not changed, the subsequent sub-monitors may
+ * not be given a chance to run
+ */
+public class CompoundUpdateMonitor implements UpdateMonitor {
+
+ private final List<UpdateMonitor> monitors;
+
+ public CompoundUpdateMonitor(final UpdateMonitor first, final UpdateMonitor... others) {
+ monitors = new ArrayList<>();
+ monitors.add(first);
+ for (final UpdateMonitor monitor : others) {
+ monitors.add(monitor);
+ }
+ }
+
+ @Override
+ public Object getCurrentState(final Path path) throws IOException {
+ return new DeferredMonitorAction(monitors, path);
+ }
+
+ private static class DeferredMonitorAction {
+
+ private static final Object NON_COMPUTED_VALUE = new Object();
+
+ private final List<UpdateMonitor> monitors;
+ private final Path path;
+
+ private final Object[] preCalculated;
+
+ public DeferredMonitorAction(final List<UpdateMonitor> monitors, final Path path) {
+ this.monitors = monitors;
+ this.path = path;
+ preCalculated = new Object[monitors.size()];
+
+ for (int i = 0; i < preCalculated.length; i++) {
+ preCalculated[i] = NON_COMPUTED_VALUE;
+ }
+ }
+
+ private Object getCalculatedValue(final int i) throws IOException {
+ if (preCalculated[i] == NON_COMPUTED_VALUE) {
+ preCalculated[i] = monitors.get(i).getCurrentState(path);
+ }
+
+ return preCalculated[i];
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ // must return true unless ALL DeferredMonitorAction's indicate that they are different
+ if (obj == null) {
+ return false;
+ }
+
+ if (!(obj instanceof DeferredMonitorAction)) {
+ return false;
+ }
+
+ final DeferredMonitorAction other = (DeferredMonitorAction) obj;
+ try {
+ // Go through each UpdateMonitor's value and check if the value has changed.
+ for (int i = 0; i < preCalculated.length; i++) {
+ final Object mine = getCalculatedValue(i);
+ final Object theirs = other.getCalculatedValue(i);
+
+ if (mine == theirs) {
+ // same
+ return true;
+ }
+
+ if (mine == null && theirs == null) {
+ // same
+ return true;
+ }
+
+ if (mine.equals(theirs)) {
+ return true;
+ }
+ }
+ } catch (final IOException e) {
+ return false;
+ }
+
+ // No DeferredMonitorAction was the same as last time. Therefore, it's not equal
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/io/LastModifiedMonitor.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/io/LastModifiedMonitor.java b/commons/nifi-utils/src/main/java/org/apache/nifi/io/LastModifiedMonitor.java
new file mode 100644
index 0000000..f446465
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/io/LastModifiedMonitor.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+/**
+ * An {@link UpdateMonitor} whose notion of state is the last-modified time
+ * of the monitored file.
+ */
+public class LastModifiedMonitor implements UpdateMonitor {
+
+    /**
+     * @param path the file to inspect
+     * @return the file's last-modified time as reported by
+     * {@link Files#getLastModifiedTime}
+     * @throws IOException if the time cannot be read
+     */
+    @Override
+    public Object getCurrentState(final Path path) throws IOException {
+        final Object lastModified = Files.getLastModifiedTime(path);
+        return lastModified;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/io/MD5SumMonitor.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/io/MD5SumMonitor.java b/commons/nifi-utils/src/main/java/org/apache/nifi/io/MD5SumMonitor.java
new file mode 100644
index 0000000..1326c2a
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/io/MD5SumMonitor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+/**
+ * An {@link UpdateMonitor} whose notion of state is the MD5 digest of the
+ * monitored file's content.
+ */
+public class MD5SumMonitor implements UpdateMonitor {
+
+    /**
+     * Reads the whole file and returns its MD5 digest.
+     *
+     * @param path the file to digest
+     * @return the digest wrapped in a {@link ByteBuffer}
+     * @throws IOException if the file cannot be opened or read
+     */
+    @Override
+    public Object getCurrentState(final Path path) throws IOException {
+        final MessageDigest md5 = newMd5Digest();
+
+        try (final FileInputStream in = new FileInputStream(path.toFile())) {
+            final byte[] chunk = new byte[8192];
+            for (int read = in.read(chunk); read > 0; read = in.read(chunk)) {
+                md5.update(chunk, 0, read);
+            }
+        }
+
+        // A ByteBuffer is returned rather than the raw byte[] so that equals()
+        // compares the digest content instead of the array identity.
+        final byte[] sum = md5.digest();
+        return ByteBuffer.wrap(sum);
+    }
+
+    /**
+     * Obtains an MD5 digest instance, turning an unavailable algorithm into
+     * an {@link AssertionError}.
+     */
+    private static MessageDigest newMd5Digest() {
+        try {
+            return MessageDigest.getInstance("MD5");
+        } catch (final NoSuchAlgorithmException nsae) {
+            throw new AssertionError(nsae);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/io/SynchronousFileWatcher.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/io/SynchronousFileWatcher.java b/commons/nifi-utils/src/main/java/org/apache/nifi/io/SynchronousFileWatcher.java
new file mode 100644
index 0000000..785f1ac
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/io/SynchronousFileWatcher.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Allows the user to configure a {@link java.nio.file.Path Path} to watch for
+ * modifications and periodically poll to check if the file has been modified.
+ * The file is only examined from within {@link #checkAndReset()}; no
+ * background thread is started by this class.
+ */
+public class SynchronousFileWatcher {
+
+    private final Path path;
+    private final long checkUpdateMillis;
+    private final UpdateMonitor monitor;
+    private final AtomicReference<StateWrapper> lastState;
+    private final Lock resourceLock = new ReentrantLock();
+
+    /**
+     * Creates a watcher that consults the monitor on every call to
+     * {@link #checkAndReset()}.
+     *
+     * @param path the file to watch
+     * @param monitor determines what counts as a modification
+     */
+    public SynchronousFileWatcher(final Path path, final UpdateMonitor monitor) {
+        this(path, monitor, 0L);
+    }
+
+    /**
+     * @param path the file to watch
+     * @param monitor determines what counts as a modification
+     * @param checkMillis minimum number of milliseconds between two
+     * consultations of the monitor; 0 means "consult on every call"
+     * @throws IllegalArgumentException if <code>checkMillis</code> is negative
+     */
+    public SynchronousFileWatcher(final Path path, final UpdateMonitor monitor, final long checkMillis) {
+        if (checkMillis < 0) {
+            throw new IllegalArgumentException("checkMillis must not be negative: " + checkMillis);
+        }
+
+        this.path = path;
+        checkUpdateMillis = checkMillis;
+        this.monitor = monitor;
+
+        Object currentState;
+        try {
+            currentState = monitor.getCurrentState(path);
+        } catch (final IOException e) {
+            // Best effort: if the initial state cannot be read, start from
+            // "no state" so that the first readable state counts as a change.
+            currentState = null;
+        }
+
+        this.lastState = new AtomicReference<>(new StateWrapper(currentState));
+    }
+
+    /**
+     * Checks if the file has been updated according to the configured
+     * {@link UpdateMonitor} and resets the state
+     *
+     * @return <code>true</code> if the monitor reported a state different
+     * from the one recorded previously; <code>false</code> if the state is
+     * unchanged, if the configured interval since the last check has not
+     * elapsed yet, or if another thread is performing the check right now
+     * @throws IOException if the monitor fails to read the current state
+     */
+    public boolean checkAndReset() throws IOException {
+        if (checkUpdateMillis <= 0) { // if checkUpdateMillis <= 0, always check
+            return checkForUpdate();
+        }
+
+        final long lastCheck = lastState.get().getTimestamp();
+        if (lastCheck < System.currentTimeMillis() - checkUpdateMillis) {
+            return checkForUpdate();
+        }
+        return false;
+    }
+
+    /**
+     * Consults the monitor and records the outcome. The check is skipped
+     * (returning <code>false</code>) when another thread already holds the
+     * lock.
+     */
+    private boolean checkForUpdate() throws IOException {
+        if (!resourceLock.tryLock()) {
+            return false;
+        }
+
+        try {
+            final Object previous = lastState.get().getState();
+            final Object current = monitor.getCurrentState(path);
+
+            final boolean modified;
+            if (current == null || previous == null) {
+                // null on exactly one side counts as a modification
+                modified = current != previous;
+            } else {
+                modified = !current.equals(previous);
+            }
+
+            // Always store a fresh wrapper so that its timestamp reflects the
+            // time of this check. Refreshing it only on modification would
+            // make every call re-check once the interval since the last
+            // *change* had elapsed, defeating the throttle. When nothing
+            // changed the previously recorded state object is retained.
+            lastState.set(new StateWrapper(modified ? current : previous));
+            return modified;
+        } finally {
+            resourceLock.unlock();
+        }
+    }
+
+    /**
+     * Pairs a monitor state with the wall-clock time at which it was recorded.
+     */
+    private static class StateWrapper {
+
+        private final Object state;
+        private final long timestamp;
+
+        public StateWrapper(final Object state) {
+            this.state = state;
+            this.timestamp = System.currentTimeMillis();
+        }
+
+        public Object getState() {
+            return state;
+        }
+
+        public long getTimestamp() {
+            return timestamp;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/io/UpdateMonitor.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/io/UpdateMonitor.java b/commons/nifi-utils/src/main/java/org/apache/nifi/io/UpdateMonitor.java
new file mode 100644
index 0000000..33fb444
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/io/UpdateMonitor.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+/**
+ * Strategy for capturing the state of a file so that two captures can be
+ * compared to decide whether the file has been modified.
+ */
+public interface UpdateMonitor {
+
+ /**
+  * Captures the current state of the given file. SynchronousFileWatcher
+  * compares the returned object with the previously captured one using
+  * <code>equals</code>, and treats a <code>null</code> state as distinct
+  * from any non-null state.
+  *
+  * @param path the file whose state is to be captured
+  * @return an object describing the file's current state
+  * @throws IOException if the state cannot be determined
+  */
+ Object getCurrentState(Path path) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/BooleanHolder.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/BooleanHolder.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/BooleanHolder.java
new file mode 100644
index 0000000..92061e0
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/BooleanHolder.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+/**
+ * An {@link ObjectHolder} specialised for <code>Boolean</code> values.
+ */
+public class BooleanHolder extends ObjectHolder<Boolean> {
+
+ /**
+  * @param initialValue the value to hold initially; it is boxed and handed
+  * to the <code>ObjectHolder</code> constructor
+  */
+ public BooleanHolder(final boolean initialValue) {
+ super(initialValue);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java
new file mode 100644
index 0000000..9954bfb
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Locale;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Static helpers for rendering counts, durations and data sizes as
+ * human-readable text, and for parsing time-duration strings such as
+ * "5 mins".
+ */
+public class FormatUtils {
+
+    private static final String UNION = "|";
+
+    // for Data Sizes
+    private static final double BYTES_IN_KILOBYTE = 1024;
+    private static final double BYTES_IN_MEGABYTE = BYTES_IN_KILOBYTE * 1024;
+    private static final double BYTES_IN_GIGABYTE = BYTES_IN_MEGABYTE * 1024;
+    private static final double BYTES_IN_TERABYTE = BYTES_IN_GIGABYTE * 1024;
+
+    // for Time Durations
+    private static final String NANOS = join(UNION, "ns", "nano", "nanos", "nanoseconds");
+    private static final String MILLIS = join(UNION, "ms", "milli", "millis", "milliseconds");
+    private static final String SECS = join(UNION, "s", "sec", "secs", "second", "seconds");
+    private static final String MINS = join(UNION, "m", "min", "mins", "minute", "minutes");
+    private static final String HOURS = join(UNION, "h", "hr", "hrs", "hour", "hours");
+    private static final String DAYS = join(UNION, "d", "day", "days");
+
+    private static final String VALID_TIME_UNITS = join(UNION, NANOS, MILLIS, SECS, MINS, HOURS, DAYS);
+    public static final String TIME_DURATION_REGEX = "(\\d+)\\s*(" + VALID_TIME_UNITS + ")";
+    public static final Pattern TIME_DURATION_PATTERN = Pattern.compile(TIME_DURATION_REGEX);
+
+    /**
+     * Formats the specified count using the default locale's integer format
+     * (which adds grouping separators, e.g. commas).
+     *
+     * @param count the number to format
+     * @return the formatted number
+     */
+    public static String formatCount(final long count) {
+        return NumberFormat.getIntegerInstance().format(count);
+    }
+
+    /**
+     * Formats the specified duration in 'mm:ss.SSS' format.
+     *
+     * @param sourceDuration the length of the duration
+     * @param sourceUnit the unit in which <code>sourceDuration</code> is expressed
+     * @return the minutes, seconds and milliseconds portion of the duration
+     */
+    public static String formatMinutesSeconds(final long sourceDuration, final TimeUnit sourceUnit) {
+        final long millis = TimeUnit.MILLISECONDS.convert(sourceDuration, sourceUnit);
+        return newMinutesSecondsFormat().format(new Date(millis));
+    }
+
+    /**
+     * Formats the specified duration in 'HH:mm:ss.SSS' format. The hours
+     * portion is not capped at 24.
+     *
+     * @param sourceDuration the length of the duration
+     * @param sourceUnit the unit in which <code>sourceDuration</code> is expressed
+     * @return the formatted duration
+     */
+    public static String formatHoursMinutesSeconds(final long sourceDuration, final TimeUnit sourceUnit) {
+        final long millis = TimeUnit.MILLISECONDS.convert(sourceDuration, sourceUnit);
+        final long millisInHour = TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);
+        final int hours = (int) (millis / millisInHour);
+        final long whatsLeft = millis - hours * millisInHour;
+
+        return pad(hours) + ":" + newMinutesSecondsFormat().format(new Date(whatsLeft));
+    }
+
+    /**
+     * Creates the formatter used to render the sub-hour part of a duration.
+     * The formatter is pinned to UTC: a duration is rendered by treating it
+     * as an instant, and the JVM's default time zone would otherwise shift
+     * the minutes in zones whose offset is not a whole number of hours.
+     * A new instance is returned on every call because SimpleDateFormat is
+     * not safe for concurrent use.
+     */
+    private static SimpleDateFormat newMinutesSecondsFormat() {
+        final SimpleDateFormat formatter = new SimpleDateFormat("mm:ss.SSS");
+        formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
+        return formatter;
+    }
+
+    /**
+     * Left-pads a single-digit value with a zero.
+     */
+    private static String pad(final int val) {
+        return (val < 10) ? "0" + val : String.valueOf(val);
+    }
+
+    /**
+     * Formats the specified data size in human readable format.
+     *
+     * @param dataSize Data size in bytes
+     * @return Human readable format
+     */
+    public static String formatDataSize(final double dataSize) {
+        // initialize the formatter
+        final NumberFormat format = NumberFormat.getNumberInstance();
+        format.setMaximumFractionDigits(2);
+
+        // check terabytes
+        double dataSizeToFormat = dataSize / BYTES_IN_TERABYTE;
+        if (dataSizeToFormat > 1) {
+            return format.format(dataSizeToFormat) + " TB";
+        }
+
+        // check gigabytes
+        dataSizeToFormat = dataSize / BYTES_IN_GIGABYTE;
+        if (dataSizeToFormat > 1) {
+            return format.format(dataSizeToFormat) + " GB";
+        }
+
+        // check megabytes
+        dataSizeToFormat = dataSize / BYTES_IN_MEGABYTE;
+        if (dataSizeToFormat > 1) {
+            return format.format(dataSizeToFormat) + " MB";
+        }
+
+        // check kilobytes
+        dataSizeToFormat = dataSize / BYTES_IN_KILOBYTE;
+        if (dataSizeToFormat > 1) {
+            return format.format(dataSizeToFormat) + " KB";
+        }
+
+        // default to bytes
+        return format.format(dataSize) + " bytes";
+    }
+
+    /**
+     * Parses a duration such as "30 secs" or "5MIN" (a whole number followed
+     * by one of the supported unit names, case-insensitive) and converts it
+     * to the desired unit.
+     *
+     * @param value the text to parse
+     * @param desiredUnit the unit of the returned value
+     * @return the duration expressed in <code>desiredUnit</code>
+     * @throws IllegalArgumentException if <code>value</code> is not a valid
+     * time duration (this includes a number too large for a <code>long</code>)
+     */
+    public static long getTimeDuration(final String value, final TimeUnit desiredUnit) {
+        // Locale.ROOT keeps the lower-casing independent of the JVM's default
+        // locale (e.g. under a Turkish locale "MINS" would not become "mins").
+        final Matcher matcher = TIME_DURATION_PATTERN.matcher(value.toLowerCase(Locale.ROOT));
+        if (!matcher.matches()) {
+            throw new IllegalArgumentException("Value '" + value + "' is not a valid Time Duration");
+        }
+
+        final String duration = matcher.group(1);
+        final String units = matcher.group(2);
+        final TimeUnit specifiedTimeUnit;
+        switch (units) {
+            case "ns":
+            case "nano":
+            case "nanos":
+            case "nanoseconds":
+                specifiedTimeUnit = TimeUnit.NANOSECONDS;
+                break;
+            case "ms":
+            case "milli":
+            case "millis":
+            case "milliseconds":
+                specifiedTimeUnit = TimeUnit.MILLISECONDS;
+                break;
+            case "s":
+            case "sec":
+            case "secs":
+            case "second":
+            case "seconds":
+                specifiedTimeUnit = TimeUnit.SECONDS;
+                break;
+            case "m":
+            case "min":
+            case "mins":
+            case "minute":
+            case "minutes":
+                specifiedTimeUnit = TimeUnit.MINUTES;
+                break;
+            case "h":
+            case "hr":
+            case "hrs":
+            case "hour":
+            case "hours":
+                specifiedTimeUnit = TimeUnit.HOURS;
+                break;
+            case "d":
+            case "day":
+            case "days":
+                specifiedTimeUnit = TimeUnit.DAYS;
+                break;
+            default:
+                // Not reachable while the pattern and this switch list the
+                // same units; guards against the two drifting apart.
+                throw new IllegalArgumentException("Value '" + value + "' is not a valid Time Duration");
+        }
+
+        final long durationVal = Long.parseLong(duration);
+        return desiredUnit.convert(durationVal, specifiedTimeUnit);
+    }
+
+    /**
+     * Renders a utilization figure by appending a percent sign.
+     *
+     * @param utilization the utilization value
+     * @return the value followed by '%'
+     */
+    public static String formatUtilization(final double utilization) {
+        return utilization + "%";
+    }
+
+    /**
+     * Concatenates the values, placing the delimiter between neighbours.
+     */
+    private static String join(final String delimiter, final String... values) {
+        if (values.length == 0) {
+            return "";
+        } else if (values.length == 1) {
+            return values[0];
+        }
+
+        final StringBuilder sb = new StringBuilder();
+        sb.append(values[0]);
+        for (int i = 1; i < values.length; i++) {
+            sb.append(delimiter).append(values[i]);
+        }
+
+        return sb.toString();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/IntegerHolder.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/IntegerHolder.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/IntegerHolder.java
new file mode 100644
index 0000000..213bbc0
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/IntegerHolder.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+/**
+ * An {@link ObjectHolder} for <code>Integer</code> values that adds
+ * read-modify-write convenience methods. Each of these methods reads the held
+ * value and writes the result back as two separate steps.
+ */
+public class IntegerHolder extends ObjectHolder<Integer> {
+
+    /**
+     * @param initialValue the value to hold initially
+     */
+    public IntegerHolder(final int initialValue) {
+        super(initialValue);
+    }
+
+    /**
+     * Adds <code>delta</code> to the held value.
+     *
+     * @param delta the amount to add
+     * @return the value after the addition
+     */
+    public int addAndGet(final int delta) {
+        final int updated = get() + delta;
+        set(updated);
+        return updated;
+    }
+
+    /**
+     * Adds <code>delta</code> to the held value.
+     *
+     * @param delta the amount to add
+     * @return the value before the addition
+     */
+    public int getAndAdd(final int delta) {
+        final int previous = get();
+        set(previous + delta);
+        return previous;
+    }
+
+    /**
+     * @return the held value after adding one to it
+     */
+    public int incrementAndGet() {
+        return addAndGet(1);
+    }
+
+    /**
+     * @return the held value as it was before adding one to it
+     */
+    public int getAndIncrement() {
+        return getAndAdd(1);
+    }
+
+    /**
+     * @return the held value after subtracting one from it
+     */
+    public int decrementAndGet() {
+        return addAndGet(-1);
+    }
+
+    /**
+     * @return the held value as it was before subtracting one from it
+     */
+    public int getAndDecrement() {
+        return getAndAdd(-1);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/LongHolder.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/LongHolder.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/LongHolder.java
new file mode 100644
index 0000000..ef70ce8
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/LongHolder.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+/**
+ * Wraps a Long value so that it can be declared <code>final</code> and still
+ * be modified from within inner classes. The functionality is similar to that
+ * of an AtomicLong, but operations on this class are not atomic, which
+ * results in greater performance when atomicity is not needed.
+ */
+public class LongHolder extends ObjectHolder<Long> {
+
+    /**
+     * @param initialValue the value to hold initially
+     */
+    public LongHolder(final long initialValue) {
+        super(initialValue);
+    }
+
+    /**
+     * Adds <code>delta</code> to the held value.
+     *
+     * @param delta the amount to add
+     * @return the value after the addition
+     */
+    public long addAndGet(final long delta) {
+        final long updated = get() + delta;
+        set(updated);
+        return updated;
+    }
+
+    /**
+     * Adds <code>delta</code> to the held value.
+     *
+     * @param delta the amount to add
+     * @return the value before the addition
+     */
+    public long getAndAdd(final long delta) {
+        final long previous = get();
+        set(previous + delta);
+        return previous;
+    }
+
+    /**
+     * @return the held value after adding one to it
+     */
+    public long incrementAndGet() {
+        return addAndGet(1L);
+    }
+
+    /**
+     * @return the held value as it was before adding one to it
+     */
+    public long getAndIncrement() {
+        return getAndAdd(1L);
+    }
+
+    /**
+     * @return the held value after subtracting one from it
+     */
+    public long decrementAndGet() {
+        return addAndGet(-1L);
+    }
+
+    /**
+     * @return the held value as it was before subtracting one from it
+     */
+    public long getAndDecrement() {
+        return getAndAdd(-1L);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/ObjectHolder.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/ObjectHolder.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/ObjectHolder.java
new file mode 100644
index 0000000..a58ec6a
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/ObjectHolder.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+/**
+ * A simple mutable container for a single value of type T.
+ *
+ * @param <T> the type of the held value
+ */
+public class ObjectHolder<T> {
+
+    private T value;
+
+    /**
+     * @param initialValue the value initially held; stored as given
+     */
+    public ObjectHolder(final T initialValue) {
+        value = initialValue;
+    }
+
+    /**
+     * @return the value currently held
+     */
+    public T get() {
+        return this.value;
+    }
+
+    /**
+     * Replaces the held value.
+     *
+     * @param value the new value to hold
+     */
+    public void set(final T value) {
+        this.value = value;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/RingBuffer.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/RingBuffer.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/RingBuffer.java
new file mode 100644
index 0000000..c0bb830
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/RingBuffer.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Thread-safe implementation of a RingBuffer
+ *
+ * @param <T>
+ */
+public class RingBuffer<T> {
+
+ private final Object[] buffer;
+ private int insertionPointer = 0;
+ private boolean filled = false;
+
+ private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+ private final Lock readLock = rwLock.readLock();
+ private final Lock writeLock = rwLock.writeLock();
+
+ public RingBuffer(final int size) {
+ buffer = new Object[size];
+ }
+
+ /**
+ * Adds the given value to the RingBuffer and returns the value that was
+ * removed in order to make room.
+ *
+ * @param value
+ * @return
+ */
+ @SuppressWarnings("unchecked")
+ public T add(final T value) {
+ Objects.requireNonNull(value);
+
+ writeLock.lock();
+ try {
+ final Object removed = buffer[insertionPointer];
+
+ buffer[insertionPointer] = value;
+
+ if (insertionPointer == buffer.length - 1) {
+ filled = true;
+ }
+
+ insertionPointer = (insertionPointer + 1) % buffer.length;
+ return (T) removed;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public int getSize() {
+ readLock.lock();
+ try {
+ return filled ? buffer.length : insertionPointer;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public List<T> getSelectedElements(final Filter<T> filter) {
+ return getSelectedElements(filter, Integer.MAX_VALUE);
+ }
+
+ public List<T> getSelectedElements(final Filter<T> filter, final int maxElements) {
+ final List<T> selected = new ArrayList<>(1000);
+ int numSelected = 0;
+ readLock.lock();
+ try {
+ for (int i = 0; i < buffer.length && numSelected < maxElements; i++) {
+ final int idx = (insertionPointer + i) % buffer.length;
+ final Object val = buffer[idx];
+ if (val == null) {
+ continue;
+ }
+
+ @SuppressWarnings("unchecked")
+ final T element = (T) val;
+ if (filter.select(element)) {
+ selected.add(element);
+ numSelected++;
+ }
+ }
+ } finally {
+ readLock.unlock();
+ }
+ return selected;
+ }
+
+ public int countSelectedElements(final Filter<T> filter) {
+ int numSelected = 0;
+ readLock.lock();
+ try {
+ for (int i = 0; i < buffer.length; i++) {
+ final int idx = (insertionPointer + i) % buffer.length;
+ final Object val = buffer[idx];
+ if (val == null) {
+ continue;
+ }
+
+ @SuppressWarnings("unchecked")
+ final T element = (T) val;
+ if (filter.select(element)) {
+ numSelected++;
+ }
+ }
+ } finally {
+ readLock.unlock();
+ }
+
+ return numSelected;
+ }
+
+ /**
+ * Removes all elements from the RingBuffer that match the given filter
+ *
+ * @param filter
+ * @return
+ */
+ public int removeSelectedElements(final Filter<T> filter) {
+ int count = 0;
+
+ writeLock.lock();
+ try {
+ for (int i = 0; i < buffer.length; i++) {
+ final int idx = (insertionPointer + i + 1) % buffer.length;
+ final Object val = buffer[idx];
+ if (val == null) {
+ continue;
+ }
+
+ @SuppressWarnings("unchecked")
+ final T element = (T) val;
+
+ if (filter.select(element)) {
+ buffer[idx] = null;
+ }
+ }
+ } finally {
+ writeLock.unlock();
+ }
+
+ return count;
+ }
+
+ public List<T> asList() {
+ return getSelectedElements(new Filter<T>() {
+ @Override
+ public boolean select(final T value) {
+ return true;
+ }
+ });
+ }
+
+ public T getOldestElement() {
+ readLock.lock();
+ try {
+ return getElementData(insertionPointer);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public T getNewestElement() {
+ readLock.lock();
+ try {
+ int index = (insertionPointer == 0) ? buffer.length : insertionPointer - 1;
+ return getElementData(index);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private T getElementData(final int index) {
+ readLock.lock();
+ try {
+ return (T) buffer[index];
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * Iterates over each element in the RingBuffer, calling the
+ * {@link ForEachEvaluator#evaluate(Object) evaluate} method on each element
+ * in the RingBuffer. If the Evaluator returns {@code false}, the method
+ * will skip all remaining elements in the RingBuffer; otherwise, the next
+ * element will be evaluated until all elements have been evaluated.
+ *
+ * @param evaluator
+ */
+ public void forEach(final ForEachEvaluator<T> evaluator) {
+ forEach(evaluator, IterationDirection.FORWARD);
+ }
+
+ /**
+ * Iterates over each element in the RingBuffer, calling the
+ * {@link ForEachEvaluator#evaluate(Object) evaluate} method on each element
+ * in the RingBuffer. If the Evaluator returns {@code false}, the method
+ * will skip all remaining elements in the RingBuffer; otherwise, the next
+ * element will be evaluated until all elements have been evaluated.
+ *
+ * @param evaluator
+ * @param iterationDirection the order in which to iterate over the elements
+ * in the RingBuffer
+ */
+ public void forEach(final ForEachEvaluator<T> evaluator, final IterationDirection iterationDirection) {
+ readLock.lock();
+ try {
+ final int startIndex;
+ final int endIndex;
+ final int increment;
+
+ if (iterationDirection == IterationDirection.FORWARD) {
+ startIndex = 0;
+ endIndex = buffer.length - 1;
+ increment = 1;
+ } else {
+ startIndex = buffer.length - 1;
+ endIndex = 0;
+ increment = -1;
+ }
+
+ for (int i = startIndex; (iterationDirection == IterationDirection.FORWARD ? i <= endIndex : i >= endIndex); i += increment) {
+ final int idx = (insertionPointer + i) % buffer.length;
+ final Object val = buffer[idx];
+ if (val == null) {
+ continue;
+ }
+
+ @SuppressWarnings("unchecked")
+ final T element = (T) val;
+ if (!evaluator.evaluate(element)) {
+ return;
+ }
+ }
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public static interface Filter<S> {
+
+ boolean select(S value);
+ }
+
+ /**
+ * Defines an interface that can be used to iterate over all of the elements
+ * in the RingBuffer via the {@link #forEach} method
+ *
+ * @param <S>
+ */
+ public static interface ForEachEvaluator<S> {
+
+ /**
+ * Evaluates the given element and returns {@code true} if the next
+ * element should be evaluated, {@code false} otherwise
+ *
+ * @param value
+ * @return
+ */
+ boolean evaluate(S value);
+ }
+
+ public static enum IterationDirection {
+
+ FORWARD,
+ BACKWARD;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/StopWatch.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/StopWatch.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/StopWatch.java
new file mode 100644
index 0000000..cd11930
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/StopWatch.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Measures elapsed time using {@link System#nanoTime()}. This class performs
+ * no synchronization.
+ */
+public final class StopWatch {
+
+    private static final long NANOS_PER_MILLI = 1000000L;
+    private static final long NANOS_PER_SECOND = 1000000000L;
+    private static final long NANOS_PER_MINUTE = 60000000000L;
+
+    private long startNanos = -1L;
+    private long duration = -1L;
+    // tracked separately from startNanos because System.nanoTime() may legitimately be negative
+    private boolean running = false;
+
+    /**
+     * Creates a StopWatch but does not start it
+     */
+    public StopWatch() {
+        this(false);
+    }
+
+    /**
+     * @param autoStart whether or not the timer should be started automatically
+     */
+    public StopWatch(final boolean autoStart) {
+        if (autoStart) {
+            start();
+        }
+    }
+
+    /**
+     * Starts (or restarts) the timer and discards any previously recorded
+     * duration.
+     */
+    public void start() {
+        this.startNanos = System.nanoTime();
+        this.duration = -1L;
+        this.running = true;
+    }
+
+    /**
+     * Stops the timer and records the time since {@link #start()} was called.
+     *
+     * @throws IllegalStateException if the StopWatch is not running
+     */
+    public void stop() {
+        if (!running) {
+            throw new IllegalStateException("StopWatch has not been started");
+        }
+        this.duration = System.nanoTime() - startNanos;
+        this.startNanos = -1L;
+        this.running = false;
+    }
+
+    /**
+     * Returns the amount of time that the StopWatch was running.
+     *
+     * @param timeUnit the unit in which to express the duration
+     * @return the recorded duration, converted to the given unit
+     *
+     * @throws IllegalStateException if the StopWatch has not been stopped via
+     * {@link #stop()}
+     */
+    public long getDuration(final TimeUnit timeUnit) {
+        if (duration < 0) {
+            throw new IllegalStateException("Cannot get duration until StopWatch has been stopped");
+        }
+        return timeUnit.convert(duration, TimeUnit.NANOSECONDS);
+    }
+
+    /**
+     * Returns the amount of time that has elapsed since the timer was started.
+     * If the timer has since been stopped, the duration recorded by
+     * {@link #stop()} is returned instead.
+     *
+     * @param timeUnit the unit in which to express the elapsed time
+     * @return the elapsed time, converted to the given unit
+     *
+     * @throws IllegalStateException if the StopWatch has never been started
+     */
+    public long getElapsed(final TimeUnit timeUnit) {
+        if (running) {
+            return timeUnit.convert(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
+        }
+        if (duration < 0) {
+            throw new IllegalStateException("StopWatch has not been started");
+        }
+        return timeUnit.convert(duration, TimeUnit.NANOSECONDS);
+    }
+
+    /**
+     * Formats the rate at which the given number of bytes was processed over
+     * the recorded duration. The result is only meaningful after
+     * {@link #stop()} has been called.
+     *
+     * @param bytes the number of bytes processed
+     * @return the formatted data size per second, followed by "/sec"
+     */
+    public String calculateDataRate(final long bytes) {
+        final double seconds = (double) duration / 1000000000.0D;
+        final long dataSize = (long) (bytes / seconds);
+        return FormatUtils.formatDataSize(dataSize) + "/sec";
+    }
+
+    /**
+     * Formats the recorded duration as a comma-separated list of its non-zero
+     * minutes, seconds and millis components, for example
+     * "1 minutes, 3 millis". Durations of less than a millisecond are shown in
+     * nanos. The result is only meaningful after {@link #stop()} has been
+     * called.
+     *
+     * @return the formatted duration
+     */
+    public String getDuration() {
+        long remaining = this.duration;
+
+        final long minutes = remaining / NANOS_PER_MINUTE;
+        remaining -= minutes * NANOS_PER_MINUTE;
+
+        final long seconds = remaining / NANOS_PER_SECOND;
+        remaining -= seconds * NANOS_PER_SECOND;
+
+        final long millis = remaining / NANOS_PER_MILLI;
+        final long nanos = remaining % NANOS_PER_MILLI;
+
+        final StringBuilder sb = new StringBuilder();
+        appendUnit(sb, minutes, " minutes");
+        appendUnit(sb, seconds, " seconds");
+        appendUnit(sb, millis, " millis");
+
+        if (sb.length() == 0) {
+            // nothing larger than a nanosecond to report
+            sb.append(nanos).append(" nanos");
+        }
+
+        return sb.toString();
+    }
+
+    /**
+     * Appends "amount + label" if the amount is positive, preceded by ", "
+     * when something has already been appended.
+     */
+    private static void appendUnit(final StringBuilder sb, final long amount, final String label) {
+        if (amount <= 0) {
+            return;
+        }
+        if (sb.length() > 0) {
+            sb.append(", ");
+        }
+        sb.append(amount).append(label);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/Tuple.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/Tuple.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/Tuple.java
new file mode 100644
index 0000000..63736ed
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/Tuple.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+/**
+ *
+ * @author unattrib
+ * @param <A>
+ * @param <B>
+ */
+public class Tuple<A, B> {
+
+ final A key;
+ final B value;
+
+ public Tuple(A key, B value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public A getKey() {
+ return key;
+ }
+
+ public B getValue() {
+ return value;
+ }
+
+ @Override
+ public boolean equals(final Object other) {
+ if (other == null) {
+ return false;
+ }
+ if (other == this) {
+ return true;
+ }
+ if (!(other instanceof Tuple)) {
+ return false;
+ }
+
+ final Tuple<?, ?> tuple = (Tuple<?, ?>) other;
+ if (key == null) {
+ if (tuple.key != null) {
+ return false;
+ }
+ } else {
+ if (!key.equals(tuple.key)) {
+ return false;
+ }
+ }
+
+ if (value == null) {
+ if (tuple.value != null) {
+ return false;
+ }
+ } else {
+ if (!value.equals(tuple.value)) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 581 + (this.key == null ? 0 : this.key.hashCode()) + (this.value == null ? 0 : this.value.hashCode());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugDisabledTimedLock.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugDisabledTimedLock.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugDisabledTimedLock.java
new file mode 100644
index 0000000..a8d7e82
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugDisabledTimedLock.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.concurrency;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * A {@link DebuggableTimedLock} that delegates directly to the wrapped
+ * {@link Lock} without logging or recording any timing information.
+ */
+public class DebugDisabledTimedLock implements DebuggableTimedLock {
+
+    private final Lock lock;
+
+    /**
+     * @param lock the lock to delegate to
+     */
+    public DebugDisabledTimedLock(final Lock lock) {
+        this.lock = lock;
+    }
+
+    /**
+     * Attempts to obtain the lock without waiting.
+     *
+     * @return {@code true} if the lock was obtained
+     */
+    @Override
+    public boolean tryLock() {
+        return lock.tryLock();
+    }
+
+    /**
+     * Attempts to obtain the lock, waiting up to the given amount of time.
+     *
+     * @param timeout the maximum time to wait
+     * @param timeUnit the unit of {@code timeout}
+     * @return {@code true} if the lock was obtained; {@code false} if the wait
+     * timed out or the thread was interrupted while waiting, in which case
+     * the thread's interrupt status is set again
+     */
+    @Override
+    public boolean tryLock(final long timeout, final TimeUnit timeUnit) {
+        try {
+            return lock.tryLock(timeout, timeUnit);
+        } catch (final InterruptedException e) {
+            // restore the interrupt status so that callers can still observe the interruption
+            Thread.currentThread().interrupt();
+            return false;
+        }
+    }
+
+    /**
+     * Obtains the lock, waiting as long as the wrapped lock requires.
+     */
+    @Override
+    public void lock() {
+        lock.lock();
+    }
+
+    /**
+     * Releases the lock.
+     *
+     * @param task ignored by this implementation
+     */
+    @Override
+    public void unlock(final String task) {
+        lock.unlock();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugEnabledTimedLock.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugEnabledTimedLock.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugEnabledTimedLock.java
new file mode 100644
index 0000000..f082168
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugEnabledTimedLock.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.concurrency;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DebugEnabledTimedLock implements DebuggableTimedLock {
+
+ private final Lock lock;
+ private final Logger logger;
+ private long lockTime = 0L;
+
+ private final Map<String, Long> lockIterations = new HashMap<>();
+ private final Map<String, Long> lockNanos = new HashMap<>();
+
+ private final String name;
+ private final int iterationFrequency;
+
+ public DebugEnabledTimedLock(final Lock lock, final String name, final int iterationFrequency) {
+ this.lock = lock;
+ this.name = name;
+ this.iterationFrequency = iterationFrequency;
+ logger = LoggerFactory.getLogger(TimedLock.class.getName() + "." + name);
+ }
+
+ /**
+ *
+ * @return
+ */
+ @Override
+ public boolean tryLock() {
+ logger.trace("Trying to obtain Lock: {}", name);
+ final boolean success = lock.tryLock();
+ if (!success) {
+ logger.trace("TryLock failed for Lock: {}", name);
+ return false;
+ }
+ logger.trace("TryLock successful");
+
+ return true;
+ }
+
+ /**
+ *
+ * @param timeout
+ * @param timeUnit
+ * @return
+ */
+ @Override
+ public boolean tryLock(final long timeout, final TimeUnit timeUnit) {
+ logger.trace("Trying to obtain Lock {} with a timeout of {} {}", name, timeout, timeUnit);
+ final boolean success;
+ try {
+ success = lock.tryLock(timeout, timeUnit);
+ } catch (final InterruptedException ie) {
+ return false;
+ }
+
+ if (!success) {
+ logger.trace("TryLock failed for Lock {} with a timeout of {} {}", name, timeout, timeUnit);
+ return false;
+ }
+ logger.trace("TryLock successful");
+ return true;
+ }
+
+ /**
+ *
+ */
+ @Override
+ public void lock() {
+ logger.trace("Obtaining Lock {}", name);
+ lock.lock();
+ lockTime = System.nanoTime();
+ logger.trace("Obtained Lock {}", name);
+ }
+
+ /**
+ *
+ * @param task
+ */
+ @Override
+ public void unlock(final String task) {
+ if (lockTime <= 0L) {
+ lock.unlock();
+ return;
+ }
+
+ logger.trace("Releasing Lock {}", name);
+ final long nanosLocked = System.nanoTime() - lockTime;
+
+ Long startIterations = lockIterations.get(task);
+ if (startIterations == null) {
+ startIterations = 0L;
+ }
+ final long iterations = startIterations + 1L;
+ lockIterations.put(task, iterations);
+
+ Long startNanos = lockNanos.get(task);
+ if (startNanos == null) {
+ startNanos = 0L;
+ }
+ final long totalNanos = startNanos + nanosLocked;
+ lockNanos.put(task, totalNanos);
+
+ lockTime = -1L;
+
+ lock.unlock();
+ logger.trace("Released Lock {}", name);
+
+ if (iterations % iterationFrequency == 0) {
+ logger.debug("Lock {} held for {} nanos for task: {}; total lock iterations: {}; total lock nanos: {}", name, nanosLocked, task, iterations, totalNanos);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebuggableTimedLock.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebuggableTimedLock.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebuggableTimedLock.java
new file mode 100644
index 0000000..69da6e8
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebuggableTimedLock.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.concurrency;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Lock operations shared by DebugEnabledTimedLock and DebugDisabledTimedLock,
+ * between which TimedLock chooses on each call.
+ */
+public interface DebuggableTimedLock {
+
+ /**
+ * Obtains the lock. Both implementations in this package delegate to
+ * {@link java.util.concurrent.locks.Lock#lock()}.
+ */
+ void lock();
+
+ /**
+ * Attempts to obtain the lock, waiting up to the given amount of time.
+ *
+ * @param timePeriod the maximum time to wait
+ * @param timeUnit the unit of {@code timePeriod}
+ * @return {@code true} if the lock was obtained. Both implementations in
+ * this package return {@code false} when the wait times out or the thread
+ * is interrupted while waiting.
+ */
+ boolean tryLock(long timePeriod, TimeUnit timeUnit);
+
+ /**
+ * Attempts to obtain the lock without waiting.
+ *
+ * @return {@code true} if the lock was obtained
+ */
+ boolean tryLock();
+
+ /**
+ * Releases the lock.
+ *
+ * @param task a label for the work done while the lock was held;
+ * DebugEnabledTimedLock keys its statistics by it, DebugDisabledTimedLock
+ * ignores it
+ */
+ void unlock(String task);
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/TimedLock.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/TimedLock.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/TimedLock.java
new file mode 100644
index 0000000..532d3c3
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/TimedLock.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.concurrency;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A lock wrapper that decides on every call whether to use a timing, logging
+ * delegate or a plain pass-through delegate. The timing delegate is used
+ * whenever DEBUG logging is enabled for this lock's logger, whose name is the
+ * TimedLock class name followed by "." and the lock name.
+ */
+public class TimedLock {
+
+    private final DebugEnabledTimedLock timingDelegate;
+    private final DebugDisabledTimedLock plainDelegate;
+
+    private final Logger logger;
+
+    /**
+     * @param lock the lock that both delegates wrap
+     * @param name the name of the lock, used as the suffix of the logger name
+     * @param iterationFrequency passed on to the timing delegate
+     */
+    public TimedLock(final Lock lock, final String name, final int iterationFrequency) {
+        timingDelegate = new DebugEnabledTimedLock(lock, name, iterationFrequency);
+        plainDelegate = new DebugDisabledTimedLock(lock);
+
+        final String loggerName = TimedLock.class.getName() + "." + name;
+        logger = LoggerFactory.getLogger(loggerName);
+    }
+
+    /**
+     * Picks the delegate for the current call based on the logger's level.
+     */
+    private DebuggableTimedLock getLock() {
+        if (logger.isDebugEnabled()) {
+            return timingDelegate;
+        }
+        return plainDelegate;
+    }
+
+    /**
+     * @return {@code true} if the lock was obtained without waiting
+     */
+    public boolean tryLock() {
+        final DebuggableTimedLock delegate = getLock();
+        return delegate.tryLock();
+    }
+
+    /**
+     * @param timeout the maximum time to wait
+     * @param timeUnit the unit of {@code timeout}
+     * @return the result of the chosen delegate's timed tryLock
+     */
+    public boolean tryLock(final long timeout, final TimeUnit timeUnit) {
+        final DebuggableTimedLock delegate = getLock();
+        return delegate.tryLock(timeout, timeUnit);
+    }
+
+    /**
+     * Obtains the lock through the chosen delegate.
+     */
+    public void lock() {
+        final DebuggableTimedLock delegate = getLock();
+        delegate.lock();
+    }
+
+    /**
+     * Releases the lock through the chosen delegate.
+     *
+     * @param task passed on to the delegate's unlock
+     */
+    public void unlock(final String task) {
+        final DebuggableTimedLock delegate = getLock();
+        delegate.unlock(task);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java
new file mode 100644
index 0000000..2b95897
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.timebuffer;
+
+/**
+ * Operations for working with entities of type T: combining two entities,
+ * creating a new one, and reading an entity's timestamp.
+ *
+ * @param <T> the entity type
+ */
+public interface EntityAccess<T> {
+
+ /**
+ * Combines two entities into one.
+ * NOTE(review): LongEntityAccess accepts null for either argument and
+ * otherwise sums the two values; confirm what other implementations
+ * are expected to do.
+ *
+ * @param oldValue the existing entity
+ * @param toAdd the entity to combine with it
+ * @return the combined entity
+ */
+ T aggregate(T oldValue, T toAdd);
+
+ /**
+ * Creates a new entity. LongEntityAccess returns a TimestampedLong
+ * constructed with 0L.
+ *
+ * @return the new entity
+ */
+ T createNew();
+
+ /**
+ * @param entity the entity to inspect
+ * @return the timestamp of the given entity
+ */
+ long getTimestamp(T entity);
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java
new file mode 100644
index 0000000..193abc6
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.timebuffer;
+
+/**
+ * {@link EntityAccess} implementation for {@link TimestampedLong} entities,
+ * which are aggregated by adding their values.
+ */
+public class LongEntityAccess implements EntityAccess<TimestampedLong> {
+
+    /**
+     * Combines two entities. When both are present, a new TimestampedLong
+     * holding the sum of their values is returned; when only one is present,
+     * that one is returned as is; when neither is present, a new
+     * TimestampedLong constructed with 0L is returned.
+     *
+     * @param oldValue the existing entity; may be {@code null}
+     * @param toAdd the entity to add; may be {@code null}
+     * @return the combined entity, never {@code null}
+     */
+    @Override
+    public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong toAdd) {
+        if (oldValue == null) {
+            return (toAdd == null) ? new TimestampedLong(0L) : toAdd;
+        }
+        if (toAdd == null) {
+            return oldValue;
+        }
+
+        return new TimestampedLong(oldValue.getValue() + toAdd.getValue());
+    }
+
+    /**
+     * @return a new TimestampedLong constructed with 0L
+     */
+    @Override
+    public TimestampedLong createNew() {
+        return new TimestampedLong(0L);
+    }
+
+    /**
+     * @param entity the entity to inspect; may be {@code null}
+     * @return the entity's timestamp, or 0 if the entity is {@code null}
+     */
+    @Override
+    public long getTimestamp(TimestampedLong entity) {
+        if (entity == null) {
+            return 0L;
+        }
+        return entity.getTimestamp();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java
new file mode 100644
index 0000000..dd8e523
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.timebuffer;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A fixed ring of time-sliced bins that accumulates entities and reports their aggregate.
+ * The bin for an instant is chosen by converting that instant to {@code binPrecision} units
+ * and taking the result modulo the number of bins.
+ *
+ * @param <T> the type of entity being accumulated
+ */
+public class TimedBuffer<T> {
+
+    private final int numBins;
+    private final EntitySum<T>[] bins;
+    private final EntityAccess<T> entityAccess;
+    private final TimeUnit binPrecision;
+
+    /**
+     * @param binPrecision the time unit covered by a single bin
+     * @param numBins the requested number of bins; one additional bin is allocated internally
+     * @param accessor strategy used to create, aggregate and timestamp entities
+     */
+    @SuppressWarnings("unchecked")
+    public TimedBuffer(final TimeUnit binPrecision, final int numBins, final EntityAccess<T> accessor) {
+        this.binPrecision = binPrecision;
+        this.numBins = numBins + 1;
+        this.bins = new EntitySum[this.numBins];
+        for (int i = 0; i < this.numBins; i++) {
+            // Each bin receives the caller's requested count (not the padded one) and uses it
+            // as its expiry horizon in isExpired().
+            this.bins[i] = new EntitySum<>(binPrecision, numBins, accessor);
+        }
+        this.entityAccess = accessor;
+    }
+
+    /**
+     * Folds the given entity into the bin that corresponds to the current wall-clock time.
+     *
+     * @param entity the entity to add
+     * @return the value held by that bin after the entity has been aggregated into it
+     */
+    public T add(final T entity) {
+        final EntitySum<T> sum = bins[binIndexFor(System.currentTimeMillis())];
+        return sum.addOrReset(entity);
+    }
+
+    /**
+     * Aggregates the values of all bins that are not expired.
+     *
+     * @param sinceEpochMillis only selects the bin at which iteration starts; every bin is still
+     *            visited and bins are filtered by their own expiry check, not by this value
+     * @return the aggregate of every non-expired bin, or {@code null} if no bin was aggregated
+     */
+    public T getAggregateValue(final long sinceEpochMillis) {
+        final int startBinIdx = binIndexFor(sinceEpochMillis);
+
+        T total = null;
+        for (int i = 0; i < numBins; i++) {
+            final int binIdx = (startBinIdx + i) % numBins;
+            final EntitySum<T> bin = bins[binIdx];
+
+            if (!bin.isExpired()) {
+                total = entityAccess.aggregate(total, bin.getValue());
+            }
+        }
+
+        return total;
+    }
+
+    /**
+     * Maps an epoch timestamp to a bin index in {@code [0, numBins)}.
+     */
+    private int binIndexFor(final long epochMillis) {
+        final long slot = binPrecision.convert(epochMillis, TimeUnit.MILLISECONDS);
+        // The % operator keeps the sign of the dividend, so a negative timestamp would otherwise
+        // yield a negative array index. Non-negative inputs map exactly as a plain % would.
+        return (int) (((slot % numBins) + numBins) % numBins);
+    }
+
+    /**
+     * A single bin: an atomically updated running aggregate.
+     */
+    private static class EntitySum<S> {
+
+        private final EntityAccess<S> entityAccess;
+        private final AtomicReference<S> ref = new AtomicReference<>();
+        private final TimeUnit binPrecision;
+        private final int numConfiguredBins;
+
+        public EntitySum(final TimeUnit binPrecision, final int numConfiguredBins, final EntityAccess<S> aggregator) {
+            this.binPrecision = binPrecision;
+            this.entityAccess = aggregator;
+            this.numConfiguredBins = numConfiguredBins;
+        }
+
+        /**
+         * Aggregates the event into the current value, retrying the compare-and-set until it
+         * succeeds, and returns the value that was stored.
+         */
+        private S add(final S event) {
+            S newValue;
+            S value;
+            do {
+                value = ref.get();
+                newValue = entityAccess.aggregate(value, event);
+            } while (!ref.compareAndSet(value, newValue));
+
+            return newValue;
+        }
+
+        public S getValue() {
+            return ref.get();
+        }
+
+        /**
+         * @return {@code true} if the timestamp reported for the current value is older than
+         *         {@code numConfiguredBins} units of {@code binPrecision} before now
+         */
+        public boolean isExpired() {
+            // entityAccess.getTimestamp(curValue) represents the time at which the current value
+            // was last updated. If that is older than the whole window covered by the buffer
+            // (numConfiguredBins * binPrecision), the bin holds stale data and must be ignored.
+            // A bin that was never written holds null; what timestamp that yields is up to the
+            // EntityAccess implementation.
+            final long maxExpectedTimePeriod = System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(numConfiguredBins, binPrecision);
+
+            final S curValue = ref.get();
+            return (entityAccess.getTimestamp(curValue) < maxExpectedTimePeriod);
+        }
+
+        /**
+         * Adds the event to this bin, first replacing the current value with a fresh one if it
+         * was last updated more than one {@code binPrecision} unit ago.
+         */
+        public S addOrReset(final S event) {
+            // entityAccess.getTimestamp(curValue) represents the time at which the current value
+            // was last updated. If the last value is less than current time - 1 binPrecision, then it
+            // means that we've rolled over and need to reset the value.
+            final long maxExpectedTimePeriod = System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(1, binPrecision);
+
+            final S curValue = ref.get();
+            if (entityAccess.getTimestamp(curValue) < maxExpectedTimePeriod) {
+                // The result of the compare-and-set is not checked: if another thread changed
+                // the bin in the meantime, its value is kept and the event is added to it.
+                ref.compareAndSet(curValue, entityAccess.createNew());
+            }
+            return add(event);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java
new file mode 100644
index 0000000..07d31ea
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.timebuffer;
+
+/**
+ * Immutable pairing of a {@code Long} with the wall-clock time, in epoch milliseconds, at which
+ * the instance was constructed.
+ */
+public class TimestampedLong {
+
+    private final Long value;
+    private final long timestamp;
+
+    /**
+     * Wraps the given value and records the current time as its timestamp.
+     *
+     * @param value the value to wrap; stored exactly as given
+     */
+    public TimestampedLong(final Long value) {
+        this.timestamp = System.currentTimeMillis();
+        this.value = value;
+    }
+
+    /**
+     * @return the wrapped value, exactly as passed to the constructor
+     */
+    public Long getValue() {
+        return this.value;
+    }
+
+    /**
+     * @return the {@link System#currentTimeMillis()} reading taken when this instance was created
+     */
+    public long getTimestamp() {
+        return this.timestamp;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestCompoundUpdateMonitor.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestCompoundUpdateMonitor.java b/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestCompoundUpdateMonitor.java
new file mode 100644
index 0000000..c796a96
--- /dev/null
+++ b/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestCompoundUpdateMonitor.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.timebuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.util.UUID;
+
+import org.apache.nifi.io.CompoundUpdateMonitor;
+import org.apache.nifi.io.LastModifiedMonitor;
+import org.apache.nifi.io.MD5SumMonitor;
+import org.apache.nifi.io.UpdateMonitor;
+
+import org.junit.Test;
+
+/**
+ * Exercises a {@link CompoundUpdateMonitor} built from a {@link LastModifiedMonitor} and an
+ * {@link MD5SumMonitor} against a scratch file under {@code target/}.
+ */
+public class TestCompoundUpdateMonitor {
+
+    @Test
+    public void test() throws IOException {
+        final UpdateMonitor lastModified = new LastModifiedMonitor();
+        final MD5SumMonitor md5 = new MD5SumMonitor();
+        final CompoundUpdateMonitor compound = new CompoundUpdateMonitor(lastModified, md5);
+
+        final File file = new File("target/" + UUID.randomUUID().toString());
+        if (file.exists()) {
+            assertTrue(file.delete());
+        }
+        assertTrue(file.createNewFile());
+
+        try {
+            final Path path = file.toPath();
+
+            // Two reads of an untouched file are expected to compare equal.
+            final Object curState = compound.getCurrentState(path);
+            final Object state2 = compound.getCurrentState(path);
+
+            assertEquals(curState, state2);
+
+            // Changing only the last-modified time is still expected to compare equal.
+            file.setLastModified(System.currentTimeMillis() + 1000L);
+            final Object state3 = compound.getCurrentState(path);
+            assertEquals(state2, state3);
+
+            final Object state4 = compound.getCurrentState(path);
+            assertEquals(state3, state4);
+
+            // Rewrite the content, then restore the previous last-modified time so that only
+            // the content differs from what state4 observed.
+            final long lastModifiedDate = file.lastModified();
+            try (final OutputStream out = new FileOutputStream(file)) {
+                out.write("Hello".getBytes("UTF-8"));
+            }
+
+            file.setLastModified(lastModifiedDate);
+
+            final Object state5 = compound.getCurrentState(path);
+            // NOTE(review): assertNotSame only checks reference identity, so it passes whenever
+            // getCurrentState returns a new object - confirm whether a value comparison was intended.
+            assertNotSame(state4, state5);
+        } finally {
+            // Best-effort cleanup: the file name is random, so without this every run would
+            // leave another scratch file behind in target/.
+            file.delete();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestRingBuffer.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestRingBuffer.java b/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestRingBuffer.java
new file mode 100644
index 0000000..fafffdd
--- /dev/null
+++ b/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestRingBuffer.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.timebuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.nifi.util.RingBuffer;
+import org.apache.nifi.util.RingBuffer.ForEachEvaluator;
+import org.apache.nifi.util.RingBuffer.IterationDirection;
+
+import org.junit.Test;
+
+/**
+ * Tests for {@link RingBuffer}: list snapshots and forward/backward iteration, both before and
+ * after more elements have been added than the buffer was constructed to hold.
+ */
+public class TestRingBuffer {
+
+    /** Adds the integers {@code fromInclusive .. toExclusive - 1} to the buffer, in order. */
+    private static void addRange(final RingBuffer<Integer> buffer, final int fromInclusive, final int toExclusive) {
+        for (int n = fromInclusive; n < toExclusive; n++) {
+            buffer.add(n);
+        }
+    }
+
+    /** Adds every element of {@code items} to the buffer, in array order. */
+    private static void addAll(final RingBuffer<Integer> buffer, final int[] items) {
+        for (final int item : items) {
+            buffer.add(item);
+        }
+    }
+
+    /** Asserts that {@code actual} is exactly {@code 0, 1, ..., expectedSize - 1}. */
+    private static void assertCountsUpFromZero(final List<Integer> actual, final int expectedSize) {
+        assertEquals(expectedSize, actual.size());
+        for (int pos = 0; pos < expectedSize; pos++) {
+            assertEquals(Integer.valueOf(pos), actual.get(pos));
+        }
+    }
+
+    @Test
+    public void testAsList() {
+        final RingBuffer<Integer> buffer = new RingBuffer<>(10);
+
+        final List<Integer> initial = buffer.asList();
+        assertTrue(initial.isEmpty());
+
+        // Partially filled: the snapshot holds only what was added.
+        addRange(buffer, 0, 3);
+        assertCountsUpFromZero(buffer.asList(), 3);
+
+        // Ten elements added in total: the snapshot holds all ten in insertion order.
+        addRange(buffer, 3, 10);
+        assertCountsUpFromZero(buffer.asList(), 10);
+    }
+
+    @Test
+    public void testIterateForwards() {
+        final RingBuffer<Integer> buffer = new RingBuffer<>(10);
+
+        final int[] expectedValues = new int[]{3, 5, 20, 7};
+        addAll(buffer, expectedValues);
+
+        final AtomicInteger visited = new AtomicInteger(0);
+        buffer.forEach(new ForEachEvaluator<Integer>() {
+            private int position = 0;
+
+            @Override
+            public boolean evaluate(final Integer value) {
+                final int expected = expectedValues[position++];
+                visited.incrementAndGet();
+                assertEquals(expected, value.intValue());
+                return true;
+            }
+
+        }, IterationDirection.FORWARD);
+
+        assertEquals(4, visited.get());
+    }
+
+    @Test
+    public void testIterateForwardsAfterFull() {
+        final RingBuffer<Integer> buffer = new RingBuffer<>(10);
+
+        addRange(buffer, 0, 12);
+
+        final int[] tail = new int[]{3, 5, 20, 7};
+        addAll(buffer, tail);
+
+        // Sixteen elements were added; the evaluator expects 6..11 followed by the tail values.
+        buffer.forEach(new ForEachEvaluator<Integer>() {
+            private int seen = 0;
+
+            @Override
+            public boolean evaluate(final Integer value) {
+                final int expected = (seen < 6) ? seen + 6 : tail[seen - 6];
+                assertEquals(expected, value.intValue());
+
+                seen++;
+                return true;
+            }
+
+        }, IterationDirection.FORWARD);
+    }
+
+    @Test
+    public void testIterateBackwards() {
+        final RingBuffer<Integer> buffer = new RingBuffer<>(10);
+
+        final int[] expectedValues = new int[]{3, 5, 20, 7};
+        addAll(buffer, expectedValues);
+
+        final AtomicInteger visited = new AtomicInteger(0);
+        buffer.forEach(new ForEachEvaluator<Integer>() {
+            private int step = 0;
+
+            @Override
+            public boolean evaluate(final Integer value) {
+                // Walk the expected array from its last element towards its first.
+                final int expected = expectedValues[expectedValues.length - 1 - step];
+                visited.incrementAndGet();
+
+                assertEquals(expected, value.intValue());
+                step++;
+                return true;
+            }
+
+        }, IterationDirection.BACKWARD);
+
+        assertEquals(4, visited.get());
+    }
+
+    @Test
+    public void testIterateBackwardsAfterFull() {
+        final RingBuffer<Integer> buffer = new RingBuffer<>(10);
+
+        addRange(buffer, 0, 12);
+
+        final int[] tail = new int[]{3, 5, 20, 7};
+        addAll(buffer, tail);
+
+        buffer.forEach(new ForEachEvaluator<Integer>() {
+            private int step = 0;
+
+            @Override
+            public boolean evaluate(final Integer value) {
+                // Only the most recently added tail values are verified; anything older is
+                // accepted without a check.
+                if (step >= tail.length) {
+                    return true;
+                }
+
+                final int expected = tail[tail.length - 1 - step];
+                assertEquals(expected, value.intValue());
+                step++;
+                return true;
+            }
+
+        }, IterationDirection.BACKWARD);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestSynchronousFileWatcher.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestSynchronousFileWatcher.java b/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestSynchronousFileWatcher.java
new file mode 100644
index 0000000..4b2c0d5
--- /dev/null
+++ b/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestSynchronousFileWatcher.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.timebuffer;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+
+import org.junit.Test;
+
+import org.apache.nifi.io.MD5SumMonitor;
+import org.apache.nifi.io.SynchronousFileWatcher;
+import org.apache.nifi.io.UpdateMonitor;
+
+/**
+ * Checks that a {@link SynchronousFileWatcher} backed by an {@link MD5SumMonitor} reports a
+ * change exactly once after the watched file's content is rewritten.
+ */
+public class TestSynchronousFileWatcher {
+
+    @Test
+    public void testIt() throws UnsupportedEncodingException, IOException, InterruptedException {
+        final Path path = Paths.get("target/1.txt");
+        Files.copy(new ByteArrayInputStream("Hello, World!".getBytes("UTF-8")), path, StandardCopyOption.REPLACE_EXISTING);
+        final UpdateMonitor monitor = new MD5SumMonitor();
+
+        // NOTE(review): the 10L argument is presumably the check interval in milliseconds,
+        // which the 30 ms sleeps below are meant to exceed - confirm against SynchronousFileWatcher.
+        final SynchronousFileWatcher watcher = new SynchronousFileWatcher(path, monitor, 10L);
+        assertFalse(watcher.checkAndReset());
+        Thread.sleep(30L);
+        assertFalse(watcher.checkAndReset());
+
+        // try-with-resources closes the stream on every path and, unlike a manual finally block,
+        // does not let a failing close() hide an exception thrown by write() or sync().
+        try (final FileOutputStream fos = new FileOutputStream(path.toFile())) {
+            fos.write("Good-bye, World!".getBytes("UTF-8"));
+            fos.getFD().sync();
+        }
+
+        // The rewrite is reported once, then the watcher goes back to reporting no change.
+        assertTrue(watcher.checkAndReset());
+        assertFalse(watcher.checkAndReset());
+
+        Thread.sleep(30L);
+        assertFalse(watcher.checkAndReset());
+    }
+}