You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/08 21:30:06 UTC
[43/51] [partial] incubator-nifi git commit: Initial code contribution
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-file-utils/src/main/java/org/apache/nifi/file/FileUtils.java
----------------------------------------------------------------------
diff --git a/commons/nifi-file-utils/src/main/java/org/apache/nifi/file/FileUtils.java b/commons/nifi-file-utils/src/main/java/org/apache/nifi/file/FileUtils.java
new file mode 100644
index 0000000..8920493
--- /dev/null
+++ b/commons/nifi-file-utils/src/main/java/org/apache/nifi/file/FileUtils.java
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.file;
+
+import java.io.BufferedInputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import org.apache.commons.codec.digest.DigestUtils;
+
+import org.slf4j.Logger;
+
+/**
+ * A utility class containing a few useful static methods to do typical IO
+ * operations.
+ *
+ * @author unattributed
+ */
+public class FileUtils {
+
+ public static final long TRANSFER_CHUNK_SIZE_BYTES = 1024 * 1024 * 8; //8 MB chunks
+ public static final long MILLIS_BETWEEN_ATTEMPTS = 50L;
+
+ /**
+ * Closes the given closeable quietly - no logging, no exceptions...
+ *
+ * @param closeable
+ */
+ public static void closeQuietly(final Closeable closeable) {
+ if (null != closeable) {
+ try {
+ closeable.close();
+ } catch (final IOException io) {/*IGNORE*/
+
+ }
+ }
+ }
+
+ /**
+ * Releases the given lock quietly - no logging, no exception
+ *
+ * @param lock
+ */
+ public static void releaseQuietly(final FileLock lock) {
+ if (null != lock) {
+ try {
+ lock.release();
+ } catch (final IOException io) {
+ /*IGNORE*/
+ }
+ }
+ }
+
+ public static void ensureDirectoryExistAndCanAccess(final File dir) throws IOException {
+ if (dir.exists() && !dir.isDirectory()) {
+ throw new IOException(dir.getAbsolutePath() + " is not a directory");
+ } else if (!dir.exists()) {
+ final boolean made = dir.mkdirs();
+ if (!made) {
+ throw new IOException(dir.getAbsolutePath() + " could not be created");
+ }
+ }
+ if (!(dir.canRead() && dir.canWrite())) {
+ throw new IOException(dir.getAbsolutePath() + " directory does not have read/write privilege");
+ }
+ }
+
+ /**
+ * Deletes the given file. If the given file exists but could not be deleted
+ * this will be printed as a warning to the given logger
+ *
+ * @param file
+ * @param logger
+ * @return
+ */
+ public static boolean deleteFile(final File file, final Logger logger) {
+ return FileUtils.deleteFile(file, logger, 1);
+ }
+
+ /**
+ * Deletes the given file. If the given file exists but could not be deleted
+ * this will be printed as a warning to the given logger
+ *
+ * @param file
+ * @param logger
+ * @param attempts indicates how many times an attempt to delete should be
+ * made
+ * @return true if given file no longer exists
+ */
+ public static boolean deleteFile(final File file, final Logger logger, final int attempts) {
+ if(file == null){
+ return false;
+ }
+ boolean isGone = false;
+ try {
+ if (file.exists()) {
+ final int effectiveAttempts = Math.max(1, attempts);
+ for (int i = 0; i < effectiveAttempts && !isGone; i++) {
+ isGone = file.delete() || !file.exists();
+ if (!isGone && (effectiveAttempts - i) > 1) {
+ FileUtils.sleepQuietly(MILLIS_BETWEEN_ATTEMPTS);
+ }
+ }
+ if (!isGone && logger != null) {
+ logger.warn("File appears to exist but unable to delete file: " + file.getAbsolutePath());
+ }
+ }
+ } catch (final Throwable t) {
+ if (logger != null) {
+ logger.warn("Unable to delete file: '" + file.getAbsolutePath() + "' due to " + t);
+ }
+ }
+ return isGone;
+ }
+
+ /**
+ * Deletes all of the given files. If any exist and cannot be deleted that
+ * will be printed at warn to the given logger.
+ *
+ * @param files can be null
+ * @param logger can be null
+ */
+ public static void deleteFile(final List<File> files, final Logger logger) {
+ FileUtils.deleteFile(files, logger, 1);
+ }
+
+ /**
+ * Deletes all of the given files. If any exist and cannot be deleted that
+ * will be printed at warn to the given logger.
+ *
+ * @param files can be null
+ * @param logger can be null
+ * @param attempts indicates how many times an attempt should be made to
+ * delete each file
+ */
+ public static void deleteFile(final List<File> files, final Logger logger, final int attempts) {
+ if (null == files || files.isEmpty()) {
+ return;
+ }
+ final int effectiveAttempts = Math.max(1, attempts);
+ for (final File file : files) {
+ try {
+ boolean isGone = false;
+ for (int i = 0; i < effectiveAttempts && !isGone; i++) {
+ isGone = file.delete() || !file.exists();
+ if (!isGone && (effectiveAttempts - i) > 1) {
+ FileUtils.sleepQuietly(MILLIS_BETWEEN_ATTEMPTS);
+ }
+ }
+ if (!isGone && logger != null) {
+ logger.warn("File appears to exist but unable to delete file: " + file.getAbsolutePath());
+ }
+ } catch (final Throwable t) {
+ if (null != logger) {
+ logger.warn("Unable to delete file given from path: '" + file.getPath() + "' due to " + t);
+ }
+ }
+ }
+ }
+
+ /**
+ * Deletes all files (not directories..) in the given directory (non
+ * recursive) that match the given filename filter. If any file cannot be
+ * deleted then this is printed at warn to the given logger.
+ *
+ * @param directory
+ * @param filter if null then no filter is used
+ * @param logger
+ */
+ public static void deleteFilesInDir(final File directory, final FilenameFilter filter, final Logger logger) {
+ FileUtils.deleteFilesInDir(directory, filter, logger, false);
+ }
+
+ /**
+ * Deletes all files (not directories) in the given directory (recursive)
+ * that match the given filename filter. If any file cannot be deleted then
+ * this is printed at warn to the given logger.
+ *
+ * @param directory
+ * @param filter if null then no filter is used
+ * @param logger
+ * @param recurse
+ */
+ public static void deleteFilesInDir(final File directory, final FilenameFilter filter, final Logger logger, final boolean recurse) {
+ FileUtils.deleteFilesInDir(directory, filter, logger, recurse, false);
+ }
+
+ /**
+ * Deletes all files (not directories) in the given directory (recursive)
+ * that match the given filename filter. If any file cannot be deleted then
+ * this is printed at warn to the given logger.
+ *
+ * @param directory
+ * @param filter if null then no filter is used
+ * @param logger
+ * @param recurse
+ * @param deleteEmptyDirectories default is false; if true will delete
+ * directories found that are empty
+ */
+ public static void deleteFilesInDir(final File directory, final FilenameFilter filter, final Logger logger, final boolean recurse, final boolean deleteEmptyDirectories) {
+ // ensure the specified directory is actually a directory and that it exists
+ if (null != directory && directory.isDirectory()) {
+ final File ingestFiles[] = directory.listFiles();
+ for (File ingestFile : ingestFiles) {
+ boolean process = (filter == null) ? true : filter.accept(directory, ingestFile.getName());
+ if (ingestFile.isFile() && process) {
+ FileUtils.deleteFile(ingestFile, logger, 3);
+ }
+ if (ingestFile.isDirectory() && recurse) {
+ FileUtils.deleteFilesInDir(ingestFile, filter, logger, recurse, deleteEmptyDirectories);
+ if (deleteEmptyDirectories && ingestFile.list().length == 0) {
+ FileUtils.deleteFile(ingestFile, logger, 3);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Deletes given files.
+ *
+ * @param files
+ * @param recurse will recurse
+ * @throws IOException
+ */
+ public static void deleteFiles(final Collection<File> files, final boolean recurse) throws IOException {
+ for (final File file : files) {
+ FileUtils.deleteFile(file, recurse);
+ }
+ }
+
+ public static void deleteFile(final File file, final boolean recurse) throws IOException {
+ if (file.isDirectory() && recurse) {
+ FileUtils.deleteFiles(Arrays.asList(file.listFiles()), recurse);
+ }
+ //now delete the file itself regardless of whether it is plain file or a directory
+ if (!FileUtils.deleteFile(file, null, 5)) {
+ throw new IOException("Unable to delete " + file.getAbsolutePath());
+ }
+ }
+
+ /**
+ * Randomly generates a sequence of bytes and overwrites the contents of the
+ * file a number of times. The file is then deleted.
+ *
+ * @param file File to be overwritten a number of times and, ultimately,
+ * deleted
+ * @param passes Number of times file should be overwritten
+ * @throws IOException if something makes shredding or deleting a problem
+ */
+ public static void shredFile(final File file, final int passes)
+ throws IOException {
+ final Random generator = new Random();
+ final long fileLength = file.length();
+ final int byteArraySize = (int) Math.min(fileLength, 1048576); // 1MB
+ final byte[] b = new byte[byteArraySize];
+ final long numOfRandomWrites = (fileLength / b.length) + 1;
+ final FileOutputStream fos = new FileOutputStream(file);
+ try {
+ // Over write file contents (passes) times
+ final FileChannel channel = fos.getChannel();
+ for (int i = 0; i < passes; i++) {
+ generator.nextBytes(b);
+ for (int j = 0; j <= numOfRandomWrites; j++) {
+ fos.write(b);
+ }
+ fos.flush();
+ channel.position(0);
+ }
+ // Write out "0" for each byte in the file
+ Arrays.fill(b, (byte) 0);
+ for (int j = 0; j < numOfRandomWrites; j++) {
+ fos.write(b);
+ }
+ fos.flush();
+ fos.close();
+ // Try to delete the file a few times
+ if (!FileUtils.deleteFile(file, null, 5)) {
+ throw new IOException("Failed to delete file after shredding");
+ }
+
+ } finally {
+ FileUtils.closeQuietly(fos);
+ }
+ }
+
+ public static long copy(final InputStream in, final OutputStream out) throws IOException {
+ final byte[] buffer = new byte[65536];
+ long copied = 0L;
+ int len;
+ while ((len = in.read(buffer)) > 0) {
+ out.write(buffer, 0, len);
+ copied += len;
+ }
+
+ return copied;
+ }
+
+ public static long copyBytes(final byte[] bytes, final File destination, final boolean lockOutputFile) throws FileNotFoundException, IOException {
+ FileOutputStream fos = null;
+ FileLock outLock = null;
+ long fileSize = 0L;
+ try {
+ fos = new FileOutputStream(destination);
+ final FileChannel out = fos.getChannel();
+ if (lockOutputFile) {
+ outLock = out.tryLock(0, Long.MAX_VALUE, false);
+ if (null == outLock) {
+ throw new IOException("Unable to obtain exclusive file lock for: " + destination.getAbsolutePath());
+ }
+ }
+ fos.write(bytes);
+ fos.flush();
+ fileSize = bytes.length;
+ } finally {
+ FileUtils.releaseQuietly(outLock);
+ FileUtils.closeQuietly(fos);
+ }
+ return fileSize;
+ }
+
+ /**
+ * Copies the given source file to the given destination file. The given
+ * destination will be overwritten if it already exists.
+ *
+ * @param source
+ * @param destination
+ * @param lockInputFile if true will lock input file during copy; if false
+ * will not
+ * @param lockOutputFile if true will lock output file during copy; if false
+ * will not
+ * @param move if true will perform what is effectively a move operation
+ * rather than a pure copy. This allows for potentially highly efficient
+ * movement of the file but if not possible this will revert to a copy then
+ * delete behavior. If false, then the file is copied and the source file is
+ * retained. If a true rename/move occurs then no lock is held during that
+ * time.
+ * @param logger if failures occur, they will be logged to this logger if
+ * possible. If this logger is null, an IOException will instead be thrown,
+ * indicating the problem.
+ * @return long number of bytes copied
+ * @throws FileNotFoundException if the source file could not be found
+ * @throws IOException
+ * @throws SecurityException if a security manager denies the needed file
+ * operations
+ */
+ public static long copyFile(final File source, final File destination, final boolean lockInputFile, final boolean lockOutputFile, final boolean move, final Logger logger) throws FileNotFoundException, IOException {
+
+ FileInputStream fis = null;
+ FileOutputStream fos = null;
+ FileLock inLock = null;
+ FileLock outLock = null;
+ long fileSize = 0L;
+ if (!source.canRead()) {
+ throw new IOException("Must at least have read permission");
+
+ }
+ if (move && source.renameTo(destination)) {
+ fileSize = destination.length();
+ } else {
+ try {
+ fis = new FileInputStream(source);
+ fos = new FileOutputStream(destination);
+ final FileChannel in = fis.getChannel();
+ final FileChannel out = fos.getChannel();
+ if (lockInputFile) {
+ inLock = in.tryLock(0, Long.MAX_VALUE, true);
+ if (null == inLock) {
+ throw new IOException("Unable to obtain shared file lock for: " + source.getAbsolutePath());
+ }
+ }
+ if (lockOutputFile) {
+ outLock = out.tryLock(0, Long.MAX_VALUE, false);
+ if (null == outLock) {
+ throw new IOException("Unable to obtain exclusive file lock for: " + destination.getAbsolutePath());
+ }
+ }
+ long bytesWritten = 0;
+ do {
+ bytesWritten += out.transferFrom(in, bytesWritten, TRANSFER_CHUNK_SIZE_BYTES);
+ fileSize = in.size();
+ } while (bytesWritten < fileSize);
+ out.force(false);
+ FileUtils.closeQuietly(fos);
+ FileUtils.closeQuietly(fis);
+ fos = null;
+ fis = null;
+ if (move && !FileUtils.deleteFile(source, null, 5)) {
+ if (logger == null) {
+ FileUtils.deleteFile(destination, null, 5);
+ throw new IOException("Could not remove file " + source.getAbsolutePath());
+ } else {
+ logger.warn("Configured to delete source file when renaming/move not successful. However, unable to delete file at: " + source.getAbsolutePath());
+ }
+ }
+ } finally {
+ FileUtils.releaseQuietly(inLock);
+ FileUtils.releaseQuietly(outLock);
+ FileUtils.closeQuietly(fos);
+ FileUtils.closeQuietly(fis);
+ }
+ }
+ return fileSize;
+ }
+
+ /**
+ * Copies the given source file to the given destination file. The given
+ * destination will be overwritten if it already exists.
+ *
+ * @param source
+ * @param destination
+ * @param lockInputFile if true will lock input file during copy; if false
+ * will not
+ * @param lockOutputFile if true will lock output file during copy; if false
+ * will not
+ * @param logger
+ * @return long number of bytes copied
+ * @throws FileNotFoundException if the source file could not be found
+ * @throws IOException
+ * @throws SecurityException if a security manager denies the needed file
+ * operations
+ */
+ public static long copyFile(final File source, final File destination, final boolean lockInputFile, final boolean lockOutputFile, final Logger logger) throws FileNotFoundException, IOException {
+ return FileUtils.copyFile(source, destination, lockInputFile, lockOutputFile, false, logger);
+ }
+
+ public static long copyFile(final File source, final OutputStream stream, final boolean closeOutputStream, final boolean lockInputFile) throws FileNotFoundException, IOException {
+ FileInputStream fis = null;
+ FileLock inLock = null;
+ long fileSize = 0L;
+ try {
+ fis = new FileInputStream(source);
+ final FileChannel in = fis.getChannel();
+ if (lockInputFile) {
+ inLock = in.tryLock(0, Long.MAX_VALUE, true);
+ if (inLock == null) {
+ throw new IOException("Unable to obtain exclusive file lock for: " + source.getAbsolutePath());
+ }
+
+ }
+
+ byte[] buffer = new byte[1 << 18]; //256 KB
+ int bytesRead = -1;
+ while ((bytesRead = fis.read(buffer)) != -1) {
+ stream.write(buffer, 0, bytesRead);
+ }
+ in.force(false);
+ stream.flush();
+ fileSize = in.size();
+ } finally {
+ FileUtils.releaseQuietly(inLock);
+ FileUtils.closeQuietly(fis);
+ if (closeOutputStream) {
+ FileUtils.closeQuietly(stream);
+ }
+ }
+ return fileSize;
+ }
+
+ public static long copyFile(final InputStream stream, final File destination, final boolean closeInputStream, final boolean lockOutputFile) throws FileNotFoundException, IOException {
+ final Path destPath = destination.toPath();
+ final long size = Files.copy(stream, destPath);
+ if (closeInputStream) {
+ stream.close();
+ }
+ return size;
+ }
+
+ /**
+ * Renames the given file from the source path to the destination path. This
+ * handles multiple attempts. This should only be used to rename within a
+ * given directory. Renaming across directories might not work well. See the
+ * <code>File.renameTo</code> for more information.
+ *
+ * @param source the file to rename
+ * @param destination the file path to rename to
+ * @param maxAttempts the max number of attempts to attempt the rename
+ * @throws IOException if rename isn't successful
+ */
+ public static void renameFile(final File source, final File destination, final int maxAttempts) throws IOException {
+ FileUtils.renameFile(source, destination, maxAttempts, false);
+ }
+
+ /**
+ * Renames the given file from the source path to the destination path. This
+ * handles multiple attempts. This should only be used to rename within a
+ * given directory. Renaming across directories might not work well. See the
+ * <code>File.renameTo</code> for more information.
+ *
+ * @param source the file to rename
+ * @param destination the file path to rename to
+ * @param maxAttempts the max number of attempts to attempt the rename
+ * @param replace if true and a rename attempt fails will check if a file is
+ * already at the destination path. If so it will delete that file and
+ * attempt the rename according the remaining maxAttempts. If false, any
+ * conflicting files will be left as they were and the rename attempts will
+ * fail if conflicting.
+ * @throws IOException if rename isn't successful
+ */
+ public static void renameFile(final File source, final File destination, final int maxAttempts, final boolean replace) throws IOException {
+ final int attempts = (replace || maxAttempts < 1) ? Math.max(2, maxAttempts) : maxAttempts;
+ boolean renamed = false;
+ for (int i = 0; i < attempts; i++) {
+ renamed = source.renameTo(destination);
+ if (!renamed) {
+ FileUtils.deleteFile(destination, null, 5);
+ } else {
+ break; //rename has succeeded
+ }
+ }
+ if (!renamed) {
+ throw new IOException("Attempted " + maxAttempts + " times but unable to rename from \'" + source.getPath() + "\' to \'" + destination.getPath() + "\'");
+
+ }
+ }
+
+ public static void sleepQuietly(final long millis) {
+ try {
+ Thread.sleep(millis);
+ } catch (final InterruptedException ex) {
+ /* do nothing */
+ }
+ }
+
+ /**
+ * Syncs a primary copy of a file with the copy in the restore directory. If
+ * the restore directory does not have a file and the primary has a file,
+ * the the primary's file is copied to the restore directory. Else if the
+ * restore directory has a file, but the primary does not, then the
+ * restore's file is copied to the primary directory. Else if the primary
+ * file is different than the restore file, then an IllegalStateException is
+ * thrown. Otherwise, if neither file exists, then no syncing is performed.
+ *
+ * @param primaryFile the primary file
+ * @param restoreFile the restore file
+ * @param logger a logger
+ * @throws IOException if an I/O problem was encountered during syncing
+ * @throws IllegalStateException if the primary and restore copies exist but
+ * are different
+ */
+ public static void syncWithRestore(final File primaryFile, final File restoreFile, final Logger logger)
+ throws IOException {
+
+ if (primaryFile.exists() && !restoreFile.exists()) {
+ // copy primary file to restore
+ copyFile(primaryFile, restoreFile, false, false, logger);
+ } else if (restoreFile.exists() && !primaryFile.exists()) {
+ // copy restore file to primary
+ copyFile(restoreFile, primaryFile, false, false, logger);
+ } else if (primaryFile.exists() && restoreFile.exists() && !isSame(primaryFile, restoreFile)) {
+ throw new IllegalStateException(String.format("Primary file '%s' is different than restore file '%s'",
+ primaryFile.getAbsoluteFile(), restoreFile.getAbsolutePath()));
+ }
+ }
+
+ /**
+ * Returns true if the given files are the same according to their MD5 hash.
+ *
+ * @param file1 a file
+ * @param file2 a file
+ * @return true if the files are the same; false otherwise
+ * @throws IOException if the MD5 hash could not be computed
+ */
+ public static boolean isSame(final File file1, final File file2) throws IOException {
+ return Arrays.equals(computeMd5Digest(file1), computeMd5Digest(file2));
+ }
+
+ /**
+ * Returns the MD5 hash of the given file.
+ *
+ * @param file a file
+ * @return the MD5 hash
+ * @throws IOException if the MD5 hash could not be computed
+ */
+ public static byte[] computeMd5Digest(final File file) throws IOException {
+ BufferedInputStream bis = null;
+ try {
+ bis = new BufferedInputStream(new FileInputStream(file));
+ return DigestUtils.md5(bis);
+ } finally {
+ FileUtils.closeQuietly(bis);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-logging-utils/pom.xml
----------------------------------------------------------------------
diff --git a/commons/nifi-logging-utils/pom.xml b/commons/nifi-logging-utils/pom.xml
new file mode 100644
index 0000000..ce5064b
--- /dev/null
+++ b/commons/nifi-logging-utils/pom.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-parent</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>nifi-logging-utils</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <name>NiFi Logging Utils</name>
+ <description>Utilities for logging</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-logging-utils/src/main/java/org/apache/nifi/logging/NiFiLog.java
----------------------------------------------------------------------
diff --git a/commons/nifi-logging-utils/src/main/java/org/apache/nifi/logging/NiFiLog.java b/commons/nifi-logging-utils/src/main/java/org/apache/nifi/logging/NiFiLog.java
new file mode 100644
index 0000000..7c71d85
--- /dev/null
+++ b/commons/nifi-logging-utils/src/main/java/org/apache/nifi/logging/NiFiLog.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.logging;
+
+import org.slf4j.Logger;
+import org.slf4j.Marker;
+
+/**
+ *
+ * @author unattributed
+ */
+public class NiFiLog implements Logger {
+
+ private final Logger logger;
+
+ public NiFiLog(final Logger logger) {
+ this.logger = logger;
+ }
+
+ public Logger getWrappedLog() {
+ return logger;
+ }
+
+ @Override
+ public void warn(Marker marker, String string, Throwable thrwbl) {
+ if (logger.isDebugEnabled()) {
+ logger.warn(marker, string, thrwbl);
+ } else {
+ logger.warn(marker, string);
+ }
+ }
+
+ @Override
+ public void warn(Marker marker, String string, Object[] os) {
+ logger.warn(marker, string, os);
+ }
+
+ @Override
+ public void warn(Marker marker, String string, Object o, Object o1) {
+ logger.warn(marker, string, o, o1);
+ }
+
+ @Override
+ public void warn(Marker marker, String string, Object o) {
+ logger.warn(marker, string, o);
+ }
+
+ @Override
+ public void warn(Marker marker, String string) {
+ logger.warn(marker, string);
+ }
+
+ @Override
+ public void warn(String string, Throwable thrwbl) {
+ if (logger.isDebugEnabled()) {
+ logger.warn(string, thrwbl);
+ } else {
+ logger.warn(string);
+ }
+ }
+
+ @Override
+ public void warn(String string, Object o, Object o1) {
+ logger.warn(string, o, o1);
+ }
+
+ @Override
+ public void warn(String string, Object[] os) {
+ logger.warn(string, os);
+ }
+
+ @Override
+ public void warn(String string, Object o) {
+ logger.warn(string, o);
+ }
+
+ @Override
+ public void warn(String string) {
+ logger.warn(string);
+ }
+
+ @Override
+ public void trace(Marker marker, String string, Throwable thrwbl) {
+ logger.trace(marker, string, thrwbl);
+ }
+
+ @Override
+ public void trace(Marker marker, String string, Object[] os) {
+ logger.trace(marker, string, os);
+ }
+
+ @Override
+ public void trace(Marker marker, String string, Object o, Object o1) {
+ logger.trace(marker, string, o, o1);
+ }
+
+ @Override
+ public void trace(Marker marker, String string, Object o) {
+ logger.trace(marker, string, o);
+ }
+
+ @Override
+ public void trace(Marker marker, String string) {
+ logger.trace(marker, string);
+ }
+
+ @Override
+ public void trace(String string, Throwable thrwbl) {
+ logger.trace(string, thrwbl);
+ }
+
+ @Override
+ public void trace(String string, Object[] os) {
+ logger.trace(string, os);
+ }
+
+ @Override
+ public void trace(String string, Object o, Object o1) {
+ logger.trace(string, o, o1);
+ }
+
+ @Override
+ public void trace(String string, Object o) {
+ logger.trace(string, o);
+ }
+
+ @Override
+ public void trace(String string) {
+ logger.trace(string);
+ }
+
+ @Override
+ public boolean isWarnEnabled(Marker marker) {
+ return logger.isWarnEnabled(marker);
+ }
+
+ @Override
+ public boolean isWarnEnabled() {
+ return logger.isWarnEnabled();
+ }
+
+ @Override
+ public boolean isTraceEnabled(Marker marker) {
+ return logger.isTraceEnabled(marker);
+ }
+
+ @Override
+ public boolean isTraceEnabled() {
+ return logger.isTraceEnabled();
+ }
+
+ @Override
+ public boolean isInfoEnabled(Marker marker) {
+ return logger.isInfoEnabled(marker);
+ }
+
+ @Override
+ public boolean isInfoEnabled() {
+ return logger.isInfoEnabled();
+ }
+
+ @Override
+ public boolean isErrorEnabled(Marker marker) {
+ return logger.isErrorEnabled(marker);
+ }
+
+ @Override
+ public boolean isErrorEnabled() {
+ return logger.isErrorEnabled();
+ }
+
+ @Override
+ public boolean isDebugEnabled(Marker marker) {
+ return logger.isDebugEnabled(marker);
+ }
+
+ @Override
+ public boolean isDebugEnabled() {
+ return logger.isDebugEnabled();
+ }
+
+ @Override
+ public void info(Marker marker, String string, Throwable thrwbl) {
+ if (logger.isDebugEnabled()) {
+ logger.info(marker, string, thrwbl);
+ } else {
+ logger.info(marker, string);
+ }
+ }
+
+ @Override
+ public void info(Marker marker, String string, Object[] os) {
+ logger.info(marker, string, os);
+ }
+
+ @Override
+ public void info(Marker marker, String string, Object o, Object o1) {
+ logger.info(marker, string, o, o1);
+ }
+
+ @Override
+ public void info(Marker marker, String string, Object o) {
+ logger.info(marker, string, o);
+ }
+
+ @Override
+ public void info(Marker marker, String string) {
+ logger.info(marker, string);
+ }
+
+ @Override
+ public void info(String string, Throwable thrwbl) {
+ if (logger.isDebugEnabled()) {
+ logger.info(string, thrwbl);
+ } else {
+ logger.info(string);
+ }
+ }
+
+ @Override
+ public void info(String string, Object[] os) {
+ logger.info(string, os);
+ }
+
+ @Override
+ public void info(String string, Object o, Object o1) {
+ logger.info(string, o, o1);
+ }
+
+ @Override
+ public void info(String string, Object o) {
+ logger.info(string, o);
+ }
+
+ @Override
+ public void info(String string) {
+ logger.info(string);
+ }
+
+ @Override
+ public String getName() {
+ return logger.getName();
+ }
+
+ @Override
+ public void error(Marker marker, String string, Throwable thrwbl) {
+ if (logger.isDebugEnabled()) {
+ logger.error(marker, string, thrwbl);
+ } else {
+ logger.error(marker, string);
+ }
+ }
+
+ @Override
+ public void error(Marker marker, String string, Object[] os) {
+ logger.error(marker, string, os);
+ }
+
+ @Override
+ public void error(Marker marker, String string, Object o, Object o1) {
+ logger.error(marker, string, o, o1);
+ }
+
+ @Override
+ public void error(Marker marker, String string, Object o) {
+ logger.error(marker, string, o);
+ }
+
+ @Override
+ public void error(Marker marker, String string) {
+ logger.error(marker, string);
+ }
+
+ @Override
+ public void error(String string, Throwable thrwbl) {
+ if (logger.isDebugEnabled()) {
+ logger.error(string, thrwbl);
+ } else {
+ logger.error(string);
+ }
+ }
+
+ @Override
+ public void error(String string, Object[] os) {
+ logger.error(string, os);
+ }
+
+ @Override
+ public void error(String string, Object o, Object o1) {
+ logger.error(string, o, o1);
+ }
+
+ @Override
+ public void error(String string, Object o) {
+ logger.error(string, o);
+ }
+
+ @Override
+ public void error(String string) {
+ logger.error(string);
+ }
+
+ @Override
+ public void debug(Marker marker, String string, Throwable thrwbl) {
+ logger.debug(marker, string, thrwbl);
+ }
+
+ @Override
+ public void debug(Marker marker, String string, Object[] os) {
+ logger.debug(marker, string, os);
+ }
+
+ @Override
+ public void debug(Marker marker, String string, Object o, Object o1) {
+ logger.debug(marker, string, o, o1);
+ }
+
+ @Override
+ public void debug(Marker marker, String string, Object o) {
+ logger.debug(marker, string, o);
+ }
+
+ @Override
+ public void debug(Marker marker, String string) {
+ logger.debug(marker, string);
+ }
+
+ @Override
+ public void debug(String string, Throwable thrwbl) {
+ logger.debug(string, thrwbl);
+ }
+
+ @Override
+ public void debug(String string, Object[] os) {
+ logger.debug(string, os);
+ }
+
+ @Override
+ public void debug(String string, Object o, Object o1) {
+ logger.debug(string, o, o1);
+ }
+
+ @Override
+ public void debug(String string, Object o) {
+ logger.debug(string, o);
+ }
+
+ @Override
+ public void debug(String string) {
+ logger.debug(string);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-parent/pom.xml
----------------------------------------------------------------------
diff --git a/commons/nifi-parent/pom.xml b/commons/nifi-parent/pom.xml
new file mode 100644
index 0000000..7684d53
--- /dev/null
+++ b/commons/nifi-parent/pom.xml
@@ -0,0 +1,217 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-parent</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <packaging>pom</packaging>
+ <name>NiFi Parent</name>
+
+ <description>A helpful parent pom which can be used for all NiFi components. Helps establish the basic requirements/depdencies.</description>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <org.slf4j.version>1.7.7</org.slf4j.version>
+ </properties>
+
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>3.0.0</version>
+ <configuration>
+ <effort>Max</effort>
+ <threshold>Medium</threshold>
+ <xmlOutput>true</xmlOutput>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nar-maven-plugin</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <extensions>true</extensions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ <fork>true</fork>
+ </configuration>
+ <version>3.2</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.5</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-war-plugin</artifactId>
+ <version>2.5</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.9</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>2.7</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.18</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.5.2</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-release-plugin</artifactId>
+ <version>2.5.1</version>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>jaxb2-maven-plugin</artifactId>
+ <version>1.6</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>2.4</version>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>1.3.2</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.4</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.10.1</version>
+ <configuration>
+ <failOnError>false</failOnError>
+ <quiet>true</quiet>
+ <show>private</show>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>1.1.2</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ <version>${org.slf4j.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jul-to-slf4j</artifactId>
+ <version>${org.slf4j.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${org.slf4j.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+ <dependencies>
+ <!-- required for libraries using commons-loggings -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>1.10.8</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <version>${org.slf4j.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <!-- All projects use the same distrubution Manager for publishing artifacts
+ but for obtaining them this is specified in the settings.xml file for each
+ user -->
+ <distributionManagement>
+ <repository>
+ <id>nifi-releases</id>
+ <url>${nifi.repo.url}</url>
+ </repository>
+ <snapshotRepository>
+ <id>nifi-snapshots</id>
+ <url>${nifi.snapshot.repo.url}</url>
+ </snapshotRepository>
+ </distributionManagement>
+ <reporting>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>3.0.0</version>
+ <configuration>
+ <effort>Max</effort>
+ <threshold>Medium</threshold>
+ <xmlOutput>true</xmlOutput>
+ </configuration>
+ </plugin>
+ </plugins>
+ </reporting>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-properties/.gitignore
----------------------------------------------------------------------
diff --git a/commons/nifi-properties/.gitignore b/commons/nifi-properties/.gitignore
new file mode 100755
index 0000000..073c9fa
--- /dev/null
+++ b/commons/nifi-properties/.gitignore
@@ -0,0 +1,3 @@
+/target
+/target
+/target
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-properties/pom.xml
----------------------------------------------------------------------
diff --git a/commons/nifi-properties/pom.xml b/commons/nifi-properties/pom.xml
new file mode 100644
index 0000000..70f90aa
--- /dev/null
+++ b/commons/nifi-properties/pom.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-parent</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+ <artifactId>nifi-properties</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <name>NiFi Properties</name>
+
+ <dependencies>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
----------------------------------------------------------------------
diff --git a/commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
new file mode 100644
index 0000000..1520814
--- /dev/null
+++ b/commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -0,0 +1,882 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.nio.file.InvalidPathException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+public class NiFiProperties extends Properties {
+
+ private static final long serialVersionUID = 2119177359005492702L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(NiFiProperties.class);
+ private static NiFiProperties instance = null;
+
+ // core properties
+ public static final String PROPERTIES_FILE_PATH = "nifi.properties.file.path";
+ public static final String FLOW_CONFIGURATION_FILE = "nifi.flow.configuration.file";
+ public static final String FLOW_CONFIGURATION_ARCHIVE_FILE = "nifi.flow.configuration.archive.file";
+ public static final String TASK_CONFIGURATION_FILE = "nifi.reporting.task.configuration.file";
+ public static final String SERVICE_CONFIGURATION_FILE = "nifi.controller.service.configuration.file";
+ public static final String AUTHORITY_PROVIDER_CONFIGURATION_FILE = "nifi.authority.provider.configuration.file";
+ public static final String REPOSITORY_DATABASE_DIRECTORY = "nifi.database.directory";
+ public static final String RESTORE_DIRECTORY = "nifi.restore.directory";
+ public static final String VERSION = "nifi.version";
+ public static final String WRITE_DELAY_INTERVAL = "nifi.flowservice.writedelay.interval";
+ public static final String AUTO_RESUME_STATE = "nifi.flowcontroller.autoResumeState";
+ public static final String FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD = "nifi.flowcontroller.graceful.shutdown.period";
+ public static final String NAR_LIBRARY_DIRECTORY = "nifi.nar.library.directory";
+ public static final String NAR_WORKING_DIRECTORY = "nifi.nar.working.directory";
+ public static final String COMPONENT_DOCS_DIRECTORY = "nifi.documentation.working.directory";
+ public static final String SENSITIVE_PROPS_KEY = "nifi.sensitive.props.key";
+ public static final String SENSITIVE_PROPS_ALGORITHM = "nifi.sensitive.props.algorithm";
+ public static final String SENSITIVE_PROPS_PROVIDER = "nifi.sensitive.props.provider";
+ public static final String H2_URL_APPEND = "nifi.h2.url.append";
+ public static final String REMOTE_INPUT_PORT = "nifi.remote.input.socket.port";
+ public static final String SITE_TO_SITE_SECURE = "nifi.remote.input.secure";
+ public static final String TEMPLATE_DIRECTORY = "nifi.templates.directory";
+ public static final String ADMINISTRATIVE_YIELD_DURATION = "nifi.administrative.yield.duration";
+ public static final String PERSISTENT_STATE_DIRECTORY = "nifi.persistent.state.directory";
+
+ // content repository properties
+ public static final String REPOSITORY_CONTENT_PREFIX = "nifi.content.repository.directory.";
+ public static final String CONTENT_REPOSITORY_IMPLEMENTATION = "nifi.content.repository.implementation";
+ public static final String MAX_APPENDABLE_CLAIM_SIZE = "nifi.content.claim.max.appendable.size";
+ public static final String MAX_FLOWFILES_PER_CLAIM = "nifi.content.claim.max.flow.files";
+ public static final String CONTENT_ARCHIVE_MAX_RETENTION_PERIOD = "nifi.content.repository.archive.max.retention.period";
+ public static final String CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE = "nifi.content.repository.archive.max.usage.percentage";
+ public static final String CONTENT_ARCHIVE_BACK_PRESSURE_PERCENTAGE = "nifi.content.repository.archive.backpressure.percentage";
+ public static final String CONTENT_ARCHIVE_ENABLED = "nifi.content.repository.archive.enabled";
+ public static final String CONTENT_ARCHIVE_CLEANUP_FREQUENCY = "nifi.content.repository.archive.cleanup.frequency";
+ public static final String CONTENT_VIEWER_URL = "nifi.content.viewer.url";
+
+ // flowfile repository properties
+ public static final String FLOWFILE_REPOSITORY_IMPLEMENTATION = "nifi.flowfile.repository.implementation";
+ public static final String FLOWFILE_REPOSITORY_ALWAYS_SYNC = "nifi.flowfile.repository.always.sync";
+ public static final String FLOWFILE_REPOSITORY_DIRECTORY = "nifi.flowfile.repository.directory";
+ public static final String FLOWFILE_REPOSITORY_PARTITIONS = "nifi.flowfile.repository.partitions";
+ public static final String FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL = "nifi.flowfile.repository.checkpoint.interval";
+ public static final String FLOWFILE_SWAP_MANAGER_IMPLEMENTATION = "nifi.swap.manager.implementation";
+ public static final String QUEUE_SWAP_THRESHOLD = "nifi.queue.swap.threshold";
+ public static final String SWAP_STORAGE_LOCATION = "nifi.swap.storage.directory";
+ public static final String SWAP_IN_THREADS = "nifi.swap.in.threads";
+ public static final String SWAP_IN_PERIOD = "nifi.swap.in.period";
+ public static final String SWAP_OUT_THREADS = "nifi.swap.out.threads";
+ public static final String SWAP_OUT_PERIOD = "nifi.swap.out.period";
+
+ // provenance properties
+ public static final String PROVENANCE_REPO_IMPLEMENTATION_CLASS = "nifi.provenance.repository.implementation";
+ public static final String PROVENANCE_REPO_DIRECTORY_PREFIX = "nifi.provenance.repository.directory.";
+ public static final String PROVENANCE_MAX_STORAGE_TIME = "nifi.provenance.repository.max.storage.time";
+ public static final String PROVENANCE_MAX_STORAGE_SIZE = "nifi.provenance.repository.max.storage.size";
+ public static final String PROVENANCE_ROLLOVER_TIME = "nifi.provenance.repository.rollover.time";
+ public static final String PROVENANCE_ROLLOVER_SIZE = "nifi.provenance.repository.rollover.size";
+ public static final String PROVENANCE_QUERY_THREAD_POOL_SIZE = "nifi.provenance.repository.query.threads";
+ public static final String PROVENANCE_COMPRESS_ON_ROLLOVER = "nifi.provenance.repository.compress.on.rollover";
+ public static final String PROVENANCE_INDEXED_FIELDS = "nifi.provenance.repository.indexed.fields";
+ public static final String PROVENANCE_INDEXED_ATTRIBUTES = "nifi.provenance.repository.indexed.attributes";
+ public static final String PROVENANCE_INDEX_SHARD_SIZE = "nifi.provenance.repository.index.shard.size";
+ public static final String PROVENANCE_JOURNAL_COUNT = "nifi.provenance.repository.journal.count";
+
+ // component status repository properties
+ public static final String COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION = "nifi.components.status.repository.implementation";
+ public static final String COMPONENT_STATUS_SNAPSHOT_FREQUENCY = "nifi.components.status.snapshot.frequency";
+
+ // encryptor properties
+ public static final String NF_SENSITIVE_PROPS_KEY = "nifi.sensitive.props.key";
+ public static final String NF_SENSITIVE_PROPS_ALGORITHM = "nifi.sensitive.props.algorithm";
+ public static final String NF_SENSITIVE_PROPS_PROVIDER = "nifi.sensitive.props.provider";
+
+ // security properties
+ public static final String SECURITY_KEYSTORE = "nifi.security.keystore";
+ public static final String SECURITY_KEYSTORE_TYPE = "nifi.security.keystoreType";
+ public static final String SECURITY_KEYSTORE_PASSWD = "nifi.security.keystorePasswd";
+ public static final String SECURITY_KEY_PASSWD = "nifi.security.keyPasswd";
+ public static final String SECURITY_TRUSTSTORE = "nifi.security.truststore";
+ public static final String SECURITY_TRUSTSTORE_TYPE = "nifi.security.truststoreType";
+ public static final String SECURITY_TRUSTSTORE_PASSWD = "nifi.security.truststorePasswd";
+ public static final String SECURITY_NEED_CLIENT_AUTH = "nifi.security.needClientAuth";
+ public static final String SECURITY_USER_AUTHORITY_PROVIDER = "nifi.security.user.authority.provider";
+ public static final String SECURITY_CLUSTER_AUTHORITY_PROVIDER_PORT = "nifi.security.cluster.authority.provider.port";
+ public static final String SECURITY_CLUSTER_AUTHORITY_PROVIDER_THREADS = "nifi.security.cluster.authority.provider.threads";
+ public static final String SECURITY_USER_CREDENTIAL_CACHE_DURATION = "nifi.security.user.credential.cache.duration";
+ public static final String SECURITY_SUPPORT_NEW_ACCOUNT_REQUESTS = "nifi.security.support.new.account.requests";
+ public static final String SECURITY_DEFAULT_USER_ROLES = "nifi.security.default.user.roles";
+ public static final String SECURITY_OCSP_RESPONDER_URL = "nifi.security.ocsp.responder.url";
+ public static final String SECURITY_OCSP_RESPONDER_CERTIFICATE = "nifi.security.ocsp.responder.certificate";
+
+ // web properties
+ public static final String WEB_WAR_DIR = "nifi.web.war.directory";
+ public static final String WEB_HTTP_PORT = "nifi.web.http.port";
+ public static final String WEB_HTTP_HOST = "nifi.web.http.host";
+ public static final String WEB_HTTPS_PORT = "nifi.web.https.port";
+ public static final String WEB_HTTPS_HOST = "nifi.web.https.host";
+ public static final String WEB_WORKING_DIR = "nifi.web.jetty.working.directory";
+
+ // ui properties
+ public static final String UI_BANNER_TEXT = "nifi.ui.banner.text";
+ public static final String UI_AUTO_REFRESH_INTERVAL = "nifi.ui.autorefresh.interval";
+
+ // cluster common properties
+ public static final String CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = "nifi.cluster.protocol.heartbeat.interval";
+ public static final String CLUSTER_PROTOCOL_IS_SECURE = "nifi.cluster.protocol.is.secure";
+ public static final String CLUSTER_PROTOCOL_SOCKET_TIMEOUT = "nifi.cluster.protocol.socket.timeout";
+ public static final String CLUSTER_PROTOCOL_CONNECTION_HANDSHAKE_TIMEOUT = "nifi.cluster.protocol.connection.handshake.timeout";
+ public static final String CLUSTER_PROTOCOL_USE_MULTICAST = "nifi.cluster.protocol.use.multicast";
+ public static final String CLUSTER_PROTOCOL_MULTICAST_ADDRESS = "nifi.cluster.protocol.multicast.address";
+ public static final String CLUSTER_PROTOCOL_MULTICAST_PORT = "nifi.cluster.protocol.multicast.port";
+ public static final String CLUSTER_PROTOCOL_MULTICAST_SERVICE_BROADCAST_DELAY = "nifi.cluster.protocol.multicast.service.broadcast.delay";
+ public static final String CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS = "nifi.cluster.protocol.multicast.service.locator.attempts";
+ public static final String CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS_DELAY = "nifi.cluster.protocol.multicast.service.locator.attempts.delay";
+
+ // cluster node properties
+ public static final String CLUSTER_IS_NODE = "nifi.cluster.is.node";
+ public static final String CLUSTER_NODE_ADDRESS = "nifi.cluster.node.address";
+ public static final String CLUSTER_NODE_PROTOCOL_PORT = "nifi.cluster.node.protocol.port";
+ public static final String CLUSTER_NODE_PROTOCOL_THREADS = "nifi.cluster.node.protocol.threads";
+ public static final String CLUSTER_NODE_UNICAST_MANAGER_ADDRESS = "nifi.cluster.node.unicast.manager.address";
+ public static final String CLUSTER_NODE_UNICAST_MANAGER_PROTOCOL_PORT = "nifi.cluster.node.unicast.manager.protocol.port";
+
+ // cluster manager properties
+ public static final String CLUSTER_IS_MANAGER = "nifi.cluster.is.manager";
+ public static final String CLUSTER_MANAGER_ADDRESS = "nifi.cluster.manager.address";
+ public static final String CLUSTER_MANAGER_PROTOCOL_PORT = "nifi.cluster.manager.protocol.port";
+ public static final String CLUSTER_MANAGER_NODE_FIREWALL_FILE = "nifi.cluster.manager.node.firewall.file";
+ public static final String CLUSTER_MANAGER_NODE_EVENT_HISTORY_SIZE = "nifi.cluster.manager.node.event.history.size";
+ public static final String CLUSTER_MANAGER_NODE_API_CONNECTION_TIMEOUT = "nifi.cluster.manager.node.api.connection.timeout";
+ public static final String CLUSTER_MANAGER_NODE_API_READ_TIMEOUT = "nifi.cluster.manager.node.api.read.timeout";
+ public static final String CLUSTER_MANAGER_NODE_API_REQUEST_THREADS = "nifi.cluster.manager.node.api.request.threads";
+ public static final String CLUSTER_MANAGER_FLOW_RETRIEVAL_DELAY = "nifi.cluster.manager.flow.retrieval.delay";
+ public static final String CLUSTER_MANAGER_PROTOCOL_THREADS = "nifi.cluster.manager.protocol.threads";
+ public static final String CLUSTER_MANAGER_SAFEMODE_DURATION = "nifi.cluster.manager.safemode.duration";
+
+ // defaults
+ public static final String DEFAULT_TITLE = "NiFi";
+ public static final Boolean DEFAULT_AUTO_RESUME_STATE = true;
+ public static final String DEFAULT_AUTHORITY_PROVIDER_CONFIGURATION_FILE = "conf/authority-providers.xml";
+ public static final String DEFAULT_USER_CREDENTIAL_CACHE_DURATION = "24 hours";
+ public static final Integer DEFAULT_REMOTE_INPUT_PORT = null;
+ public static final Path DEFAULT_TEMPLATE_DIRECTORY = Paths.get("conf", "templates");
+ public static final String DEFAULT_WEB_WORKING_DIR = "./work/jetty";
+ public static final String DEFAULT_NAR_WORKING_DIR = "./work/nar";
+ public static final String DEFAULT_COMPONENT_DOCS_DIRECTORY = "./work/docs/components";
+ public static final String DEFAULT_NAR_LIBRARY_DIR = "./lib";
+ public static final String DEFAULT_FLOWFILE_REPO_PARTITIONS = "256";
+ public static final String DEFAULT_FLOWFILE_CHECKPOINT_INTERVAL = "2 min";
+ public static final int DEFAULT_MAX_FLOWFILES_PER_CLAIM = 100;
+ public static final int DEFAULT_QUEUE_SWAP_THRESHOLD = 20000;
+ public static final String DEFAULT_SWAP_STORAGE_LOCATION = "./flowfile_repository/swap";
+ public static final String DEFAULT_SWAP_IN_PERIOD = "1 sec";
+ public static final String DEFAULT_SWAP_OUT_PERIOD = "5 sec";
+ public static final int DEFAULT_SWAP_IN_THREADS = 4;
+ public static final int DEFAULT_SWAP_OUT_THREADS = 4;
+ public static final String DEFAULT_ADMINISTRATIVE_YIELD_DURATION = "30 sec";
+ public static final String DEFAULT_PERSISTENT_STATE_DIRECTORY = "./conf/state";
+ public static final String DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY = "5 mins";
+
+ // cluster common defaults
+ public static final String DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = "5 sec";
+ public static final String DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_BROADCAST_DELAY = "500 ms";
+ public static final int DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS = 3;
+ public static final String DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS_DELAY = "1 sec";
+ public static final String DEFAULT_CLUSTER_PROTOCOL_SOCKET_TIMEOUT = "30 sec";
+ public static final String DEFAULT_CLUSTER_PROTOCOL_CONNECTION_HANDSHAKE_TIMEOUT = "45 sec";
+
+ // cluster node defaults
+ public static final int DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS = 2;
+
+ // cluster manager defaults
+ public static final int DEFAULT_CLUSTER_MANAGER_NODE_EVENT_HISTORY_SIZE = 10;
+ public static final String DEFAULT_CLUSTER_MANAGER_NODE_API_CONNECTION_TIMEOUT = "30 sec";
+ public static final String DEFAULT_CLUSTER_MANAGER_NODE_API_READ_TIMEOUT = "30 sec";
+ public static final int DEFAULT_CLUSTER_MANAGER_NODE_API_NUM_REQUEST_THREADS = 10;
+ public static final String DEFAULT_CLUSTER_MANAGER_FLOW_RETRIEVAL_DELAY = "5 sec";
+ public static final int DEFAULT_CLUSTER_MANAGER_PROTOCOL_THREADS = 10;
+ public static final String DEFAULT_CLUSTER_MANAGER_SAFEMODE_DURATION = "0 sec";
+
+ private NiFiProperties() {
+ super();
+ }
+
+ /**
+ * This is the method through which the NiFiProperties object should be
+ * obtained.
+ *
+ * @return the NiFiProperties object to use
+ * @throws RuntimeException if unable to load properties file
+ */
+ public static synchronized NiFiProperties getInstance() {
+ if (null == instance) {
+ final NiFiProperties suspectInstance = new NiFiProperties();
+ final String nfPropertiesFilePath = System.getProperty(NiFiProperties.PROPERTIES_FILE_PATH);
+ if (null == nfPropertiesFilePath || nfPropertiesFilePath.trim().length() == 0) {
+ throw new RuntimeException("Requires a system property called \'" + NiFiProperties.PROPERTIES_FILE_PATH + "\' and this is not set or has no value");
+ }
+ final File propertiesFile = new File(nfPropertiesFilePath);
+ if (!propertiesFile.exists()) {
+ throw new RuntimeException("Properties file doesn't exist \'" + propertiesFile.getAbsolutePath() + "\'");
+ }
+ if (!propertiesFile.canRead()) {
+ throw new RuntimeException("Properties file exists but cannot be read \'" + propertiesFile.getAbsolutePath() + "\'");
+ }
+ InputStream inStream = null;
+ try {
+ inStream = new BufferedInputStream(new FileInputStream(propertiesFile));
+ suspectInstance.load(inStream);
+ } catch (final Exception ex) {
+ LOG.error("Cannot load properties file due to " + ex.getLocalizedMessage());
+ throw new RuntimeException("Cannot load properties file due to " + ex.getLocalizedMessage(), ex);
+ } finally {
+ if (null != inStream) {
+ try {
+ inStream.close();
+ } catch (final Exception ex) {
+ /**
+ * do nothing *
+ */
+ }
+ }
+ }
+ instance = suspectInstance;
+ }
+ return instance;
+ }
+
+ // getters for core properties //
+ public File getFlowConfigurationFile() {
+ try {
+ return new File(getProperty(FLOW_CONFIGURATION_FILE));
+ } catch (Exception ex) {
+ return null;
+ }
+ }
+
+ public File getFlowConfigurationFileDir() {
+ try {
+ return getFlowConfigurationFile().getParentFile();
+ } catch (Exception ex) {
+ return null;
+ }
+ }
+
+ private Integer getPropertyAsPort(final String propertyName, final Integer defaultValue) {
+ final String port = getProperty(propertyName);
+ if (StringUtils.isEmpty(port)) {
+ return defaultValue;
+ }
+ try {
+ final int val = Integer.parseInt(port);
+ if (val <= 0 || val > 65535) {
+ throw new RuntimeException("Valid port range is 0 - 65535 but got " + val);
+ }
+ return val;
+ } catch (final NumberFormatException e) {
+ return defaultValue;
+ }
+ }
+
+ public int getQueueSwapThreshold() {
+ final String thresholdValue = getProperty(QUEUE_SWAP_THRESHOLD);
+ if (thresholdValue == null) {
+ return DEFAULT_QUEUE_SWAP_THRESHOLD;
+ }
+
+ try {
+ return Integer.parseInt(thresholdValue);
+ } catch (final NumberFormatException e) {
+ return DEFAULT_QUEUE_SWAP_THRESHOLD;
+ }
+ }
+
+ public File getSwapStorageLocation() {
+ final String location = getProperty(SWAP_STORAGE_LOCATION);
+ if (location == null) {
+ return new File(DEFAULT_SWAP_STORAGE_LOCATION);
+ } else {
+ return new File(location);
+ }
+ }
+
+ public Integer getIntegerProperty(final String propertyName, final Integer defaultValue) {
+ final String value = getProperty(propertyName);
+ if (value == null) {
+ return defaultValue;
+ }
+
+ try {
+ return Integer.parseInt(getProperty(propertyName));
+ } catch (final Exception e) {
+ return defaultValue;
+ }
+ }
+
+ public int getSwapInThreads() {
+ return getIntegerProperty(SWAP_IN_THREADS, DEFAULT_SWAP_IN_THREADS);
+ }
+
+ public int getSwapOutThreads() {
+ final String value = getProperty(SWAP_OUT_THREADS);
+ if (value == null) {
+ return DEFAULT_SWAP_OUT_THREADS;
+ }
+
+ try {
+ return Integer.parseInt(getProperty(SWAP_OUT_THREADS));
+ } catch (final Exception e) {
+ return DEFAULT_SWAP_OUT_THREADS;
+ }
+ }
+
+ public String getSwapInPeriod() {
+ return getProperty(SWAP_IN_PERIOD, DEFAULT_SWAP_IN_PERIOD);
+ }
+
+ public String getSwapOutPeriod() {
+ return getProperty(SWAP_OUT_PERIOD, DEFAULT_SWAP_OUT_PERIOD);
+ }
+
+ public String getAdministrativeYieldDuration() {
+ return getProperty(ADMINISTRATIVE_YIELD_DURATION, DEFAULT_ADMINISTRATIVE_YIELD_DURATION);
+ }
+
+ /**
+ * The socket port to listen on for a Remote Input Port.
+ *
+ * @return
+ */
+ public Integer getRemoteInputPort() {
+ return getPropertyAsPort(REMOTE_INPUT_PORT, DEFAULT_REMOTE_INPUT_PORT);
+ }
+
+ public Boolean isSiteToSiteSecure() {
+ final String secureVal = getProperty(SITE_TO_SITE_SECURE);
+ if (secureVal == null) {
+ return null;
+ }
+
+ if ("true".equalsIgnoreCase(secureVal)) {
+ return true;
+ }
+ if ("false".equalsIgnoreCase(secureVal)) {
+ return false;
+ }
+
+ throw new IllegalStateException("Property value for " + SITE_TO_SITE_SECURE + " is " + secureVal + "; expected 'true' or 'false'");
+ }
+
+ /**
+ * Returns the directory to which Templates are to be persisted
+ *
+ * @return
+ */
+ public Path getTemplateDirectory() {
+ final String strVal = getProperty(TEMPLATE_DIRECTORY);
+ return (strVal == null) ? DEFAULT_TEMPLATE_DIRECTORY : Paths.get(strVal);
+ }
+
+ /**
+ * Get the flow service write delay.
+ *
+ * @return The write delay
+ */
+ public String getFlowServiceWriteDelay() {
+ return getProperty(WRITE_DELAY_INTERVAL);
+ }
+
+ /**
+ * Returns whether the processors should be started automatically when the
+ * application loads.
+ *
+ * @return Whether to auto start the processors or not
+ */
+ public boolean getAutoResumeState() {
+ final String rawAutoResumeState = getProperty(AUTO_RESUME_STATE, DEFAULT_AUTO_RESUME_STATE.toString());
+ return Boolean.parseBoolean(rawAutoResumeState);
+ }
+
+ /**
+ * Returns the number of partitions that should be used for the FlowFile
+ * Repository
+ *
+ * @return
+ */
+ public int getFlowFileRepositoryPartitions() {
+ final String rawProperty = getProperty(FLOWFILE_REPOSITORY_PARTITIONS, DEFAULT_FLOWFILE_REPO_PARTITIONS);
+ return Integer.parseInt(rawProperty);
+ }
+
+ /**
+ * Returns the number of milliseconds between FlowFileRepository
+ * checkpointing
+ *
+ * @return
+ */
+ public String getFlowFileRepositoryCheckpointInterval() {
+ return getProperty(FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL, DEFAULT_FLOWFILE_CHECKPOINT_INTERVAL);
+ }
+
+ /**
+ * @return the restore directory or null if not configured
+ */
+ public File getRestoreDirectory() {
+ final String value = getProperty(RESTORE_DIRECTORY);
+ if (StringUtils.isBlank(value)) {
+ return null;
+ } else {
+ return new File(value);
+ }
+ }
+
+ /**
+ * @return the user authorities file
+ */
+ public File getAuthorityProviderConfiguraitonFile() {
+ final String value = getProperty(AUTHORITY_PROVIDER_CONFIGURATION_FILE);
+ if (StringUtils.isBlank(value)) {
+ return new File(DEFAULT_AUTHORITY_PROVIDER_CONFIGURATION_FILE);
+ } else {
+ return new File(value);
+ }
+ }
+
+ /**
+ * Will default to true unless the value is explicitly set to false.
+ *
+ * @return Whether client auth is required
+ */
+ public boolean getNeedClientAuth() {
+ boolean needClientAuth = true;
+ String rawNeedClientAuth = getProperty(SECURITY_NEED_CLIENT_AUTH);
+ if ("false".equalsIgnoreCase(rawNeedClientAuth)) {
+ needClientAuth = false;
+ }
+ return needClientAuth;
+ }
+
+ public String getUserCredentialCacheDuration() {
+ return getProperty(SECURITY_USER_CREDENTIAL_CACHE_DURATION, DEFAULT_USER_CREDENTIAL_CACHE_DURATION);
+ }
+
+ public boolean getSupportNewAccountRequests() {
+ boolean shouldSupport = true;
+ String rawShouldSupport = getProperty(SECURITY_SUPPORT_NEW_ACCOUNT_REQUESTS);
+ if ("false".equalsIgnoreCase(rawShouldSupport)) {
+ shouldSupport = false;
+ }
+ return shouldSupport;
+ }
+
+ // getters for web properties //
+ public Integer getPort() {
+ Integer port = null;
+ try {
+ port = Integer.parseInt(getProperty(WEB_HTTP_PORT));
+ } catch (NumberFormatException nfe) {
+ }
+ return port;
+ }
+
+ public Integer getSslPort() {
+ Integer sslPort = null;
+ try {
+ sslPort = Integer.parseInt(getProperty(WEB_HTTPS_PORT));
+ } catch (NumberFormatException nfe) {
+ }
+ return sslPort;
+ }
+
+ public File getWebWorkingDirectory() {
+ return new File(getProperty(WEB_WORKING_DIR, DEFAULT_WEB_WORKING_DIR));
+ }
+
+ public File getComponentDocumentationWorkingDirectory() {
+ return new File(getProperty(COMPONENT_DOCS_DIRECTORY, DEFAULT_COMPONENT_DOCS_DIRECTORY));
+ }
+
+ public File getNarWorkingDirectory() {
+ return new File(getProperty(NAR_WORKING_DIRECTORY, DEFAULT_NAR_WORKING_DIR));
+ }
+
+ public File getFrameworkWorkingDirectory() {
+ return new File(getNarWorkingDirectory(), "framework");
+ }
+
+ public File getExtensionsWorkingDirectory() {
+ return new File(getNarWorkingDirectory(), "extensions");
+ }
+
+ public File getNarLibraryDirectory() {
+ return new File(getProperty(NAR_LIBRARY_DIRECTORY, DEFAULT_NAR_LIBRARY_DIR));
+ }
+
+ // getters for ui properties //
+ /**
+ * Get the title for the UI.
+ *
+ * @return The UI title
+ */
+ public String getUiTitle() {
+ return this.getProperty(VERSION, DEFAULT_TITLE);
+ }
+
+ /**
+ * Get the banner text.
+ *
+ * @return The banner text
+ */
+ public String getBannerText() {
+ return this.getProperty(UI_BANNER_TEXT, StringUtils.EMPTY);
+ }
+
+ /**
+ * Returns the auto refresh interval in seconds.
+ *
+ * @return
+ */
+ public String getAutoRefreshInterval() {
+ return getProperty(UI_AUTO_REFRESH_INTERVAL);
+ }
+
+ // getters for cluster protocol properties //
+ public String getClusterProtocolHeartbeatInterval() {
+ return getProperty(CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL, DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL);
+ }
+
+ public String getNodeHeartbeatInterval() {
+ return getClusterProtocolHeartbeatInterval();
+ }
+
+ public String getClusterProtocolSocketTimeout() {
+ return getProperty(CLUSTER_PROTOCOL_SOCKET_TIMEOUT, DEFAULT_CLUSTER_PROTOCOL_SOCKET_TIMEOUT);
+ }
+
+ public String getClusterProtocolConnectionHandshakeTimeout() {
+ return getProperty(CLUSTER_PROTOCOL_CONNECTION_HANDSHAKE_TIMEOUT, DEFAULT_CLUSTER_PROTOCOL_CONNECTION_HANDSHAKE_TIMEOUT);
+ }
+
+ public boolean getClusterProtocolUseMulticast() {
+ return Boolean.parseBoolean(getProperty(CLUSTER_PROTOCOL_USE_MULTICAST));
+ }
+
+ public InetSocketAddress getClusterProtocolMulticastAddress() {
+ try {
+ String multicastAddress = getProperty(CLUSTER_PROTOCOL_MULTICAST_ADDRESS);
+ int multicastPort = Integer.parseInt(getProperty(CLUSTER_PROTOCOL_MULTICAST_PORT));
+ return new InetSocketAddress(multicastAddress, multicastPort);
+ } catch (Exception ex) {
+ throw new RuntimeException("Invalid multicast address/port due to: " + ex, ex);
+ }
+ }
+
+ public String getClusterProtocolMulticastServiceBroadcastDelay() {
+ return getProperty(CLUSTER_PROTOCOL_MULTICAST_SERVICE_BROADCAST_DELAY);
+ }
+
+ public File getPersistentStateDirectory() {
+ final String dirName = getProperty(PERSISTENT_STATE_DIRECTORY, DEFAULT_PERSISTENT_STATE_DIRECTORY);
+ final File file = new File(dirName);
+ if (!file.exists()) {
+ file.mkdirs();
+ }
+ return file;
+ }
+
+ public int getClusterProtocolMulticastServiceLocatorAttempts() {
+ try {
+ return Integer.parseInt(getProperty(CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS));
+ } catch (NumberFormatException nfe) {
+ return DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS;
+ }
+ }
+
+ public String getClusterProtocolMulticastServiceLocatorAttemptsDelay() {
+ return getProperty(CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS_DELAY, DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS_DELAY);
+ }
+
+ // getters for cluster node properties //
+ public boolean isNode() {
+ return Boolean.parseBoolean(getProperty(CLUSTER_IS_NODE));
+ }
+
+ public InetSocketAddress getClusterNodeProtocolAddress() {
+ try {
+ String socketAddress = getProperty(CLUSTER_NODE_ADDRESS);
+ if (StringUtils.isBlank(socketAddress)) {
+ socketAddress = "localhost";
+ }
+ int socketPort = getClusterNodeProtocolPort();
+ return InetSocketAddress.createUnresolved(socketAddress, socketPort);
+ } catch (Exception ex) {
+ throw new RuntimeException("Invalid node protocol address/port due to: " + ex, ex);
+ }
+ }
+
+ public Integer getClusterNodeProtocolPort() {
+ try {
+ return Integer.parseInt(getProperty(CLUSTER_NODE_PROTOCOL_PORT));
+ } catch (NumberFormatException nfe) {
+ return null;
+ }
+ }
+
+ public int getClusterNodeProtocolThreads() {
+ try {
+ return Integer.parseInt(getProperty(CLUSTER_NODE_PROTOCOL_THREADS));
+ } catch (NumberFormatException nfe) {
+ return DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS;
+ }
+ }
+
+ public InetSocketAddress getClusterNodeUnicastManagerProtocolAddress() {
+ try {
+ String socketAddress = getProperty(CLUSTER_NODE_UNICAST_MANAGER_ADDRESS);
+ if (StringUtils.isBlank(socketAddress)) {
+ socketAddress = "localhost";
+ }
+ int socketPort = Integer.parseInt(getProperty(CLUSTER_NODE_UNICAST_MANAGER_PROTOCOL_PORT));
+ return InetSocketAddress.createUnresolved(socketAddress, socketPort);
+ } catch (Exception ex) {
+ throw new RuntimeException("Invalid unicast manager address/port due to: " + ex, ex);
+ }
+ }
+
+ // getters for cluster manager properties //
+ public boolean isClusterManager() {
+ return Boolean.parseBoolean(getProperty(CLUSTER_IS_MANAGER));
+ }
+
+ public InetSocketAddress getClusterManagerProtocolAddress() {
+ try {
+ String socketAddress = getProperty(CLUSTER_MANAGER_ADDRESS);
+ if (StringUtils.isBlank(socketAddress)) {
+ socketAddress = "localhost";
+ }
+ int socketPort = getClusterManagerProtocolPort();
+ return InetSocketAddress.createUnresolved(socketAddress, socketPort);
+ } catch (Exception ex) {
+ throw new RuntimeException("Invalid manager protocol address/port due to: " + ex, ex);
+ }
+ }
+
+ public Integer getClusterManagerProtocolPort() {
+ try {
+ return Integer.parseInt(getProperty(CLUSTER_MANAGER_PROTOCOL_PORT));
+ } catch (NumberFormatException nfe) {
+ return null;
+ }
+ }
+
+ public File getClusterManagerNodeFirewallFile() {
+ final String firewallFile = getProperty(CLUSTER_MANAGER_NODE_FIREWALL_FILE);
+ if (StringUtils.isBlank(firewallFile)) {
+ return null;
+ } else {
+ return new File(firewallFile);
+ }
+ }
+
+ public int getClusterManagerNodeEventHistorySize() {
+ try {
+ return Integer.parseInt(getProperty(CLUSTER_MANAGER_NODE_EVENT_HISTORY_SIZE));
+ } catch (NumberFormatException nfe) {
+ return DEFAULT_CLUSTER_MANAGER_NODE_EVENT_HISTORY_SIZE;
+ }
+ }
+
+ public String getClusterManagerNodeApiConnectionTimeout() {
+ return getProperty(CLUSTER_MANAGER_NODE_API_CONNECTION_TIMEOUT, DEFAULT_CLUSTER_MANAGER_NODE_API_CONNECTION_TIMEOUT);
+ }
+
+ public String getClusterManagerNodeApiReadTimeout() {
+ return getProperty(CLUSTER_MANAGER_NODE_API_READ_TIMEOUT, DEFAULT_CLUSTER_MANAGER_NODE_API_READ_TIMEOUT);
+ }
+
+ public int getClusterManagerNodeApiRequestThreads() {
+ try {
+ return Integer.parseInt(getProperty(CLUSTER_MANAGER_NODE_API_REQUEST_THREADS));
+ } catch (NumberFormatException nfe) {
+ return DEFAULT_CLUSTER_MANAGER_NODE_API_NUM_REQUEST_THREADS;
+ }
+ }
+
+ public String getClusterManagerFlowRetrievalDelay() {
+ return getProperty(CLUSTER_MANAGER_FLOW_RETRIEVAL_DELAY, DEFAULT_CLUSTER_MANAGER_FLOW_RETRIEVAL_DELAY);
+ }
+
+ public int getClusterManagerProtocolThreads() {
+ try {
+ return Integer.parseInt(getProperty(CLUSTER_MANAGER_PROTOCOL_THREADS));
+ } catch (NumberFormatException nfe) {
+ return DEFAULT_CLUSTER_MANAGER_PROTOCOL_THREADS;
+ }
+ }
+
+ public String getClusterManagerSafeModeDuration() {
+ return getProperty(CLUSTER_MANAGER_SAFEMODE_DURATION, DEFAULT_CLUSTER_MANAGER_SAFEMODE_DURATION);
+ }
+
+ public String getClusterProtocolManagerToNodeApiScheme() {
+ final String isSecureProperty = getProperty(CLUSTER_PROTOCOL_IS_SECURE);
+ if (Boolean.valueOf(isSecureProperty)) {
+ return "https";
+ } else {
+ return "http";
+ }
+ }
+
+ public InetSocketAddress getNodeApiAddress() {
+
+ final String rawScheme = getClusterProtocolManagerToNodeApiScheme();
+ final String scheme = (rawScheme == null) ? "http" : rawScheme;
+
+ final String host;
+ final int port;
+ if ("http".equalsIgnoreCase(scheme)) {
+ // get host
+ if (StringUtils.isBlank(getProperty(WEB_HTTP_HOST))) {
+ host = "localhost";
+ } else {
+ host = getProperty(WEB_HTTP_HOST);
+ }
+ // get port
+ port = getPort();
+ } else {
+ // get host
+ if (StringUtils.isBlank(getProperty(WEB_HTTPS_HOST))) {
+ host = "localhost";
+ } else {
+ host = getProperty(WEB_HTTPS_HOST);
+ }
+ // get port
+ port = getSslPort();
+ }
+
+ return InetSocketAddress.createUnresolved(host, port);
+
+ }
+
+ /**
+ * Returns the database repository path. It simply returns the value
+ * configured. No directories will be created as a result of this operation.
+ *
+ * @return database repository path
+ * @throws InvalidPathException If the configured path is invalid
+ */
+ public Path getDatabaseRepositoryPath() {
+ return Paths.get(getProperty(REPOSITORY_DATABASE_DIRECTORY));
+ }
+
+ /**
+ * Returns the flow file repository path. It simply returns the value
+ * configured. No directories will be created as a result of this operation.
+ *
+ * @return database repository path
+ * @throws InvalidPathException If the configured path is invalid
+ */
+ public Path getFlowFileRepositoryPath() {
+ return Paths.get(getProperty(FLOWFILE_REPOSITORY_DIRECTORY));
+ }
+
+ /**
+ * Returns the content repository paths. This method returns a mapping of
+ * file repository name to file repository paths. It simply returns the
+ * values configured. No directories will be created as a result of this
+ * operation.
+ *
+ * @return file repositories paths
+ * @throws InvalidPathException If any of the configured paths are invalid
+ */
+ public Map<String, Path> getContentRepositoryPaths() {
+ final Map<String, Path> contentRepositoryPaths = new HashMap<>();
+
+ // go through each property
+ for (String propertyName : stringPropertyNames()) {
+ // determine if the property is a file repository path
+ if (StringUtils.startsWith(propertyName, REPOSITORY_CONTENT_PREFIX)) {
+ // get the repository key
+ final String key = StringUtils.substringAfter(propertyName, REPOSITORY_CONTENT_PREFIX);
+
+ // attempt to resolve the path specified
+ contentRepositoryPaths.put(key, Paths.get(getProperty(propertyName)));
+ }
+ }
+ return contentRepositoryPaths;
+ }
+
+ /**
+ * Returns the provenance repository paths. This method returns a mapping of
+ * file repository name to file repository paths. It simply returns the
+ * values configured. No directories will be created as a result of this
+ * operation.
+ *
+ * @return
+ */
+ public Map<String, Path> getProvenanceRepositoryPaths() {
+ final Map<String, Path> provenanceRepositoryPaths = new HashMap<>();
+
+ // go through each property
+ for (String propertyName : stringPropertyNames()) {
+ // determine if the property is a file repository path
+ if (StringUtils.startsWith(propertyName, PROVENANCE_REPO_DIRECTORY_PREFIX)) {
+ // get the repository key
+ final String key = StringUtils.substringAfter(propertyName, PROVENANCE_REPO_DIRECTORY_PREFIX);
+
+ // attempt to resolve the path specified
+ provenanceRepositoryPaths.put(key, Paths.get(getProperty(propertyName)));
+ }
+ }
+ return provenanceRepositoryPaths;
+ }
+
+ public int getMaxFlowFilesPerClaim() {
+ try {
+ return Integer.parseInt(getProperty(MAX_FLOWFILES_PER_CLAIM));
+ } catch (NumberFormatException nfe) {
+ return DEFAULT_MAX_FLOWFILES_PER_CLAIM;
+ }
+ }
+
+ public String getMaxAppendableClaimSize() {
+ return getProperty(MAX_APPENDABLE_CLAIM_SIZE);
+ }
+
+ @Override
+ public String getProperty(final String key, final String defaultValue) {
+ final String value = super.getProperty(key, defaultValue);
+ if (value == null) {
+ return null;
+ }
+
+ if (value.trim().isEmpty()) {
+ return defaultValue;
+ }
+ return value;
+ }
+
+}