You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/07/01 19:47:40 UTC
[nifi] branch main updated: NIFI-10166 Improved MiNiFi bootstrap test coverage
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new d78c667c19 NIFI-10166 Improved MiNiFi bootstrap test coverage
d78c667c19 is described below
commit d78c667c197cf126e31d291e5f61e41629073d18
Author: Ferenc Erdei <er...@gmail.com>
AuthorDate: Fri Jun 24 16:30:02 2022 +0200
NIFI-10166 Improved MiNiFi bootstrap test coverage
This closes #6160
Signed-off-by: David Handermann <ex...@apache.org>
---
.../apache/nifi/minifi/bootstrap/RunMiNiFi.java | 12 +-
.../bootstrap/command/CommandRunnerFactory.java | 13 +-
.../nifi/minifi/bootstrap/command/DumpRunner.java | 2 +-
.../nifi/minifi/bootstrap/command/EnvRunner.java | 2 +-
.../nifi/minifi/bootstrap/command/StartRunner.java | 19 +-
.../nifi/minifi/bootstrap/command/StopRunner.java | 64 ++++---
.../ingestors/AbstractPullChangeIngestor.java | 11 +-
.../ingestors/FileChangeIngestor.java | 43 ++---
.../ingestors/PullHttpChangeIngestor.java | 87 +++------
.../ingestors/RestChangeIngestor.java | 83 +++++----
.../minifi/bootstrap/service/BootstrapCodec.java | 3 +-
.../bootstrap/service/CurrentPortProvider.java | 8 +-
.../service/GracefulShutdownParameterProvider.java | 4 +-
.../service/MiNiFiConfigurationChangeListener.java | 76 +-------
.../bootstrap/service/MiNiFiStatusProvider.java | 8 +-
.../minifi/bootstrap/service/ReloadService.java | 10 +-
.../minifi/bootstrap/util/ConfigTransformer.java | 67 ++++++-
.../nifi/minifi/bootstrap/util/ProcessUtils.java | 28 +++
.../minifi/bootstrap/util/UnixProcessUtils.java | 37 ++--
.../nifi/minifi/bootstrap/ShutdownHookTest.java | 58 ++++++
.../command/CommandRunnerFactoryTest.java | 155 ++++++++++++++++
.../command/CompositeCommandRunnerTest.java | 73 ++++++++
.../minifi/bootstrap/command/DumpRunnerTest.java | 130 ++++++++++++++
.../minifi/bootstrap/command/EnvRunnerTest.java | 85 +++++++++
.../bootstrap/command/FlowStatusRunnerTest.java | 81 +++++++++
.../minifi/bootstrap/command/StatusRunnerTest.java | 100 +++++++++++
.../minifi/bootstrap/command/StopRunnerTest.java | 197 +++++++++++++++++++++
.../ingestors/FileChangeIngestorTest.java | 32 ++--
.../ingestors/PullHttpChangeIngestorSSLTest.java | 25 ++-
.../ingestors/PullHttpChangeIngestorTest.java | 25 ++-
.../ingestors/RestChangeIngestorSSLTest.java | 10 +-
.../ingestors/RestChangeIngestorTest.java | 12 +-
.../common/PullHttpChangeIngestorCommonTest.java | 48 +++--
.../common/RestChangeIngestorCommonTest.java | 22 ++-
.../bootstrap/service/BootstrapCodecTest.java | 184 +++++++++++++++++++
.../bootstrap/service/CurrentPortProviderTest.java | 92 ++++++++++
.../GracefulShutdownParameterProviderTest.java | 67 +++++++
.../c2/HierarchicalC2IntegrationTest.java | 2 +-
.../standalone/test/StandaloneXmlTest.java | 2 +-
.../src/test/resources/conf/nifi.properties | 125 +++++++++++++
.../src/test/resources/logback.xml | 4 +-
.../repository/StandardProcessSession.java | 4 +-
42 files changed, 1750 insertions(+), 360 deletions(-)
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
index 451e430b41..6958bb2498 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
@@ -42,6 +42,8 @@ import org.apache.nifi.minifi.bootstrap.service.MiNiFiStatusProvider;
import org.apache.nifi.minifi.bootstrap.service.MiNiFiStdLogHandler;
import org.apache.nifi.minifi.bootstrap.service.PeriodicStatusReporterManager;
import org.apache.nifi.minifi.bootstrap.service.ReloadService;
+import org.apache.nifi.minifi.bootstrap.util.ProcessUtils;
+import org.apache.nifi.minifi.bootstrap.util.UnixProcessUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,20 +98,22 @@ public class RunMiNiFi implements ConfigurationFileHolder {
Optional.ofNullable(properties.getProperty(STATUS_FILE_PID_KEY)).map(Integer::parseInt).orElse(UNINITIALIZED),
properties.getProperty(STATUS_FILE_SECRET_KEY)
);
+ ProcessUtils processUtils = new UnixProcessUtils();
MiNiFiCommandSender miNiFiCommandSender = new MiNiFiCommandSender(miNiFiParameters, getObjectMapper());
- MiNiFiStatusProvider miNiFiStatusProvider = new MiNiFiStatusProvider(miNiFiCommandSender);
+ MiNiFiStatusProvider miNiFiStatusProvider = new MiNiFiStatusProvider(miNiFiCommandSender, processUtils);
periodicStatusReporterManager =
new PeriodicStatusReporterManager(bootstrapFileProvider.getBootstrapProperties(), miNiFiStatusProvider, miNiFiCommandSender, miNiFiParameters);
configurationChangeCoordinator = new ConfigurationChangeCoordinator(bootstrapFileProvider.getBootstrapProperties(), this,
singleton(new MiNiFiConfigurationChangeListener(this, DEFAULT_LOGGER, bootstrapFileProvider)));
- CurrentPortProvider currentPortProvider = new CurrentPortProvider(miNiFiCommandSender, miNiFiParameters);
+
+ CurrentPortProvider currentPortProvider = new CurrentPortProvider(miNiFiCommandSender, miNiFiParameters, processUtils);
GracefulShutdownParameterProvider gracefulShutdownParameterProvider = new GracefulShutdownParameterProvider(bootstrapFileProvider);
- reloadService = new ReloadService(bootstrapFileProvider, miNiFiParameters, miNiFiCommandSender, currentPortProvider, gracefulShutdownParameterProvider, this);
+ reloadService = new ReloadService(bootstrapFileProvider, miNiFiParameters, miNiFiCommandSender, currentPortProvider, gracefulShutdownParameterProvider, this, processUtils);
commandRunnerFactory = new CommandRunnerFactory(miNiFiCommandSender, currentPortProvider, miNiFiParameters, miNiFiStatusProvider, periodicStatusReporterManager,
bootstrapFileProvider, new MiNiFiStdLogHandler(), bootstrapConfigFile, this, gracefulShutdownParameterProvider,
- new MiNiFiExecCommandProvider(bootstrapFileProvider));
+ new MiNiFiExecCommandProvider(bootstrapFileProvider), processUtils);
}
public int run(BootstrapCommand command, String... args) {
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/CommandRunnerFactory.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/CommandRunnerFactory.java
index 6b11d5836a..33902e36f8 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/CommandRunnerFactory.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/CommandRunnerFactory.java
@@ -31,6 +31,7 @@ import org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider;
import org.apache.nifi.minifi.bootstrap.service.MiNiFiStatusProvider;
import org.apache.nifi.minifi.bootstrap.service.MiNiFiStdLogHandler;
import org.apache.nifi.minifi.bootstrap.service.PeriodicStatusReporterManager;
+import org.apache.nifi.minifi.bootstrap.util.ProcessUtils;
public class CommandRunnerFactory {
@@ -45,11 +46,12 @@ public class CommandRunnerFactory {
private final RunMiNiFi runMiNiFi;
private final GracefulShutdownParameterProvider gracefulShutdownParameterProvider;
private final MiNiFiExecCommandProvider miNiFiExecCommandProvider;
+ private final ProcessUtils processUtils;
public CommandRunnerFactory(MiNiFiCommandSender miNiFiCommandSender, CurrentPortProvider currentPortProvider, MiNiFiParameters miNiFiParameters,
MiNiFiStatusProvider miNiFiStatusProvider, PeriodicStatusReporterManager periodicStatusReporterManager,
BootstrapFileProvider bootstrapFileProvider, MiNiFiStdLogHandler miNiFiStdLogHandler, File bootstrapConfigFile, RunMiNiFi runMiNiFi,
- GracefulShutdownParameterProvider gracefulShutdownParameterProvider, MiNiFiExecCommandProvider miNiFiExecCommandProvider) {
+ GracefulShutdownParameterProvider gracefulShutdownParameterProvider, MiNiFiExecCommandProvider miNiFiExecCommandProvider, ProcessUtils processUtils) {
this.miNiFiCommandSender = miNiFiCommandSender;
this.currentPortProvider = currentPortProvider;
this.miNiFiParameters = miNiFiParameters;
@@ -61,6 +63,7 @@ public class CommandRunnerFactory {
this.runMiNiFi = runMiNiFi;
this.gracefulShutdownParameterProvider = gracefulShutdownParameterProvider;
this.miNiFiExecCommandProvider = miNiFiExecCommandProvider;
+ this.processUtils = processUtils;
}
/**
@@ -74,10 +77,10 @@ public class CommandRunnerFactory {
case START:
case RUN:
commandRunner = new StartRunner(currentPortProvider, bootstrapFileProvider, periodicStatusReporterManager, miNiFiStdLogHandler, miNiFiParameters,
- bootstrapConfigFile, runMiNiFi, miNiFiExecCommandProvider);
+ bootstrapConfigFile, runMiNiFi, miNiFiExecCommandProvider, processUtils);
break;
case STOP:
- commandRunner = new StopRunner(bootstrapFileProvider, miNiFiParameters, miNiFiCommandSender, currentPortProvider, gracefulShutdownParameterProvider);
+ commandRunner = new StopRunner(bootstrapFileProvider, miNiFiParameters, miNiFiCommandSender, currentPortProvider, gracefulShutdownParameterProvider, processUtils);
break;
case STATUS:
commandRunner = new StatusRunner(miNiFiParameters, miNiFiStatusProvider);
@@ -102,9 +105,9 @@ public class CommandRunnerFactory {
private List<CommandRunner> getRestartServices() {
List<CommandRunner> compositeList = new LinkedList<>();
- compositeList.add(new StopRunner(bootstrapFileProvider, miNiFiParameters, miNiFiCommandSender, currentPortProvider, gracefulShutdownParameterProvider));
+ compositeList.add(new StopRunner(bootstrapFileProvider, miNiFiParameters, miNiFiCommandSender, currentPortProvider, gracefulShutdownParameterProvider, processUtils));
compositeList.add(new StartRunner(currentPortProvider, bootstrapFileProvider, periodicStatusReporterManager, miNiFiStdLogHandler, miNiFiParameters,
- bootstrapConfigFile, runMiNiFi, miNiFiExecCommandProvider));
+ bootstrapConfigFile, runMiNiFi, miNiFiExecCommandProvider, processUtils));
return compositeList;
}
}
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/DumpRunner.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/DumpRunner.java
index 242480b019..d9f3b2b644 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/DumpRunner.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/DumpRunner.java
@@ -32,7 +32,7 @@ import org.apache.nifi.minifi.bootstrap.service.CurrentPortProvider;
import org.apache.nifi.minifi.bootstrap.service.MiNiFiCommandSender;
public class DumpRunner implements CommandRunner {
- private static final String DUMP_CMD = "DUMP";
+ protected static final String DUMP_CMD = "DUMP";
private final MiNiFiCommandSender miNiFiCommandSender;
private final CurrentPortProvider currentPortProvider;
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/EnvRunner.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/EnvRunner.java
index 2a0cfc004b..b39eb7046f 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/EnvRunner.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/EnvRunner.java
@@ -28,7 +28,7 @@ import org.apache.nifi.minifi.bootstrap.service.CurrentPortProvider;
import org.apache.nifi.minifi.bootstrap.service.MiNiFiCommandSender;
public class EnvRunner implements CommandRunner {
- private static final String ENV_CMD = "ENV";
+ protected static final String ENV_CMD = "ENV";
private final MiNiFiCommandSender miNiFiCommandSender;
private final CurrentPortProvider currentPortProvider;
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java
index 513760f5a8..edea4ca8a0 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java
@@ -42,7 +42,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
import org.apache.nifi.bootstrap.util.OSUtils;
import org.apache.nifi.minifi.bootstrap.MiNiFiParameters;
import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
@@ -55,7 +54,7 @@ import org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider;
import org.apache.nifi.minifi.bootstrap.service.MiNiFiListener;
import org.apache.nifi.minifi.bootstrap.service.MiNiFiStdLogHandler;
import org.apache.nifi.minifi.bootstrap.service.PeriodicStatusReporterManager;
-import org.apache.nifi.minifi.bootstrap.util.UnixProcessUtils;
+import org.apache.nifi.minifi.bootstrap.util.ProcessUtils;
import org.apache.nifi.util.Tuple;
public class StartRunner implements CommandRunner {
@@ -72,10 +71,11 @@ public class StartRunner implements CommandRunner {
private final RunMiNiFi runMiNiFi;
private volatile ShutdownHook shutdownHook;
private final MiNiFiExecCommandProvider miNiFiExecCommandProvider;
+ private final ProcessUtils processUtils;
public StartRunner(CurrentPortProvider currentPortProvider, BootstrapFileProvider bootstrapFileProvider,
PeriodicStatusReporterManager periodicStatusReporterManager, MiNiFiStdLogHandler miNiFiStdLogHandler, MiNiFiParameters miNiFiParameters, File bootstrapConfigFile,
- RunMiNiFi runMiNiFi, MiNiFiExecCommandProvider miNiFiExecCommandProvider) {
+ RunMiNiFi runMiNiFi, MiNiFiExecCommandProvider miNiFiExecCommandProvider, ProcessUtils processUtils) {
this.currentPortProvider = currentPortProvider;
this.bootstrapFileProvider = bootstrapFileProvider;
this.periodicStatusReporterManager = periodicStatusReporterManager;
@@ -84,6 +84,7 @@ public class StartRunner implements CommandRunner {
this.bootstrapConfigFile = bootstrapConfigFile;
this.runMiNiFi = runMiNiFi;
this.miNiFiExecCommandProvider = miNiFiExecCommandProvider;
+ this.processUtils = processUtils;
}
/**
@@ -124,7 +125,7 @@ public class StartRunner implements CommandRunner {
try {
while (true) {
- if (UnixProcessUtils.isAlive(process)) {
+ if (process.isAlive()) {
handleReload();
} else {
Runtime runtime = Runtime.getRuntime();
@@ -142,7 +143,7 @@ public class StartRunner implements CommandRunner {
continue;
}
- process = restartNifi(bootstrapProperties, confDir, builder, runtime);
+ process = restartNifi(bootstrapProperties, confDir, builder);
// failed to start process
if (process == null) {
return;
@@ -159,7 +160,7 @@ public class StartRunner implements CommandRunner {
}
}
- private Process restartNifi(Properties bootstrapProperties, String confDir, ProcessBuilder builder, Runtime runtime) throws IOException {
+ private Process restartNifi(Properties bootstrapProperties, String confDir, ProcessBuilder builder) throws IOException {
Process process;
boolean previouslyStarted = runMiNiFi.isNiFiStarted();
if (!previouslyStarted) {
@@ -238,6 +239,7 @@ public class StartRunner implements CommandRunner {
runMiNiFi.setReloading(false);
}
} catch (InterruptedException ie) {
+ DEFAULT_LOGGER.warn("Thread interrupted while handling reload");
}
}
@@ -270,7 +272,7 @@ public class StartRunner implements CommandRunner {
CMD_LOGGER.info("Starting Apache MiNiFi...");
CMD_LOGGER.info("Working Directory: {}", workingDir.getAbsolutePath());
- CMD_LOGGER.info("Command: {}", cmd.stream().collect(Collectors.joining(" ")));
+ CMD_LOGGER.info("Command: {}", String.join(" ", cmd));
return new Tuple<>(builder, startMiNiFiProcess(builder));
}
@@ -296,10 +298,9 @@ public class StartRunner implements CommandRunner {
File bootstrapConfigAbsoluteFile = bootstrapConfigFile.getAbsoluteFile();
File binDir = bootstrapConfigAbsoluteFile.getParentFile();
- File workingDir = Optional.ofNullable(props.getProperty("working.dir"))
+ return Optional.ofNullable(props.getProperty("working.dir"))
.map(File::new)
.orElse(binDir.getParentFile());
- return workingDir;
}
private boolean waitForStart() {
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StopRunner.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StopRunner.java
index c712765a5e..db980d5c52 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StopRunner.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StopRunner.java
@@ -32,24 +32,26 @@ import org.apache.nifi.minifi.bootstrap.service.BootstrapFileProvider;
import org.apache.nifi.minifi.bootstrap.service.CurrentPortProvider;
import org.apache.nifi.minifi.bootstrap.service.GracefulShutdownParameterProvider;
import org.apache.nifi.minifi.bootstrap.service.MiNiFiCommandSender;
-import org.apache.nifi.minifi.bootstrap.util.UnixProcessUtils;
+import org.apache.nifi.minifi.bootstrap.util.ProcessUtils;
public class StopRunner implements CommandRunner {
- private static final String SHUTDOWN_CMD = "SHUTDOWN";
+ protected static final String SHUTDOWN_CMD = "SHUTDOWN";
private final BootstrapFileProvider bootstrapFileProvider;
private final MiNiFiParameters miNiFiParameters;
private final MiNiFiCommandSender miNiFiCommandSender;
private final CurrentPortProvider currentPortProvider;
private final GracefulShutdownParameterProvider gracefulShutdownParameterProvider;
+ private final ProcessUtils processUtils;
public StopRunner(BootstrapFileProvider bootstrapFileProvider, MiNiFiParameters miNiFiParameters, MiNiFiCommandSender miNiFiCommandSender,
- CurrentPortProvider currentPortProvider, GracefulShutdownParameterProvider gracefulShutdownParameterProvider) {
+ CurrentPortProvider currentPortProvider, GracefulShutdownParameterProvider gracefulShutdownParameterProvider, ProcessUtils processUtils) {
this.bootstrapFileProvider = bootstrapFileProvider;
this.miNiFiParameters = miNiFiParameters;
this.miNiFiCommandSender = miNiFiCommandSender;
this.currentPortProvider = currentPortProvider;
this.gracefulShutdownParameterProvider = gracefulShutdownParameterProvider;
+ this.processUtils = processUtils;
}
/**
@@ -81,40 +83,18 @@ public class StopRunner implements CommandRunner {
lockFile.createNewFile();
}
- File statusFile = bootstrapFileProvider.getStatusFile();
- File pidFile = bootstrapFileProvider.getPidFile();
long minifiPid = miNiFiParameters.getMinifiPid();
try {
Optional<String> commandResponse = miNiFiCommandSender.sendCommand(SHUTDOWN_CMD, currentPort);
if (commandResponse.filter(SHUTDOWN_CMD::equals).isPresent()) {
- CMD_LOGGER.info("Apache MiNiFi has accepted the Shutdown Command and is shutting down now");
-
- if (minifiPid != UNINITIALIZED) {
- UnixProcessUtils.gracefulShutDownMiNiFiProcess(minifiPid, "MiNiFi has not finished shutting down after {} seconds. Killing process.",
- gracefulShutdownParameterProvider.getGracefulShutdownSeconds());
-
- if (statusFile.exists() && !statusFile.delete()) {
- CMD_LOGGER.error("Failed to delete status file {}; this file should be cleaned up manually", statusFile);
- }
-
- if (pidFile.exists() && !pidFile.delete()) {
- CMD_LOGGER.error("Failed to delete pid file {}; this file should be cleaned up manually", pidFile);
- }
-
- CMD_LOGGER.info("MiNiFi has finished shutting down.");
- }
+ gracefulShutDownMiNiFiProcess(minifiPid);
} else {
CMD_LOGGER.error("When sending SHUTDOWN command to MiNiFi, got unexpected response {}", commandResponse.orElse(null));
status = ERROR.getStatusCode();
}
} catch (IOException e) {
- if (minifiPid == UNINITIALIZED) {
- DEFAULT_LOGGER.error("No PID found for the MiNiFi process, so unable to kill process; The process should be killed manually.");
- } else {
- DEFAULT_LOGGER.error("Will kill the MiNiFi Process with PID {}", minifiPid);
- UnixProcessUtils.killProcessTree(minifiPid);
- }
+ killProcessTree(minifiPid);
} finally {
if (lockFile.exists() && !lockFile.delete()) {
CMD_LOGGER.error("Failed to delete lock file {}; this file should be cleaned up manually", lockFile);
@@ -123,4 +103,34 @@ public class StopRunner implements CommandRunner {
return status;
}
+
+ private void gracefulShutDownMiNiFiProcess(long minifiPid) throws IOException {
+ CMD_LOGGER.info("Apache MiNiFi has accepted the Shutdown Command and is shutting down now");
+ File statusFile = bootstrapFileProvider.getStatusFile();
+ File pidFile = bootstrapFileProvider.getPidFile();
+
+ if (minifiPid != UNINITIALIZED) {
+ processUtils.shutdownProcess(minifiPid, "MiNiFi has not finished shutting down after {} seconds. Killing process.",
+ gracefulShutdownParameterProvider.getGracefulShutdownSeconds());
+
+ if (statusFile.exists() && !statusFile.delete()) {
+ CMD_LOGGER.error("Failed to delete status file {}; this file should be cleaned up manually", statusFile);
+ }
+
+ if (pidFile.exists() && !pidFile.delete()) {
+ CMD_LOGGER.error("Failed to delete pid file {}; this file should be cleaned up manually", pidFile);
+ }
+
+ CMD_LOGGER.info("MiNiFi has finished shutting down.");
+ }
+ }
+
+ private void killProcessTree(long minifiPid) throws IOException {
+ if (minifiPid == UNINITIALIZED) {
+ DEFAULT_LOGGER.error("No PID found for the MiNiFi process, so unable to kill process; The process should be killed manually.");
+ } else {
+ DEFAULT_LOGGER.error("Will kill the MiNiFi Process with PID {}", minifiPid);
+ processUtils.killProcessTree(minifiPid);
+ }
+ }
}
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/AbstractPullChangeIngestor.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/AbstractPullChangeIngestor.java
index deebe90ce2..1e217a896d 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/AbstractPullChangeIngestor.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/AbstractPullChangeIngestor.java
@@ -17,17 +17,16 @@
package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
-import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
-import org.apache.nifi.minifi.bootstrap.configuration.ingestors.interfaces.ChangeIngestor;
-import org.slf4j.Logger;
-
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.configuration.ingestors.interfaces.ChangeIngestor;
+import org.slf4j.Logger;
public abstract class AbstractPullChangeIngestor implements Runnable, ChangeIngestor {
@@ -38,12 +37,14 @@ public abstract class AbstractPullChangeIngestor implements Runnable, ChangeInge
protected final AtomicInteger pollingPeriodMS = new AtomicInteger();
private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
protected volatile ConfigurationChangeNotifier configurationChangeNotifier;
+ protected volatile ConfigurationFileHolder configurationFileHolder;
protected final AtomicReference<Properties> properties = new AtomicReference<>();
@Override
public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, ConfigurationChangeNotifier configurationChangeNotifier) {
this.configurationChangeNotifier = configurationChangeNotifier;
this.properties.set(properties);
+ this.configurationFileHolder = configurationFileHolder;
}
@Override
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java
index 4629d5404c..d7ae75a877 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java
@@ -23,7 +23,6 @@ import static org.apache.nifi.minifi.bootstrap.configuration.differentiators.Who
import java.io.FileInputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.file.FileSystems;
import java.nio.file.Path;
@@ -40,13 +39,13 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
-import org.apache.commons.io.input.TeeInputStream;
-import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.commons.io.IOUtils;
import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
-import org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
import org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator;
import org.apache.nifi.minifi.bootstrap.configuration.ingestors.interfaces.ChangeIngestor;
+import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,11 +56,11 @@ import org.slf4j.LoggerFactory;
*/
public class FileChangeIngestor implements Runnable, ChangeIngestor {
- private static final Map<String, Supplier<Differentiator<InputStream>>> DIFFERENTIATOR_CONSTRUCTOR_MAP;
+ private static final Map<String, Supplier<Differentiator<ByteBuffer>>> DIFFERENTIATOR_CONSTRUCTOR_MAP;
static {
- HashMap<String, Supplier<Differentiator<InputStream>>> tempMap = new HashMap<>();
- tempMap.put(WHOLE_CONFIG_KEY, WholeConfigDifferentiator::getInputStreamDifferentiator);
+ HashMap<String, Supplier<Differentiator<ByteBuffer>>> tempMap = new HashMap<>();
+ tempMap.put(WHOLE_CONFIG_KEY, WholeConfigDifferentiator::getByteBufferDifferentiator);
DIFFERENTIATOR_CONSTRUCTOR_MAP = Collections.unmodifiableMap(tempMap);
}
@@ -80,9 +79,10 @@ public class FileChangeIngestor implements Runnable, ChangeIngestor {
private Path configFilePath;
private WatchService watchService;
private long pollingSeconds;
- private volatile Differentiator<InputStream> differentiator;
-
+ private volatile Differentiator<ByteBuffer> differentiator;
private volatile ConfigurationChangeNotifier configurationChangeNotifier;
+ private volatile ConfigurationFileHolder configurationFileHolder;
+ private volatile Properties properties;
private ScheduledExecutorService executorService;
protected static WatchService initializeWatcher(Path filePath) {
@@ -124,20 +124,13 @@ public class FileChangeIngestor implements Runnable, ChangeIngestor {
logger.debug("Checking for a change");
if (targetChanged()) {
logger.debug("Target changed, checking if it's different than current flow.");
- try (FileInputStream configFile = new FileInputStream(configFilePath.toFile());
- ByteArrayOutputStream pipedOutputStream = new ByteArrayOutputStream();
- TeeInputStream teeInputStream = new TeeInputStream(configFile, pipedOutputStream)) {
+ try (FileInputStream configFile = new FileInputStream(configFilePath.toFile())) {
+ ByteBuffer readOnlyNewConfig =
+ ConfigTransformer.overrideNonFlowSectionsFromOriginalSchema(
+ IOUtils.toByteArray(configFile), configurationFileHolder.getConfigFileReference().get().duplicate(), properties);
- if (differentiator.isNew(teeInputStream)) {
+ if (differentiator.isNew(readOnlyNewConfig)) {
logger.debug("New change, notifying listener");
- // Fill the byteArrayOutputStream with the rest of the request data
- while (teeInputStream.available() != 0) {
- teeInputStream.read();
- }
-
- ByteBuffer newConfig = ByteBuffer.wrap(pipedOutputStream.toByteArray());
- ByteBuffer readOnlyNewConfig = newConfig.asReadOnlyBuffer();
-
configurationChangeNotifier.notifyListeners(readOnlyNewConfig);
logger.debug("Listeners notified");
}
@@ -149,6 +142,8 @@ public class FileChangeIngestor implements Runnable, ChangeIngestor {
@Override
public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, ConfigurationChangeNotifier configurationChangeNotifier) {
+ this.properties = properties;
+ this.configurationFileHolder = configurationFileHolder;
final String rawPath = properties.getProperty(CONFIG_FILE_PATH_KEY);
final String rawPollingDuration = properties.getProperty(POLLING_PERIOD_INTERVAL_KEY, Long.toString(DEFAULT_POLLING_PERIOD_INTERVAL));
@@ -169,14 +164,14 @@ public class FileChangeIngestor implements Runnable, ChangeIngestor {
final String differentiatorName = properties.getProperty(DIFFERENTIATOR_KEY);
if (differentiatorName != null && !differentiatorName.isEmpty()) {
- Supplier<Differentiator<InputStream>> differentiatorSupplier = DIFFERENTIATOR_CONSTRUCTOR_MAP.get(differentiatorName);
+ Supplier<Differentiator<ByteBuffer>> differentiatorSupplier = DIFFERENTIATOR_CONSTRUCTOR_MAP.get(differentiatorName);
if (differentiatorSupplier == null) {
throw new IllegalArgumentException("Property, " + DIFFERENTIATOR_KEY + ", has value " + differentiatorName + " which does not " +
"correspond to any in the PullHttpChangeIngestor Map:" + DIFFERENTIATOR_CONSTRUCTOR_MAP.keySet());
}
differentiator = differentiatorSupplier.get();
} else {
- differentiator = WholeConfigDifferentiator.getInputStreamDifferentiator();
+ differentiator = WholeConfigDifferentiator.getByteBufferDifferentiator();
}
differentiator.initialize(properties, configurationFileHolder);
}
@@ -193,7 +188,7 @@ public class FileChangeIngestor implements Runnable, ChangeIngestor {
this.configurationChangeNotifier = configurationChangeNotifier;
}
- protected void setDifferentiator(Differentiator<InputStream> differentiator) {
+ protected void setDifferentiator(Differentiator<ByteBuffer> differentiator) {
this.differentiator = differentiator;
}
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java
index 61b4f15025..02e4bac6eb 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java
@@ -17,33 +17,9 @@
package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
-import okhttp3.Credentials;
-import okhttp3.HttpUrl;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import okhttp3.Response;
-import okhttp3.ResponseBody;
-import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
-import org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
-import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
-import org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator;
-import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream;
-import org.apache.nifi.minifi.commons.schema.ConfigSchema;
-import org.apache.nifi.minifi.commons.schema.SecurityPropertiesSchema;
-import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema;
-import org.apache.nifi.minifi.commons.schema.common.StringUtil;
-import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader;
-import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.Yaml;
+import static org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeCoordinator.NOTIFIER_INGESTORS_KEY;
+import static org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator.WHOLE_CONFIG_KEY;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLSocketFactory;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.TrustManagerFactory;
-import javax.net.ssl.X509TrustManager;
-import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -58,9 +34,25 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
-
-import static org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeCoordinator.NOTIFIER_INGESTORS_KEY;
-import static org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator.WHOLE_CONFIG_KEY;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509TrustManager;
+import okhttp3.Credentials;
+import okhttp3.HttpUrl;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator;
+import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
+import org.apache.nifi.minifi.commons.schema.common.StringUtil;
+import org.slf4j.LoggerFactory;
public class PullHttpChangeIngestor extends AbstractPullChangeIngestor {
@@ -109,7 +101,6 @@ public class PullHttpChangeIngestor extends AbstractPullChangeIngestor {
private volatile String connectionScheme;
private volatile String lastEtag = "";
private volatile boolean useEtag = false;
- private volatile boolean overrideSecurity = false;
public PullHttpChangeIngestor() {
logger = LoggerFactory.getLogger(PullHttpChangeIngestor.class);
@@ -153,14 +144,6 @@ public class PullHttpChangeIngestor extends AbstractPullChangeIngestor {
"the default value of \"false\". It is set to \"" + useEtagString + "\".");
}
- final String overrideSecurityProperties = (String) properties.getOrDefault(OVERRIDE_SECURITY, "false");
- if ("true".equalsIgnoreCase(overrideSecurityProperties) || "false".equalsIgnoreCase(overrideSecurityProperties)) {
- overrideSecurity = Boolean.parseBoolean(overrideSecurityProperties);
- } else {
- throw new IllegalArgumentException("Property, " + OVERRIDE_SECURITY + ", to specify whether to override security properties must either be a value boolean value (\"true\" or \"false\")" +
- " or left to the default value of \"false\". It is set to \"" + overrideSecurityProperties + "\".");
- }
-
httpClientReference.set(null);
final OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder();
@@ -234,8 +217,8 @@ public class PullHttpChangeIngestor extends AbstractPullChangeIngestor {
.build();
final Request.Builder requestBuilder = new Request.Builder()
- .get()
- .url(url);
+ .get()
+ .url(url);
if (useEtag) {
requestBuilder.addHeader("If-None-Match", lastEtag);
@@ -264,29 +247,9 @@ public class PullHttpChangeIngestor extends AbstractPullChangeIngestor {
return;
}
- final ByteBuffer bodyByteBuffer = ByteBuffer.wrap(body.bytes());
- ByteBuffer readOnlyNewConfig = null;
-
// checking if some parts of the configuration must be preserved
- if (overrideSecurity) {
- readOnlyNewConfig = bodyByteBuffer.asReadOnlyBuffer();
- } else {
- logger.debug("Preserving previous security properties...");
-
- // get the current security properties from the current configuration file
- final File configFile = new File(properties.get().getProperty(RunMiNiFi.MINIFI_CONFIG_FILE_KEY));
- ConvertableSchema<ConfigSchema> configSchema = SchemaLoader.loadConvertableSchemaFromYaml(new FileInputStream(configFile));
- ConfigSchema currentSchema = configSchema.convert();
- SecurityPropertiesSchema secProps = currentSchema.getSecurityProperties();
-
- // override the security properties in the pulled configuration with the previous properties
- configSchema = SchemaLoader.loadConvertableSchemaFromYaml(new ByteBufferInputStream(bodyByteBuffer.duplicate()));
- ConfigSchema newSchema = configSchema.convert();
- newSchema.setSecurityProperties(secProps);
-
- // return the updated configuration preserving the previous security configuration
- readOnlyNewConfig = ByteBuffer.wrap(new Yaml().dump(newSchema.toMap()).getBytes()).asReadOnlyBuffer();
- }
+ ByteBuffer readOnlyNewConfig =
+ ConfigTransformer.overrideNonFlowSectionsFromOriginalSchema(body.bytes(), configurationFileHolder.getConfigFileReference().get().duplicate(), properties.get());
if (differentiator.isNew(readOnlyNewConfig)) {
logger.debug("New change received, notifying listener");
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestor.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestor.java
index 967ea08218..cb9ded9762 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestor.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestor.java
@@ -17,14 +17,29 @@
package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
-import org.apache.commons.io.input.TeeInputStream;
-import org.apache.commons.io.output.ByteArrayOutputStream;
+import static org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeCoordinator.NOTIFIER_INGESTORS_KEY;
+import static org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator.WHOLE_CONFIG_KEY;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Supplier;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.commons.io.IOUtils;
import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
-import org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
import org.apache.nifi.minifi.bootstrap.configuration.ListenerHandleResult;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
import org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator;
import org.apache.nifi.minifi.bootstrap.configuration.ingestors.interfaces.ChangeIngestor;
+import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
@@ -35,32 +50,14 @@ import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.PrintWriter;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.function.Supplier;
-
-import static org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeCoordinator.NOTIFIER_INGESTORS_KEY;
-import static org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator.WHOLE_CONFIG_KEY;
-
public class RestChangeIngestor implements ChangeIngestor {
- private static final Map<String, Supplier<Differentiator<InputStream>>> DIFFERENTIATOR_CONSTRUCTOR_MAP;
+ private static final Map<String, Supplier<Differentiator<ByteBuffer>>> DIFFERENTIATOR_CONSTRUCTOR_MAP;
static {
- HashMap<String, Supplier<Differentiator<InputStream>>> tempMap = new HashMap<>();
- tempMap.put(WHOLE_CONFIG_KEY, WholeConfigDifferentiator::getInputStreamDifferentiator);
+ HashMap<String, Supplier<Differentiator<ByteBuffer>>> tempMap = new HashMap<>();
+ tempMap.put(WHOLE_CONFIG_KEY, WholeConfigDifferentiator::getByteBufferDifferentiator);
DIFFERENTIATOR_CONSTRUCTOR_MAP = Collections.unmodifiableMap(tempMap);
}
@@ -86,8 +83,10 @@ public class RestChangeIngestor implements ChangeIngestor {
public static final String DIFFERENTIATOR_KEY = RECEIVE_HTTP_BASE_KEY + ".differentiator";
private final Server jetty;
- private volatile Differentiator<InputStream> differentiator;
+ private volatile Differentiator<ByteBuffer> differentiator;
private volatile ConfigurationChangeNotifier configurationChangeNotifier;
+ private volatile ConfigurationFileHolder configurationFileHolder;
+ private volatile Properties properties;
public RestChangeIngestor() {
QueuedThreadPool queuedThreadPool = new QueuedThreadPool();
@@ -97,19 +96,20 @@ public class RestChangeIngestor implements ChangeIngestor {
@Override
public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, ConfigurationChangeNotifier configurationChangeNotifier) {
+ this.configurationFileHolder = configurationFileHolder;
+ this.properties = properties;
logger.info("Initializing");
-
final String differentiatorName = properties.getProperty(DIFFERENTIATOR_KEY);
if (differentiatorName != null && !differentiatorName.isEmpty()) {
- Supplier<Differentiator<InputStream>> differentiatorSupplier = DIFFERENTIATOR_CONSTRUCTOR_MAP.get(differentiatorName);
+ Supplier<Differentiator<ByteBuffer>> differentiatorSupplier = DIFFERENTIATOR_CONSTRUCTOR_MAP.get(differentiatorName);
if (differentiatorSupplier == null) {
throw new IllegalArgumentException("Property, " + DIFFERENTIATOR_KEY + ", has value " + differentiatorName + " which does not " +
"correspond to any in the PullHttpChangeIngestor Map:" + DIFFERENTIATOR_CONSTRUCTOR_MAP.keySet());
}
differentiator = differentiatorSupplier.get();
} else {
- differentiator = WholeConfigDifferentiator.getInputStreamDifferentiator();
+ differentiator = WholeConfigDifferentiator.getByteBufferDifferentiator();
}
differentiator.initialize(properties, configurationFileHolder);
@@ -207,7 +207,7 @@ public class RestChangeIngestor implements ChangeIngestor {
logger.info("Added an https connector on the host '{}' and port '{}'", new Object[]{https.getHost(), https.getPort()});
}
- protected void setDifferentiator(Differentiator<InputStream> differentiator) {
+ protected void setDifferentiator(Differentiator<ByteBuffer> differentiator) {
this.differentiator = differentiator;
}
@@ -215,7 +215,7 @@ public class RestChangeIngestor implements ChangeIngestor {
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
- throws IOException, ServletException {
+ throws IOException {
logRequest(request);
@@ -224,17 +224,12 @@ public class RestChangeIngestor implements ChangeIngestor {
if (POST.equals(request.getMethod())) {
int statusCode;
String responseText;
- try (ByteArrayOutputStream pipedOutputStream = new ByteArrayOutputStream();
- TeeInputStream teeInputStream = new TeeInputStream(request.getInputStream(), pipedOutputStream)) {
+ try {
+ ByteBuffer readOnlyNewConfig =
+ ConfigTransformer.overrideNonFlowSectionsFromOriginalSchema(
+ IOUtils.toByteArray(request.getInputStream()), configurationFileHolder.getConfigFileReference().get().duplicate(), properties);
- if (differentiator.isNew(teeInputStream)) {
- // Fill the pipedOutputStream with the rest of the request data
- while (teeInputStream.available() != 0) {
- teeInputStream.read();
- }
-
- ByteBuffer newConfig = ByteBuffer.wrap(pipedOutputStream.toByteArray());
- ByteBuffer readOnlyNewConfig = newConfig.asReadOnlyBuffer();
+ if (differentiator.isNew(readOnlyNewConfig)) {
Collection<ListenerHandleResult> listenerHandleResults = configurationChangeNotifier.notifyListeners(readOnlyNewConfig);
@@ -250,9 +245,13 @@ public class RestChangeIngestor implements ChangeIngestor {
statusCode = 409;
responseText = "Request received but instance is already running this config.";
}
-
- writeOutput(response, responseText, statusCode);
+ } catch (Exception e) {
+ logger.error("Failed to override config file", e);
+ statusCode = 500;
+ responseText = "Failed to override config file";
}
+
+ writeOutput(response, responseText, statusCode);
} else if (GET.equals(request.getMethod())) {
writeOutput(response, GET_TEXT, 200);
} else {
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/BootstrapCodec.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/BootstrapCodec.java
index 124ff059f6..c64bc547cf 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/BootstrapCodec.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/BootstrapCodec.java
@@ -25,6 +25,7 @@ import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.Arrays;
+import java.util.Optional;
import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
import org.apache.nifi.minifi.bootstrap.exception.InvalidCommandException;
import org.slf4j.Logger;
@@ -48,7 +49,7 @@ public class BootstrapCodec {
public void communicate() throws IOException {
String line = reader.readLine();
- String[] splits = line.split(" ");
+ String[] splits = Optional.ofNullable(line).map(l -> l.split(" ")).orElse(new String[0]);
if (splits.length == 0) {
throw new IOException("Received invalid command from MiNiFi: " + line);
}
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/CurrentPortProvider.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/CurrentPortProvider.java
index e4d32dad53..f9f2f989c6 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/CurrentPortProvider.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/CurrentPortProvider.java
@@ -21,15 +21,17 @@ import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.DEFAULT_LOGGER;
import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.UNINITIALIZED;
import org.apache.nifi.minifi.bootstrap.MiNiFiParameters;
-import org.apache.nifi.minifi.bootstrap.util.UnixProcessUtils;
+import org.apache.nifi.minifi.bootstrap.util.ProcessUtils;
public class CurrentPortProvider {
private final MiNiFiCommandSender miNiFiCommandSender;
private final MiNiFiParameters miNiFiParameters;
+ private final ProcessUtils processUtils;
- public CurrentPortProvider(MiNiFiCommandSender miNiFiCommandSender, MiNiFiParameters miNiFiParameters) {
+ public CurrentPortProvider(MiNiFiCommandSender miNiFiCommandSender, MiNiFiParameters miNiFiParameters, ProcessUtils processUtils) {
this.miNiFiCommandSender = miNiFiCommandSender;
this.miNiFiParameters = miNiFiParameters;
+ this.processUtils = processUtils;
}
public Integer getCurrentPort() {
@@ -50,7 +52,7 @@ public class CurrentPortProvider {
long minifiPid = miNiFiParameters.getMinifiPid();
DEFAULT_LOGGER.debug("Current PID {}", minifiPid);
- boolean procRunning = UnixProcessUtils.isProcessRunning(minifiPid);
+ boolean procRunning = processUtils.isProcessRunning(minifiPid);
if (procRunning) {
return miNiFiPort;
} else {
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/GracefulShutdownParameterProvider.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/GracefulShutdownParameterProvider.java
index 4de7e673ec..b75c0157e1 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/GracefulShutdownParameterProvider.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/GracefulShutdownParameterProvider.java
@@ -24,10 +24,10 @@ import org.slf4j.LoggerFactory;
public class GracefulShutdownParameterProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(GracefulShutdownParameterProvider.class);
- private static final String GRACEFUL_SHUTDOWN_PROP = "graceful.shutdown.seconds";
- private static final String DEFAULT_GRACEFUL_SHUTDOWN_VALUE = "20";
private static final String INVALID_GRACEFUL_SHUTDOWN_SECONDS_MESSAGE =
"The {} property in Bootstrap Config File has an invalid value. Must be a non-negative integer, Falling back to the default {} value";
+ protected static final String GRACEFUL_SHUTDOWN_PROP = "graceful.shutdown.seconds";
+ protected static final String DEFAULT_GRACEFUL_SHUTDOWN_VALUE = "20";
private final BootstrapFileProvider bootstrapFileProvider;
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java
index e1c30d835d..c0343c9f5e 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java
@@ -19,15 +19,10 @@ package org.apache.nifi.minifi.bootstrap.service;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.CONF_DIR_KEY;
import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.MINIFI_CONFIG_FILE_KEY;
-import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.OVERRIDE_SECURITY;
-import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PULL_HTTP_BASE_KEY;
import static org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.generateConfigFiles;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
@@ -38,11 +33,6 @@ import org.apache.commons.io.IOUtils;
import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
-import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream;
-import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
-import org.apache.nifi.minifi.commons.schema.ConfigSchema;
-import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema;
-import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader;
import org.slf4j.Logger;
public class MiNiFiConfigurationChangeListener implements ConfigurationChangeListener {
@@ -67,13 +57,10 @@ public class MiNiFiConfigurationChangeListener implements ConfigurationChangeLis
throw new ConfigurationChangeException("Instance is already handling another change");
}
// Store the incoming stream as a byte array to be shared among components that need it
- try(ByteArrayOutputStream bufferedConfigOs = new ByteArrayOutputStream()) {
-
+ try {
Properties bootstrapProperties = bootstrapFileProvider.getBootstrapProperties();
File configFile = new File(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY));
- IOUtils.copy(configInputStream, bufferedConfigOs);
-
File swapConfigFile = bootstrapFileProvider.getSwapFile();
logger.info("Persisting old configuration to {}", swapConfigFile.getAbsolutePath());
@@ -81,11 +68,11 @@ public class MiNiFiConfigurationChangeListener implements ConfigurationChangeLis
Files.copy(configFileInputStream, swapConfigFile.toPath(), REPLACE_EXISTING);
}
- persistBackNonFlowSectionsFromOriginalSchema(bufferedConfigOs.toByteArray(), bootstrapProperties, configFile);
+ // write out new config to file
+ Files.copy(configInputStream, configFile.toPath(), REPLACE_EXISTING);
// Create an input stream to feed to the config transformer
try (FileInputStream newConfigIs = new FileInputStream(configFile)) {
-
try {
String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY);
transformConfigurationFiles(confDir, newConfigIs, configFile, swapConfigFile);
@@ -146,61 +133,4 @@ public class MiNiFiConfigurationChangeListener implements ConfigurationChangeLis
throw new IOException("Unable to successfully restart MiNiFi instance after configuration change.", e);
}
}
-
- private void persistBackNonFlowSectionsFromOriginalSchema(byte[] newSchema, Properties bootstrapProperties, File configFile) {
- try {
- ConvertableSchema<ConfigSchema> schemaNew = ConfigTransformer
- .throwIfInvalid(SchemaLoader.loadConvertableSchemaFromYaml(new ByteArrayInputStream(newSchema)));
- ConfigSchema configSchemaNew = ConfigTransformer.throwIfInvalid(schemaNew.convert());
- ConvertableSchema<ConfigSchema> schemaOld = ConfigTransformer
- .throwIfInvalid(SchemaLoader.loadConvertableSchemaFromYaml(new ByteBufferInputStream(runner.getConfigFileReference().get().duplicate())));
- ConfigSchema configSchemaOld = ConfigTransformer.throwIfInvalid(schemaOld.convert());
-
- configSchemaNew.setNifiPropertiesOverrides(configSchemaOld.getNifiPropertiesOverrides());
-
- if (!overrideCoreProperties(bootstrapProperties)) {
- logger.debug("Preserving previous core properties...");
- configSchemaNew.setCoreProperties(configSchemaOld.getCoreProperties());
- }
-
- if (!overrideSecurityProperties(bootstrapProperties)) {
- logger.debug("Preserving previous security properties...");
- configSchemaNew.setSecurityProperties(configSchemaOld.getSecurityProperties());
- }
-
- logger.debug("Persisting changes to {}", configFile.getAbsolutePath());
- SchemaLoader.toYaml(configSchemaNew, new FileWriter(configFile));
- } catch (Exception e) {
- logger.error("Loading the old and the new schema for merging was not successful", e);
- }
- }
-
- private static boolean overrideSecurityProperties(Properties properties) {
- String overrideSecurityProperties = (String) properties.getOrDefault(OVERRIDE_SECURITY, "false");
- boolean overrideSecurity;
- if ("true".equalsIgnoreCase(overrideSecurityProperties) || "false".equalsIgnoreCase(overrideSecurityProperties)) {
- overrideSecurity = Boolean.parseBoolean(overrideSecurityProperties);
- } else {
- throw new IllegalArgumentException(
- "Property, " + OVERRIDE_SECURITY + ", to specify whether to override security properties must either be a value boolean value (\"true\" or \"false\")" +
- " or left to the default value of \"false\". It is set to \"" + overrideSecurityProperties + "\".");
- }
-
- return overrideSecurity;
- }
-
- private static boolean overrideCoreProperties(Properties properties) {
- String overrideCorePropertiesKey = PULL_HTTP_BASE_KEY + ".override.core";
- String overrideCoreProps = (String) properties.getOrDefault(overrideCorePropertiesKey, "false");
- boolean overrideCoreProperties;
- if ("true".equalsIgnoreCase(overrideCoreProps) || "false".equalsIgnoreCase(overrideCoreProps)) {
- overrideCoreProperties = Boolean.parseBoolean(overrideCoreProps);
- } else {
- throw new IllegalArgumentException(
- "Property, " + overrideCorePropertiesKey + ", to specify whether to override core properties must either be a value boolean value (\"true\" or \"false\")" +
- " or left to the default value of \"false\". It is set to \"" + overrideCoreProps + "\".");
- }
-
- return overrideCoreProperties;
- }
}
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiStatusProvider.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiStatusProvider.java
index acd6395ea8..e840cf1a33 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiStatusProvider.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiStatusProvider.java
@@ -17,16 +17,18 @@
package org.apache.nifi.minifi.bootstrap.service;
import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.UNINITIALIZED;
-import static org.apache.nifi.minifi.bootstrap.util.UnixProcessUtils.isProcessRunning;
import org.apache.nifi.minifi.bootstrap.MiNiFiStatus;
+import org.apache.nifi.minifi.bootstrap.util.ProcessUtils;
public class MiNiFiStatusProvider {
private final MiNiFiCommandSender miNiFiCommandSender;
+ private final ProcessUtils processUtils;
- public MiNiFiStatusProvider(MiNiFiCommandSender miNiFiCommandSender) {
+ public MiNiFiStatusProvider(MiNiFiCommandSender miNiFiCommandSender, ProcessUtils processUtils) {
this.miNiFiCommandSender = miNiFiCommandSender;
+ this.processUtils = processUtils;
}
public MiNiFiStatus getStatus(int port, long pid) {
@@ -43,6 +45,6 @@ public class MiNiFiStatusProvider {
return new MiNiFiStatus(port, pid, true, true);
}
- return new MiNiFiStatus(port, pid, false, isProcessRunning(pid));
+ return new MiNiFiStatus(port, pid, false, processUtils.isProcessRunning(pid));
}
}
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/ReloadService.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/ReloadService.java
index dc87028037..4b7272e591 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/ReloadService.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/ReloadService.java
@@ -25,7 +25,7 @@ import java.io.IOException;
import java.util.Optional;
import org.apache.nifi.minifi.bootstrap.MiNiFiParameters;
import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
-import org.apache.nifi.minifi.bootstrap.util.UnixProcessUtils;
+import org.apache.nifi.minifi.bootstrap.util.ProcessUtils;
public class ReloadService {
private final BootstrapFileProvider bootstrapFileProvider;
@@ -35,16 +35,18 @@ public class ReloadService {
private final CurrentPortProvider currentPortProvider;
private final GracefulShutdownParameterProvider gracefulShutdownParameterProvider;
private final RunMiNiFi runMiNiFi;
+ private final ProcessUtils processUtils;
public ReloadService(BootstrapFileProvider bootstrapFileProvider, MiNiFiParameters miNiFiParameters,
MiNiFiCommandSender miNiFiCommandSender, CurrentPortProvider currentPortProvider,
- GracefulShutdownParameterProvider gracefulShutdownParameterProvider, RunMiNiFi runMiNiFi) {
+ GracefulShutdownParameterProvider gracefulShutdownParameterProvider, RunMiNiFi runMiNiFi, ProcessUtils processUtils) {
this.bootstrapFileProvider = bootstrapFileProvider;
this.miNiFiParameters = miNiFiParameters;
this.miNiFiCommandSender = miNiFiCommandSender;
this.currentPortProvider = currentPortProvider;
this.gracefulShutdownParameterProvider = gracefulShutdownParameterProvider;
this.runMiNiFi = runMiNiFi;
+ this.processUtils = processUtils;
}
public void reload() throws IOException {
@@ -60,7 +62,7 @@ public class ReloadService {
if (commandResponse.filter(RELOAD_CMD::equals).isPresent()) {
DEFAULT_LOGGER.info("Apache MiNiFi has accepted the Reload Command and is reloading");
if (minifiPid != UNINITIALIZED) {
- UnixProcessUtils.gracefulShutDownMiNiFiProcess(minifiPid, "MiNiFi has not finished shutting down after {} seconds as part of configuration reload. Killing process.",
+ processUtils.shutdownProcess(minifiPid, "MiNiFi has not finished shutting down after {} seconds as part of configuration reload. Killing process.",
gracefulShutdownParameterProvider.getGracefulShutdownSeconds());
runMiNiFi.setReloading(true);
DEFAULT_LOGGER.info("MiNiFi has finished shutting down and will be reloaded.");
@@ -73,7 +75,7 @@ public class ReloadService {
DEFAULT_LOGGER.error("No PID found for the MiNiFi process, so unable to kill process; The process should be killed manually.");
} else {
DEFAULT_LOGGER.error("Will kill the MiNiFi Process with PID {}", minifiPid);
- UnixProcessUtils.killProcessTree(minifiPid);
+ processUtils.killProcessTree(minifiPid);
}
} finally {
if (reloadLockFile.exists() && !reloadLockFile.delete()) {
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
index ca8d6a54f3..a9410039e4 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
@@ -17,10 +17,15 @@
package org.apache.nifi.minifi.bootstrap.util;
+import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.OVERRIDE_SECURITY;
+import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PULL_HTTP_BASE_KEY;
+
+import java.io.ByteArrayInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -79,13 +84,13 @@ import org.w3c.dom.Document;
import org.w3c.dom.Element;
public final class ConfigTransformer {
- // Underlying version of NIFI will be using
- public static final String ROOT_GROUP = "Root-Group";
-
+ private static final String OVERRIDE_CORE_PROPERTIES_KEY = PULL_HTTP_BASE_KEY + ".override.core";
private static final Base64.Encoder KEY_ENCODER = Base64.getEncoder().withoutPadding();
private static final int SENSITIVE_PROPERTIES_KEY_LENGTH = 24;
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConfigTransformer.class);
- public static final Logger logger = LoggerFactory.getLogger(ConfigTransformer.class);
+ // Underlying version of NIFI will be using
+ public static final String ROOT_GROUP = "Root-Group";
// Final util classes should have private constructor
private ConfigTransformer() {
@@ -119,19 +124,19 @@ public final class ConfigTransformer {
// See if we are providing defined properties from the filesystem configurations and use those as the definitive values
if (securityProperties != null) {
configSchemaNew.setSecurityProperties(securityProperties);
- logger.info("Bootstrap flow override: Replaced security properties");
+ LOGGER.info("Bootstrap flow override: Replaced security properties");
}
if (provenanceReportingProperties != null) {
configSchemaNew.setProvenanceReportingProperties(provenanceReportingProperties);
- logger.info("Bootstrap flow override: Replaced provenance reporting properties");
+ LOGGER.info("Bootstrap flow override: Replaced provenance reporting properties");
}
// Replace all processor SSL controller services with MiNiFi parent, if bootstrap boolean is set to true
if (BootstrapTransformer.processorSSLOverride(bootstrapProperties)) {
for (ProcessorSchema processorConfig : configSchemaNew.getProcessGroupSchema().getProcessors()) {
processorConfig.getProperties().replace("SSL Context Service", processorConfig.getProperties().get("SSL Context Service"), "SSL-Context-Service");
- logger.info("Bootstrap flow override: Replaced {} SSL Context Service with parent MiNiFi SSL", processorConfig.getName());
+ LOGGER.info("Bootstrap flow override: Replaced {} SSL Context Service with parent MiNiFi SSL", processorConfig.getName());
}
}
@@ -286,7 +291,7 @@ public final class ConfigTransformer {
final String notnullSensitivePropertiesKey;
// Auto-generate the sensitive properties key if not provided, NiFi security libraries require it
if (StringUtil.isNullOrEmpty(sensitivePropertiesKey)) {
- logger.warn("Generating Random Sensitive Properties Key [{}]", NiFiProperties.SENSITIVE_PROPS_KEY);
+ LOGGER.warn("Generating Random Sensitive Properties Key [{}]", NiFiProperties.SENSITIVE_PROPS_KEY);
final SecureRandom secureRandom = new SecureRandom();
final byte[] sensitivePropertiesKeyBinary = new byte[SENSITIVE_PROPERTIES_KEY_LENGTH];
secureRandom.nextBytes(sensitivePropertiesKeyBinary);
@@ -750,6 +755,52 @@ public final class ConfigTransformer {
element.appendChild(toAdd);
}
+    /**
+     * Merges a new configuration schema with the currently active one so that selected non-flow sections
+     * of the current configuration survive the update.
+     * <p>
+     * When both the core and the security override flags are set in the bootstrap properties, the new
+     * schema is returned as received, without being parsed or validated by this method. Otherwise the NiFi
+     * properties overrides are copied from the current schema, and the core and security properties are
+     * copied from it as well unless the matching override flag is set.
+     *
+     * @param newSchema the new configuration in YAML format
+     * @param currentConfigScheme the currently active configuration in YAML format
+     * @param bootstrapProperties the bootstrap properties that hold the override flags
+     * @return a buffer wrapping {@code newSchema} when both flags are set, otherwise a read-only buffer
+     *         holding the merged configuration serialized as UTF-8 encoded YAML
+     * @throws InvalidConfigurationException if loading, validating, merging or serializing the schemas fails
+     */
+    public static ByteBuffer overrideNonFlowSectionsFromOriginalSchema(byte[] newSchema, ByteBuffer currentConfigScheme, Properties bootstrapProperties)
+        throws InvalidConfigurationException {
+        try {
+            boolean overrideCoreProperties = ConfigTransformer.overrideCoreProperties(bootstrapProperties);
+            boolean overrideSecurityProperties = ConfigTransformer.overrideSecurityProperties(bootstrapProperties);
+            if (overrideCoreProperties && overrideSecurityProperties) {
+                // Nothing has to be preserved from the current configuration
+                return ByteBuffer.wrap(newSchema);
+            }
+
+            ConvertableSchema<ConfigSchema> schemaNew = ConfigTransformer
+                .throwIfInvalid(SchemaLoader.loadConvertableSchemaFromYaml(new ByteArrayInputStream(newSchema)));
+            ConfigSchema configSchemaNew = ConfigTransformer.throwIfInvalid(schemaNew.convert());
+            ConvertableSchema<ConfigSchema> schemaOld = ConfigTransformer
+                .throwIfInvalid(SchemaLoader.loadConvertableSchemaFromYaml(new ByteBufferInputStream(currentConfigScheme)));
+            ConfigSchema configSchemaOld = ConfigTransformer.throwIfInvalid(schemaOld.convert());
+
+            configSchemaNew.setNifiPropertiesOverrides(configSchemaOld.getNifiPropertiesOverrides());
+
+            if (!overrideCoreProperties) {
+                LOGGER.debug("Preserving previous core properties...");
+                configSchemaNew.setCoreProperties(configSchemaOld.getCoreProperties());
+            }
+
+            if (!overrideSecurityProperties) {
+                LOGGER.debug("Preserving previous security properties...");
+                configSchemaNew.setSecurityProperties(configSchemaOld.getSecurityProperties());
+            }
+
+            StringWriter writer = new StringWriter();
+            SchemaLoader.toYaml(configSchemaNew, writer);
+            // Encode with a fixed charset: the no-argument getBytes() depends on the platform default,
+            // which would make the produced bytes differ between hosts for non-ASCII content
+            byte[] mergedYaml = writer.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
+            return ByteBuffer.wrap(mergedYaml).asReadOnlyBuffer();
+        } catch (Exception e) {
+            throw new InvalidConfigurationException("Loading the old and the new schema for merging was not successful", e);
+        }
+    }
+
+    /**
+     * Reads the security override flag from the given properties.
+     *
+     * @param properties the properties to read the {@code OVERRIDE_SECURITY} key from
+     * @return true only if the property is set to "true" (ignoring case), false otherwise, including when it is missing
+     */
+    private static boolean overrideSecurityProperties(Properties properties) {
+        // getProperty is the String-typed accessor of Properties, so no cast of the stored value is needed
+        return Boolean.parseBoolean(properties.getProperty(OVERRIDE_SECURITY, "false"));
+    }
+
+    /**
+     * Reads the core properties override flag from the given properties.
+     *
+     * @param properties the properties to read the {@code OVERRIDE_CORE_PROPERTIES_KEY} key from
+     * @return true only if the property is set to "true" (ignoring case), false otherwise, including when it is missing
+     */
+    private static boolean overrideCoreProperties(Properties properties) {
+        // getProperty is the String-typed accessor of Properties, so no cast of the stored value is needed
+        return Boolean.parseBoolean(properties.getProperty(OVERRIDE_CORE_PROPERTIES_KEY, "false"));
+    }
+
public static final String PROPERTIES_FILE_APACHE_2_0_LICENSE =
" Licensed to the Apache Software Foundation (ASF) under one or more\n" +
"# contributor license agreements. See the NOTICE file distributed with\n" +
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ProcessUtils.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ProcessUtils.java
new file mode 100644
index 0000000000..d94b3292b1
--- /dev/null
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ProcessUtils.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.util;
+
+import java.io.IOException;
+
+/**
+ * Operations for inspecting and stopping an operating system process identified by its process id.
+ * {@code UnixProcessUtils} implements this interface.
+ */
+public interface ProcessUtils {
+ /**
+  * Tells whether the process with the given id is running.
+  *
+  * @param pid the process id; {@code UnixProcessUtils} logs an error and returns false when it is null
+  * @return true if the process is running
+  */
+ boolean isProcessRunning(Long pid);
+
+ /**
+  * Waits for the process to stop and kills it if it is still running when the grace period is over.
+  * NOTE(review): described from the {@code UnixProcessUtils} implementation - confirm for other implementations.
+  *
+  * @param pid the process id
+  * @param s the message logged as a warning before the process is killed; {@code UnixProcessUtils} passes
+  *          {@code gracefulShutdownSeconds} to the logger as the argument of this message
+  * @param gracefulShutdownSeconds the grace period in seconds; in {@code UnixProcessUtils} a value of 0
+  *          leads to killing a still running process without waiting
+  */
+ void shutdownProcess(Long pid, String s, int gracefulShutdownSeconds);
+
+ /**
+  * Kills the process with the given id.
+  * NOTE(review): {@code UnixProcessUtils} looks up the child processes first and finishes with {@code kill -9}
+  * on the given pid; presumably the children are killed the same way - confirm against the implementation.
+  *
+  * @param pid the process id
+  * @throws IOException presumably when an operating system command cannot be executed - confirm against the implementation
+  */
+ void killProcessTree(Long pid) throws IOException;
+}
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/UnixProcessUtils.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/UnixProcessUtils.java
index a6d13ca416..1a13138696 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/UnixProcessUtils.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/UnixProcessUtils.java
@@ -17,6 +17,8 @@
package org.apache.nifi.minifi.bootstrap.util;
+import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.DEFAULT_LOGGER;
+
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
@@ -32,10 +34,11 @@ import org.slf4j.LoggerFactory;
* Utility class for providing information about the running MiNiFi process.
* The methods which are using the PID are working only on unix systems, and should be used only as a fallback in case the PING command fails.
* */
-public class UnixProcessUtils {
+public class UnixProcessUtils implements ProcessUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(UnixProcessUtils.class);
- public static boolean isProcessRunning(Long pid) {
+ @Override
+ public boolean isProcessRunning(Long pid) {
if (pid == null) {
LOGGER.error("Unable to get process status due to missing process id");
return false;
@@ -72,17 +75,18 @@ public class UnixProcessUtils {
}
}
- public static void gracefulShutDownMiNiFiProcess(Long pid, String s, int gracefulShutdownSeconds) {
+ @Override
+ public void shutdownProcess(Long pid, String s, int gracefulShutdownSeconds) {
long startWait = System.nanoTime();
- while (UnixProcessUtils.isProcessRunning(pid)) {
+ while (isProcessRunning(pid)) {
LOGGER.info("Waiting for Apache MiNiFi to finish shutting down...");
long waitNanos = System.nanoTime() - startWait;
long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos);
if (waitSeconds >= gracefulShutdownSeconds || gracefulShutdownSeconds == 0) {
- if (UnixProcessUtils.isProcessRunning(pid)) {
+ if (isProcessRunning(pid)) {
LOGGER.warn(s, gracefulShutdownSeconds);
try {
- UnixProcessUtils.killProcessTree(pid);
+ killProcessTree(pid);
} catch (IOException ioe) {
LOGGER.error("Failed to kill Process with PID {}", pid);
}
@@ -92,12 +96,14 @@ public class UnixProcessUtils {
try {
Thread.sleep(2000L);
} catch (InterruptedException ie) {
+ DEFAULT_LOGGER.warn("Thread interrupted while shutting down MiNiFi");
}
}
}
}
- public static void killProcessTree(Long pid) throws IOException {
+ @Override
+ public void killProcessTree(Long pid) throws IOException {
LOGGER.debug("Killing Process Tree for PID {}", pid);
List<Long> children = getChildProcesses(pid);
@@ -110,22 +116,7 @@ public class UnixProcessUtils {
Runtime.getRuntime().exec(new String[]{"kill", "-9", String.valueOf(pid)});
}
- /**
- * Checks the status of the given process.
- *
- * @param process the process object what we want to check
- * @return true if the process is Alive
- */
- public static boolean isAlive(Process process) {
- try {
- process.exitValue();
- return false;
- } catch (IllegalStateException | IllegalThreadStateException itse) {
- return true;
- }
- }
-
- private static List<Long> getChildProcesses(Long ppid) throws IOException {
+ private List<Long> getChildProcesses(Long ppid) throws IOException {
Process proc = Runtime.getRuntime().exec(new String[]{"ps", "-o", "pid", "--no-headers", "--ppid", String.valueOf(ppid)});
List<Long> childPids = new ArrayList<>();
try (InputStream in = proc.getInputStream();
diff --git a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/ShutdownHookTest.java b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/ShutdownHookTest.java
new file mode 100644
index 0000000000..729b5426d6
--- /dev/null
+++ b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/ShutdownHookTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap;
+
+import static org.apache.nifi.minifi.bootstrap.BootstrapCommand.STOP;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.nifi.minifi.bootstrap.service.MiNiFiStdLogHandler;
+import org.apache.nifi.minifi.bootstrap.service.PeriodicStatusReporterManager;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Checks which calls {@link ShutdownHook} makes on its collaborators when it runs.
+ */
+@ExtendWith(MockitoExtension.class)
+class ShutdownHookTest {
+
+    @Mock
+    private RunMiNiFi runner;
+    @Mock
+    private MiNiFiStdLogHandler miNiFiStdLogHandler;
+    @Mock
+    private PeriodicStatusReporterManager periodicStatusReporterManager;
+
+    @InjectMocks
+    private ShutdownHook shutdownHook;
+
+    @Test
+    void testRunShouldShutdownSchedulersAndProcesses() {
+        // given: the status reporter manager is reachable through the runner
+        when(runner.getPeriodicStatusReporterManager()).thenReturn(periodicStatusReporterManager);
+
+        // when
+        shutdownHook.run();
+
+        // then: calls made on the runner itself
+        verify(runner).setAutoRestartNiFi(false);
+        verify(runner).shutdownChangeNotifier();
+        verify(runner).getPeriodicStatusReporterManager();
+        verify(runner).run(STOP);
+
+        // and: calls made on the other collaborators
+        verify(periodicStatusReporterManager).shutdownPeriodicStatusReporters();
+        verify(miNiFiStdLogHandler).shutdown();
+    }
+}
\ No newline at end of file
diff --git a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/command/CommandRunnerFactoryTest.java b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/command/CommandRunnerFactoryTest.java
new file mode 100644
index 0000000000..366d70b9b7
--- /dev/null
+++ b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/command/CommandRunnerFactoryTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.command;
+
+import static org.apache.nifi.minifi.bootstrap.BootstrapCommand.DUMP;
+import static org.apache.nifi.minifi.bootstrap.BootstrapCommand.ENV;
+import static org.apache.nifi.minifi.bootstrap.BootstrapCommand.FLOWSTATUS;
+import static org.apache.nifi.minifi.bootstrap.BootstrapCommand.RESTART;
+import static org.apache.nifi.minifi.bootstrap.BootstrapCommand.RUN;
+import static org.apache.nifi.minifi.bootstrap.BootstrapCommand.START;
+import static org.apache.nifi.minifi.bootstrap.BootstrapCommand.STATUS;
+import static org.apache.nifi.minifi.bootstrap.BootstrapCommand.STOP;
+import static org.apache.nifi.minifi.bootstrap.BootstrapCommand.UNKNOWN;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.verifyNoInteractions;
+
+import java.io.File;
+import org.apache.nifi.minifi.bootstrap.MiNiFiParameters;
+import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
+import org.apache.nifi.minifi.bootstrap.service.BootstrapFileProvider;
+import org.apache.nifi.minifi.bootstrap.service.CurrentPortProvider;
+import org.apache.nifi.minifi.bootstrap.service.GracefulShutdownParameterProvider;
+import org.apache.nifi.minifi.bootstrap.service.MiNiFiCommandSender;
+import org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider;
+import org.apache.nifi.minifi.bootstrap.service.MiNiFiStatusProvider;
+import org.apache.nifi.minifi.bootstrap.service.MiNiFiStdLogHandler;
+import org.apache.nifi.minifi.bootstrap.service.PeriodicStatusReporterManager;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Checks which {@link CommandRunner} implementation {@link CommandRunnerFactory} returns for each bootstrap command.
+ */
+@ExtendWith(MockitoExtension.class)
+class CommandRunnerFactoryTest {
+
+    @Mock
+    private MiNiFiCommandSender miNiFiCommandSender;
+    @Mock
+    private CurrentPortProvider currentPortProvider;
+    @Mock
+    private MiNiFiParameters miNiFiParameters;
+    @Mock
+    private MiNiFiStatusProvider miNiFiStatusProvider;
+    @Mock
+    private PeriodicStatusReporterManager periodicStatusReporterManager;
+    @Mock
+    private BootstrapFileProvider bootstrapFileProvider;
+    @Mock
+    private MiNiFiStdLogHandler miNiFiStdLogHandler;
+    @Mock
+    private File bootstrapConfigFile;
+    @Mock
+    private RunMiNiFi runMiNiFi;
+    @Mock
+    private GracefulShutdownParameterProvider gracefulShutdownParameterProvider;
+    @Mock
+    private MiNiFiExecCommandProvider miNiFiExecCommandProvider;
+
+    @InjectMocks
+    private CommandRunnerFactory commandRunnerFactory;
+
+    @Test
+    void testRunCommandShouldStartCommandReturnStartRunner() {
+        CommandRunner runner = commandRunnerFactory.getRunner(START);
+
+        assertInstanceOf(StartRunner.class, runner);
+        verifyNoInteractionsWithCollaborators();
+    }
+
+    @Test
+    void testRunCommandShouldRunCommandReturnStartRunner() {
+        CommandRunner runner = commandRunnerFactory.getRunner(RUN);
+
+        assertInstanceOf(StartRunner.class, runner);
+        verifyNoInteractionsWithCollaborators();
+    }
+
+    @Test
+    void testRunCommandShouldStopCommandReturnStopRunner() {
+        CommandRunner runner = commandRunnerFactory.getRunner(STOP);
+
+        assertInstanceOf(StopRunner.class, runner);
+        verifyNoInteractionsWithCollaborators();
+    }
+
+    @Test
+    void testRunCommandShouldEnvCommandReturnEnvRunner() {
+        CommandRunner runner = commandRunnerFactory.getRunner(ENV);
+
+        assertInstanceOf(EnvRunner.class, runner);
+        verifyNoInteractionsWithCollaborators();
+    }
+
+    @Test
+    void testRunCommandShouldDumpCommandReturnDumpRunner() {
+        CommandRunner runner = commandRunnerFactory.getRunner(DUMP);
+
+        assertInstanceOf(DumpRunner.class, runner);
+        verifyNoInteractionsWithCollaborators();
+    }
+
+    @Test
+    void testRunCommandShouldFlowStatusCommandReturnFlowStatusRunner() {
+        CommandRunner runner = commandRunnerFactory.getRunner(FLOWSTATUS);
+
+        assertInstanceOf(FlowStatusRunner.class, runner);
+        verifyNoInteractionsWithCollaborators();
+    }
+
+    @Test
+    void testRunCommandShouldStatusCommandReturnStatusRunner() {
+        CommandRunner runner = commandRunnerFactory.getRunner(STATUS);
+
+        assertInstanceOf(StatusRunner.class, runner);
+        verifyNoInteractionsWithCollaborators();
+    }
+
+    @Test
+    void testRunCommandShouldRestartCommandReturnCompositeRunner() {
+        CommandRunner runner = commandRunnerFactory.getRunner(RESTART);
+
+        assertInstanceOf(CompositeCommandRunner.class, runner);
+        verifyNoInteractionsWithCollaborators();
+    }
+
+    @Test
+    void testRunCommandShouldThrowIllegalArgumentExceptionInCaseOfUnknownCommand() {
+        assertThrows(IllegalArgumentException.class, () -> commandRunnerFactory.getRunner(UNKNOWN));
+    }
+
+    /**
+     * Verifies that none of the mocked collaborators was called, i.e. that creating a runner only wires
+     * the collaborators together. Kept in one place so that the list of mocks cannot drift between tests.
+     */
+    private void verifyNoInteractionsWithCollaborators() {
+        verifyNoInteractions(miNiFiCommandSender, currentPortProvider, miNiFiParameters, miNiFiStatusProvider, periodicStatusReporterManager, bootstrapFileProvider,
+            miNiFiStdLogHandler, bootstrapConfigFile, runMiNiFi, gracefulShutdownParameterProvider, miNiFiExecCommandProvider);
+    }
+}
\ No newline at end of file
diff --git a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/command/CompositeCommandRunnerTest.java b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/command/CompositeCommandRunnerTest.java
new file mode 100644
index 0000000000..0c56d7fec9
--- /dev/null
+++ b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/command/CompositeCommandRunnerTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.command;
+
+import static org.apache.nifi.minifi.bootstrap.Status.ERROR;
+import static org.apache.nifi.minifi.bootstrap.Status.OK;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Checks how {@link CompositeCommandRunner} runs the command runners it is built from.
+ */
+@ExtendWith(MockitoExtension.class)
+class CompositeCommandRunnerTest {
+
+    @Mock
+    private CommandRunner startRunner;
+    @Mock
+    private CommandRunner stopRunner;
+    private CompositeCommandRunner compositeCommandRunner;
+
+    @BeforeEach
+    void setup() {
+        // startRunner is first in the list, stopRunner second
+        compositeCommandRunner = new CompositeCommandRunner(Arrays.asList(startRunner, stopRunner));
+    }
+
+    @Test
+    void testRunCommandShouldExecuteCommandsTillFirstNonSuccessStatusCode() {
+        // given: the first runner fails
+        when(startRunner.runCommand(any())).thenReturn(ERROR.getStatusCode());
+
+        // when
+        int result = compositeCommandRunner.runCommand(new String[0]);
+
+        // then: the failure is returned and the second runner is never touched
+        verify(startRunner).runCommand(any());
+        verifyNoInteractions(stopRunner);
+        assertEquals(ERROR.getStatusCode(), result);
+    }
+
+    @Test
+    void testRunCommandShouldExecuteCommandsAndReturnOKWhenThereWasNoError() {
+        // given: both runners succeed
+        when(startRunner.runCommand(any())).thenReturn(OK.getStatusCode());
+        when(stopRunner.runCommand(any())).thenReturn(OK.getStatusCode());
+
+        // when
+        int result = compositeCommandRunner.runCommand(new String[0]);
+
+        // then: each runner ran exactly once and nothing else happened on them
+        verify(startRunner).runCommand(any());
+        verify(stopRunner).runCommand(any());
+        verifyNoMoreInteractions(startRunner, stopRunner);
+        assertEquals(OK.getStatusCode(), result);
+    }
+}
\ No newline at end of file
diff --git a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/command/DumpRunnerTest.java b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/command/DumpRunnerTest.java
new file mode 100644
index 0000000000..ca3b026282
--- /dev/null
+++ b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/command/DumpRunnerTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.command;
+
+import static org.apache.nifi.minifi.bootstrap.Status.ERROR;
+import static org.apache.nifi.minifi.bootstrap.Status.MINIFI_NOT_RUNNING;
+import static org.apache.nifi.minifi.bootstrap.Status.OK;
+import static org.apache.nifi.minifi.bootstrap.command.DumpRunner.DUMP_CMD;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Optional;
+import org.apache.nifi.minifi.bootstrap.service.CurrentPortProvider;
+import org.apache.nifi.minifi.bootstrap.service.MiNiFiCommandSender;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Checks the status codes returned by {@link DumpRunner} and the content of the dump file it writes.
+ */
+@ExtendWith(MockitoExtension.class)
+class DumpRunnerTest {
+
+    private static final int MINIFI_PORT = 1337;
+    private static final String DUMP_CONTENT = "dump_content";
+
+    @Mock
+    private MiNiFiCommandSender miNiFiCommandSender;
+    @Mock
+    private CurrentPortProvider currentPortProvider;
+
+    @InjectMocks
+    private DumpRunner dumpRunner;
+
+    @Test
+    void testRunCommandShouldDumpToConsoleIfNoFileDefined() throws IOException {
+        when(currentPortProvider.getCurrentPort()).thenReturn(MINIFI_PORT);
+        when(miNiFiCommandSender.sendCommand(DUMP_CMD, MINIFI_PORT)).thenReturn(Optional.of(DUMP_CONTENT));
+
+        int statusCode = dumpRunner.runCommand(new String[0]);
+
+        assertEquals(OK.getStatusCode(), statusCode);
+        verifyNoMoreInteractions(currentPortProvider, miNiFiCommandSender);
+    }
+
+    @Test
+    void testRunCommandShouldDumpToFileIfItIsDefined() throws IOException {
+        when(currentPortProvider.getCurrentPort()).thenReturn(MINIFI_PORT);
+        when(miNiFiCommandSender.sendCommand(DUMP_CMD, MINIFI_PORT)).thenReturn(Optional.of(DUMP_CONTENT));
+        File file = Files.createTempFile(null, null).toFile();
+        file.deleteOnExit();
+        String tmpFilePath = file.getAbsolutePath();
+
+        int statusCode = dumpRunner.runCommand(new String[] {DUMP_CMD, tmpFilePath});
+
+        assertEquals(OK.getStatusCode(), statusCode);
+        assertEquals(DUMP_CONTENT, getDumpContent(file));
+        verifyNoMoreInteractions(currentPortProvider, miNiFiCommandSender);
+    }
+
+    @Test
+    void testRunCommandShouldReturnNotRunningStatusCodeIfPortReturnsNull() {
+        when(currentPortProvider.getCurrentPort()).thenReturn(null);
+
+        int statusCode = dumpRunner.runCommand(new String[]{});
+
+        assertEquals(MINIFI_NOT_RUNNING.getStatusCode(), statusCode);
+        verifyNoMoreInteractions(currentPortProvider);
+        verifyNoInteractions(miNiFiCommandSender);
+    }
+
+    @Test
+    void testRunCommandShouldReturnErrorStatusCodeIfSendCommandThrowsException() throws IOException {
+        when(currentPortProvider.getCurrentPort()).thenReturn(MINIFI_PORT);
+        when(miNiFiCommandSender.sendCommand(DUMP_CMD, MINIFI_PORT)).thenThrow(new IOException());
+
+        int statusCode = dumpRunner.runCommand(new String[]{});
+
+        assertEquals(ERROR.getStatusCode(), statusCode);
+        verifyNoMoreInteractions(currentPortProvider, miNiFiCommandSender);
+    }
+
+    @Test
+    void testRunCommandShouldReturnErrorStatusCodeIfFileWriteFailureHappens() throws IOException {
+        when(currentPortProvider.getCurrentPort()).thenReturn(MINIFI_PORT);
+        when(miNiFiCommandSender.sendCommand(DUMP_CMD, MINIFI_PORT)).thenReturn(Optional.of(DUMP_CONTENT));
+        File file = Files.createTempFile(null, null).toFile();
+        file.deleteOnExit();
+        // NOTE(review): a read-only file may still be writable for a privileged user (e.g. root), in which
+        // case the write would not fail - confirm that the build never runs the tests with such a user
+        file.setReadOnly();
+        String tmpFilePath = file.getAbsolutePath();
+
+        int statusCode = dumpRunner.runCommand(new String[] {DUMP_CMD, tmpFilePath});
+
+        assertEquals(ERROR.getStatusCode(), statusCode);
+        verifyNoMoreInteractions(currentPortProvider, miNiFiCommandSender);
+    }
+
+    /**
+     * Reads the whole dump file as UTF-8 text.
+     * A missing or unreadable file fails the calling test with the I/O error itself, rather than with a
+     * comparison against null that hides the cause.
+     *
+     * @param dumpFile the file the dump was written to
+     * @return the content of the file
+     * @throws IOException if the file cannot be read
+     */
+    private String getDumpContent(File dumpFile) throws IOException {
+        return new String(Files.readAllBytes(dumpFile.toPath()), java.nio.charset.StandardCharsets.UTF_8);
+    }
+
+}
\ No newline at end of file
diff --git a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/command/EnvRunnerTest.java b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/command/EnvRunnerTest.java
new file mode 100644
index 0000000000..8d97733de5
--- /dev/null
+++ b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/command/EnvRunnerTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.command;
+
+import static org.apache.nifi.minifi.bootstrap.Status.ERROR;
+import static org.apache.nifi.minifi.bootstrap.Status.MINIFI_NOT_RUNNING;
+import static org.apache.nifi.minifi.bootstrap.Status.OK;
+import static org.apache.nifi.minifi.bootstrap.command.EnvRunner.ENV_CMD;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.nifi.minifi.bootstrap.service.CurrentPortProvider;
+import org.apache.nifi.minifi.bootstrap.service.MiNiFiCommandSender;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Checks the status codes returned by {@link EnvRunner}.
+ */
+@ExtendWith(MockitoExtension.class)
+class EnvRunnerTest {
+
+    private static final int MINIFI_PORT = 1337;
+    private static final String ENV_DATA = "ENV_DATA";
+
+    @Mock
+    private MiNiFiCommandSender miNiFiCommandSender;
+    @Mock
+    private CurrentPortProvider currentPortProvider;
+
+    @InjectMocks
+    private EnvRunner envRunner;
+
+    @Test
+    void testRunCommandShouldReturnOkStatusCode() throws IOException {
+        // given: a known port and a successful answer to the env command
+        when(currentPortProvider.getCurrentPort()).thenReturn(MINIFI_PORT);
+        when(miNiFiCommandSender.sendCommand(ENV_CMD, MINIFI_PORT)).thenReturn(Optional.of(ENV_DATA));
+
+        int result = envRunner.runCommand(new String[]{});
+
+        assertEquals(OK.getStatusCode(), result);
+        verifyNoMoreInteractions(currentPortProvider, miNiFiCommandSender);
+    }
+
+    @Test
+    void testRunCommandShouldReturnNotRunningStatusCodeIfPortReturnsNull() {
+        // given: no port is known
+        when(currentPortProvider.getCurrentPort()).thenReturn(null);
+
+        int result = envRunner.runCommand(new String[]{});
+
+        // then: the command sender is never reached
+        assertEquals(MINIFI_NOT_RUNNING.getStatusCode(), result);
+        verifyNoMoreInteractions(currentPortProvider);
+        verifyNoInteractions(miNiFiCommandSender);
+    }
+
+    @Test
+    void testRunCommandShouldReturnErrorStatusCodeIfSendCommandThrowsException() throws IOException {
+        // given: a known port, but sending the env command fails
+        when(currentPortProvider.getCurrentPort()).thenReturn(MINIFI_PORT);
+        when(miNiFiCommandSender.sendCommand(ENV_CMD, MINIFI_PORT)).thenThrow(new IOException());
+
+        int result = envRunner.runCommand(new String[]{});
+
+        assertEquals(ERROR.getStatusCode(), result);
+        verifyNoMoreInteractions(currentPortProvider, miNiFiCommandSender);
+    }
+}
\ No newline at end of file
diff --git a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/command/FlowStatusRunnerTest.java b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/command/FlowStatusRunnerTest.java
new file mode 100644
index 0000000000..04373106c1
--- /dev/null
+++ b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/command/FlowStatusRunnerTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.command;
+
+import static org.apache.nifi.minifi.bootstrap.Status.ERROR;
+import static org.apache.nifi.minifi.bootstrap.Status.OK;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import org.apache.nifi.minifi.bootstrap.service.PeriodicStatusReporterManager;
+import org.apache.nifi.minifi.commons.status.FlowStatusReport;
+import org.apache.nifi.minifi.commons.status.instance.InstanceHealth;
+import org.apache.nifi.minifi.commons.status.instance.InstanceStatus;
+import org.apache.nifi.minifi.commons.status.processor.ProcessorStatusBean;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Unit tests for {@code FlowStatusRunner.runCommand}, with the
+ * {@code PeriodicStatusReporterManager} collaborator mocked.
+ */
+@ExtendWith(MockitoExtension.class)
+class FlowStatusRunnerTest {
+
+    // Only used inside this test class, so it does not need to be visible to subclasses.
+    private static final String STATUS_REQUEST = "processor:TailFile:health,stats,bulletins";
+
+    @Mock
+    private PeriodicStatusReporterManager periodicStatusReporterManager;
+
+    @InjectMocks
+    private FlowStatusRunner flowStatusRunner;
+
+    /** An empty argument array is expected to yield ERROR without querying the status manager. */
+    @Test
+    void testRunCommandShouldReturnErrorCodeWhenArgsLengthIsNotTwo() {
+        int statusCode = flowStatusRunner.runCommand(new String[0]);
+
+        assertEquals(ERROR.getStatusCode(), statusCode);
+        verifyNoInteractions(periodicStatusReporterManager);
+    }
+
+    /**
+     * With a command name plus a status request, the runner is expected to return OK.
+     * The stubbed {@code statusReport} call must be consumed, otherwise Mockito's
+     * strict-stubs check (enabled by {@code MockitoExtension}) fails the test.
+     */
+    @Test
+    void testRunCommandShouldReturnOkCodeWhenArgsLengthIsTwo() {
+        FlowStatusReport flowStatusReport = aFlowStatusReport();
+        when(periodicStatusReporterManager.statusReport(STATUS_REQUEST)).thenReturn(flowStatusReport);
+
+        int statusCode = flowStatusRunner.runCommand(new String[] {"flowStatus", STATUS_REQUEST});
+
+        assertEquals(OK.getStatusCode(), statusCode);
+    }
+
+    /** Builds a small report fixture: one instance-health entry and a single processor status. */
+    private static FlowStatusReport aFlowStatusReport() {
+        InstanceHealth instanceHealth = new InstanceHealth();
+        instanceHealth.setQueuedCount(2);
+        instanceHealth.setActiveThreads(3);
+
+        InstanceStatus instanceStatus = new InstanceStatus();
+        instanceStatus.setInstanceHealth(instanceHealth);
+
+        ProcessorStatusBean processorStatusBean = new ProcessorStatusBean();
+        processorStatusBean.setId("processorId");
+
+        FlowStatusReport flowStatusReport = new FlowStatusReport();
+        flowStatusReport.setInstanceStatus(instanceStatus);
+        flowStatusReport.setProcessorStatusList(Collections.singletonList(processorStatusBean));
+        return flowStatusReport;
+    }
+}
\ No newline at end of file
diff --git a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/command/StatusRunnerTest.java b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/command/StatusRunnerTest.java
new file mode 100644
index 0000000000..b04565f67b
--- /dev/null
+++ b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/command/StatusRunnerTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.command;
+
+import static org.apache.nifi.minifi.bootstrap.Status.MINIFI_NOT_RESPONDING;
+import static org.apache.nifi.minifi.bootstrap.Status.MINIFI_NOT_RUNNING;
+import static org.apache.nifi.minifi.bootstrap.Status.OK;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.when;
+
+import org.apache.nifi.minifi.bootstrap.MiNiFiParameters;
+import org.apache.nifi.minifi.bootstrap.MiNiFiStatus;
+import org.apache.nifi.minifi.bootstrap.service.MiNiFiStatusProvider;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Unit tests for {@code StatusRunner.runCommand}. Each case stubs the
+ * {@code MiNiFiStatus} returned by the status provider and checks the status
+ * code the runner maps it to.
+ */
+@ExtendWith(MockitoExtension.class)
+class StatusRunnerTest {
+
+    private static final int MINIFI_PORT = 1337;
+    private static final long MINIFI_PID = 1L;
+
+    @Mock
+    private MiNiFiParameters miNiFiParameters;
+    @Mock
+    private MiNiFiStatusProvider miNiFiStatusProvider;
+
+    @InjectMocks
+    private StatusRunner statusRunner;
+
+    /** Every test looks the status up with the same port and pid taken from the parameters mock. */
+    @BeforeEach
+    void setup() {
+        when(miNiFiParameters.getMiNiFiPort()).thenReturn(MINIFI_PORT);
+        when(miNiFiParameters.getMinifiPid()).thenReturn(MINIFI_PID);
+    }
+
+    // NOTE(review): the two boolean constructor arguments of MiNiFiStatus look like
+    // (respondingToPing, processRunning), judging by the test names - confirm against MiNiFiStatus.
+
+    @Test
+    void testRunCommandShouldReturnOkStatusIfMiNiFiIsRespondingToPing() {
+        final int exitCode = runCommandReporting(new MiNiFiStatus(MINIFI_PORT, MINIFI_PID, true, false));
+
+        assertEquals(OK.getStatusCode(), exitCode);
+    }
+
+    @Test
+    void testRunCommandShouldReturnMiNiFiNotRespondingStatusIfMiNiFiIsNotRespondingToPing() {
+        final int exitCode = runCommandReporting(new MiNiFiStatus(MINIFI_PORT, MINIFI_PID, false, true));
+
+        assertEquals(MINIFI_NOT_RESPONDING.getStatusCode(), exitCode);
+    }
+
+    @Test
+    void testRunCommandShouldReturnMiNiFiNotRunningStatusIfMiNiFiIsNotRespondingToPingAndProcessIsNotRunning() {
+        final int exitCode = runCommandReporting(new MiNiFiStatus(null, MINIFI_PID, false, false));
+
+        assertEquals(MINIFI_NOT_RUNNING.getStatusCode(), exitCode);
+    }
+
+    @Test
+    void testRunCommandShouldReturnMiNiFiNotRunningStatusIfMiNiFiIsNotRespondingToPingIfPortIsGivenButPidIsMissingAndNotRespondingToPing() {
+        final int exitCode = runCommandReporting(new MiNiFiStatus(MINIFI_PORT, null, false, false));
+
+        assertEquals(MINIFI_NOT_RUNNING.getStatusCode(), exitCode);
+    }
+
+    @Test
+    void testRunCommandShouldReturnMiNiFiNotRunningStatusIfPortAndPidIsGivenButNotRespondingToPingAndProcessIsNotRunning() {
+        final int exitCode = runCommandReporting(new MiNiFiStatus(MINIFI_PORT, MINIFI_PID, false, false));
+
+        assertEquals(MINIFI_NOT_RUNNING.getStatusCode(), exitCode);
+    }
+
+    /**
+     * Stubs the status provider to answer with {@code reported} for the configured
+     * port and pid, then runs the command with no arguments.
+     *
+     * @param reported status the mocked provider should hand back
+     * @return the status code produced by the runner
+     */
+    private int runCommandReporting(MiNiFiStatus reported) {
+        when(miNiFiStatusProvider.getStatus(MINIFI_PORT, MINIFI_PID)).thenReturn(reported);
+        return statusRunner.runCommand(new String[0]);
+    }
+}
\ No newline at end of file
diff --git a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/command/StopRunnerTest.java b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/command/StopRunnerTest.java
new file mode 100644
index 0000000000..9335970005
--- /dev/null
+++ b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/command/StopRunnerTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.command;
+
+import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.UNINITIALIZED;
+import static org.apache.nifi.minifi.bootstrap.Status.ERROR;
+import static org.apache.nifi.minifi.bootstrap.Status.MINIFI_NOT_RUNNING;
+import static org.apache.nifi.minifi.bootstrap.Status.OK;
+import static org.apache.nifi.minifi.bootstrap.command.StopRunner.SHUTDOWN_CMD;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.nifi.minifi.bootstrap.MiNiFiParameters;
+import org.apache.nifi.minifi.bootstrap.service.BootstrapFileProvider;
+import org.apache.nifi.minifi.bootstrap.service.CurrentPortProvider;
+import org.apache.nifi.minifi.bootstrap.service.GracefulShutdownParameterProvider;
+import org.apache.nifi.minifi.bootstrap.service.MiNiFiCommandSender;
+import org.apache.nifi.minifi.bootstrap.util.ProcessUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Unit tests for {@code StopRunner.runCommand}, with every collaborator mocked.
+ * The lock, status and pid files are {@code File} mocks, so no file system access happens.
+ */
+@ExtendWith(MockitoExtension.class)
+class StopRunnerTest {
+
+    private static final int MINIFI_PORT = 1337;
+    private static final long MINIFI_PID = 1L;
+
+    @Mock
+    private BootstrapFileProvider bootstrapFileProvider;
+    @Mock
+    private MiNiFiParameters miNiFiParameters;
+    @Mock
+    private MiNiFiCommandSender miNiFiCommandSender;
+    @Mock
+    private CurrentPortProvider currentPortProvider;
+    @Mock
+    private GracefulShutdownParameterProvider gracefulShutdownParameterProvider;
+    @Mock
+    private ProcessUtils processUtils;
+
+    @InjectMocks
+    private StopRunner stopRunner;
+
+    /** A failure while resolving the port is expected to yield ERROR before anything else is touched. */
+    @Test
+    void testRunCommandShouldReturnErrorStatusCodeInCaseOfException() {
+        when(currentPortProvider.getCurrentPort()).thenThrow(new RuntimeException());
+
+        int statusCode = stopRunner.runCommand(new String[0]);
+
+        assertEquals(ERROR.getStatusCode(), statusCode);
+        verifyNoInteractions(bootstrapFileProvider, miNiFiParameters, miNiFiCommandSender, gracefulShutdownParameterProvider, processUtils);
+    }
+
+    /** A null port is expected to yield MINIFI_NOT_RUNNING before anything else is touched. */
+    @Test
+    void testRunCommandShouldReturnMiNiFiNotRunningStatusCodeInCaseMiNiFiPortIsNull() {
+        when(currentPortProvider.getCurrentPort()).thenReturn(null);
+
+        int statusCode = stopRunner.runCommand(new String[0]);
+
+        assertEquals(MINIFI_NOT_RUNNING.getStatusCode(), statusCode);
+        verifyNoInteractions(bootstrapFileProvider, miNiFiParameters, miNiFiCommandSender, gracefulShutdownParameterProvider, processUtils);
+    }
+
+    /**
+     * The lock file is reported missing on the first {@code exists()} call and present on
+     * the second, so the runner is expected to create it up front and delete it afterwards.
+     */
+    @Test
+    void testRunCommandShouldCreateAndCleanupLockFileAfterExecution() throws IOException {
+        File lockFile = mock(File.class);
+        when(currentPortProvider.getCurrentPort()).thenReturn(MINIFI_PORT);
+        when(bootstrapFileProvider.getLockFile()).thenReturn(lockFile);
+        when(lockFile.exists()).thenReturn(false, true);
+        when(lockFile.delete()).thenReturn(true);
+        when(miNiFiParameters.getMinifiPid()).thenReturn((long) UNINITIALIZED);
+        when(miNiFiCommandSender.sendCommand(SHUTDOWN_CMD, MINIFI_PORT)).thenReturn(Optional.of(SHUTDOWN_CMD));
+
+        int statusCode = stopRunner.runCommand(new String[0]);
+
+        assertEquals(OK.getStatusCode(), statusCode);
+        verify(lockFile).createNewFile();
+        // The test name promises cleanup, so assert the deletion explicitly.
+        verify(lockFile).delete();
+        verifyNoInteractions(gracefulShutdownParameterProvider, processUtils);
+    }
+
+    /** A lock file that cannot be deleted is expected not to change the OK outcome. */
+    @Test
+    void testRunCommandShouldMessageBeLockedInCaseOfLockFileFailureIssue() throws IOException {
+        File lockFile = mock(File.class);
+        when(currentPortProvider.getCurrentPort()).thenReturn(MINIFI_PORT);
+        when(bootstrapFileProvider.getLockFile()).thenReturn(lockFile);
+        when(lockFile.exists()).thenReturn(false, true);
+        when(lockFile.delete()).thenReturn(false);
+        when(miNiFiParameters.getMinifiPid()).thenReturn((long) UNINITIALIZED);
+        when(miNiFiCommandSender.sendCommand(SHUTDOWN_CMD, MINIFI_PORT)).thenReturn(Optional.of(SHUTDOWN_CMD));
+
+        int statusCode = stopRunner.runCommand(new String[0]);
+
+        assertEquals(OK.getStatusCode(), statusCode);
+        verify(lockFile).createNewFile();
+        // Deletion must have been attempted even though the stub reports failure.
+        verify(lockFile).delete();
+        verifyNoInteractions(gracefulShutdownParameterProvider, processUtils);
+    }
+
+    /** Any response other than the shutdown command echo is expected to yield ERROR. */
+    @Test
+    void testRunCommandShouldReturnErrorStatusCodeIfMiNiFiResponseIsNotShutdown() throws IOException {
+        File lockFile = mock(File.class);
+        when(currentPortProvider.getCurrentPort()).thenReturn(MINIFI_PORT);
+        when(bootstrapFileProvider.getLockFile()).thenReturn(lockFile);
+        when(lockFile.exists()).thenReturn(true, false);
+        when(miNiFiParameters.getMinifiPid()).thenReturn((long) UNINITIALIZED);
+        String unknown = "unknown";
+        when(miNiFiCommandSender.sendCommand(SHUTDOWN_CMD, MINIFI_PORT)).thenReturn(Optional.of(unknown));
+
+        int statusCode = stopRunner.runCommand(new String[0]);
+
+        assertEquals(ERROR.getStatusCode(), statusCode);
+        verifyNoInteractions(gracefulShutdownParameterProvider, processUtils);
+    }
+
+    /** With an uninitialized pid, an IOException from the sender is expected to yield OK and no process kill. */
+    @Test
+    void testRunCommandShouldHandleExceptionalCaseIfProcessIdIsUnknown() throws IOException {
+        File lockFile = mock(File.class);
+        when(currentPortProvider.getCurrentPort()).thenReturn(MINIFI_PORT);
+        when(bootstrapFileProvider.getLockFile()).thenReturn(lockFile);
+        when(lockFile.exists()).thenReturn(true, false);
+        when(miNiFiParameters.getMinifiPid()).thenReturn((long) UNINITIALIZED);
+        when(miNiFiCommandSender.sendCommand(SHUTDOWN_CMD, MINIFI_PORT)).thenThrow(new IOException());
+
+        int statusCode = stopRunner.runCommand(new String[0]);
+
+        assertEquals(OK.getStatusCode(), statusCode);
+        verifyNoInteractions(gracefulShutdownParameterProvider, processUtils);
+    }
+
+    /** With a known pid, an IOException from the sender is expected to trigger a process-tree kill. */
+    @Test
+    void testRunCommandShouldHandleExceptionalCaseIfProcessIdIsGiven() throws IOException {
+        File lockFile = mock(File.class);
+        when(currentPortProvider.getCurrentPort()).thenReturn(MINIFI_PORT);
+        when(bootstrapFileProvider.getLockFile()).thenReturn(lockFile);
+        when(lockFile.exists()).thenReturn(true, false);
+        when(miNiFiParameters.getMinifiPid()).thenReturn(MINIFI_PID);
+        when(miNiFiCommandSender.sendCommand(SHUTDOWN_CMD, MINIFI_PORT)).thenThrow(new IOException());
+
+        int statusCode = stopRunner.runCommand(new String[0]);
+
+        assertEquals(OK.getStatusCode(), statusCode);
+        verify(processUtils).killProcessTree(MINIFI_PID);
+        verifyNoInteractions(gracefulShutdownParameterProvider);
+    }
+
+    /**
+     * Happy path with a known pid: the status and pid files are expected to be deleted and
+     * the process shut down using the configured graceful-shutdown timeout.
+     */
+    @Test
+    void testRunCommandShouldShutDownMiNiFiProcessGracefully() throws IOException {
+        File lockFile = mock(File.class);
+        File statusFile = mock(File.class);
+        File pidFile = mock(File.class);
+        int gracefulShutdownSeconds = 10;
+        when(currentPortProvider.getCurrentPort()).thenReturn(MINIFI_PORT);
+        when(bootstrapFileProvider.getLockFile()).thenReturn(lockFile);
+        when(bootstrapFileProvider.getStatusFile()).thenReturn(statusFile);
+        when(bootstrapFileProvider.getPidFile()).thenReturn(pidFile);
+        when(lockFile.exists()).thenReturn(true, false);
+        when(statusFile.exists()).thenReturn(true);
+        when(pidFile.exists()).thenReturn(true);
+        when(statusFile.delete()).thenReturn(true);
+        when(pidFile.delete()).thenReturn(true);
+        when(miNiFiParameters.getMinifiPid()).thenReturn(MINIFI_PID);
+        when(miNiFiCommandSender.sendCommand(SHUTDOWN_CMD, MINIFI_PORT)).thenReturn(Optional.of(SHUTDOWN_CMD));
+        when(gracefulShutdownParameterProvider.getGracefulShutdownSeconds()).thenReturn(gracefulShutdownSeconds);
+
+        int statusCode = stopRunner.runCommand(new String[0]);
+
+        assertEquals(OK.getStatusCode(), statusCode);
+        verify(statusFile).delete();
+        verify(pidFile).delete();
+        verify(processUtils).shutdownProcess(eq(MINIFI_PID), anyString(), eq(gracefulShutdownSeconds));
+    }
+}
\ No newline at end of file
diff --git a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestorTest.java b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestorTest.java
index 872bd0b7d0..0fbdb03648 100644
--- a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestorTest.java
+++ b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestorTest.java
@@ -17,11 +17,11 @@
package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
+import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PULL_HTTP_BASE_KEY;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -30,9 +30,10 @@ import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.Collections;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
-import org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -46,7 +47,7 @@ public class FileChangeIngestorTest {
private FileChangeIngestor notifierSpy;
private WatchService mockWatchService;
private Properties testProperties;
- private Differentiator<InputStream> mockDifferentiator;
+ private Differentiator<ByteBuffer> mockDifferentiator;
private ConfigurationChangeNotifier testNotifier;
@BeforeEach
@@ -56,13 +57,12 @@ public class FileChangeIngestorTest {
mockDifferentiator = Mockito.mock(Differentiator.class);
testNotifier = Mockito.mock(ConfigurationChangeNotifier.class);
- notifierSpy.setConfigFilePath(Paths.get(TEST_CONFIG_PATH));
- notifierSpy.setWatchService(mockWatchService);
- notifierSpy.setDifferentiator(mockDifferentiator);
- notifierSpy.setConfigurationChangeNotifier(testNotifier);
+ setMocks();
testProperties = new Properties();
testProperties.put(FileChangeIngestor.CONFIG_FILE_PATH_KEY, TEST_CONFIG_PATH);
+ testProperties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
+ testProperties.put(PULL_HTTP_BASE_KEY + ".override.core", "true");
testProperties.put(FileChangeIngestor.POLLING_PERIOD_INTERVAL_KEY, FileChangeIngestor.DEFAULT_POLLING_PERIOD_INTERVAL);
}
@@ -97,7 +97,7 @@ public class FileChangeIngestorTest {
/* Verify handleChange events */
@Test
public void testTargetChangedNoModification() throws Exception {
- when(mockDifferentiator.isNew(Mockito.any(InputStream.class))).thenReturn(false);
+ when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(false);
final ConfigurationChangeNotifier testNotifier = Mockito.mock(ConfigurationChangeNotifier.class);
// In this case the WatchKey is null because there were no events found
@@ -108,7 +108,7 @@ public class FileChangeIngestorTest {
@Test
public void testTargetChangedWithModificationEventNonConfigFile() throws Exception {
- when(mockDifferentiator.isNew(Mockito.any(InputStream.class))).thenReturn(false);
+ when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(false);
final ConfigurationChangeNotifier testNotifier = Mockito.mock(ConfigurationChangeNotifier.class);
// In this case, we receive a trigger event for the directory monitored, but it was another file not being monitored
@@ -123,12 +123,17 @@ public class FileChangeIngestorTest {
@Test
public void testTargetChangedWithModificationEvent() throws Exception {
- when(mockDifferentiator.isNew(Mockito.any(InputStream.class))).thenReturn(true);
+ when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(true);
final WatchKey mockWatchKey = createMockWatchKeyForPath(CONFIG_FILENAME);
// Provided as a spy to allow injection of mock objects for some tests when dealing with the finalized FileSystems class
establishMockEnvironmentForChangeTests(mockWatchKey);
+ ConfigurationFileHolder configurationFileHolder = Mockito.mock(ConfigurationFileHolder.class);
+ when(configurationFileHolder.getConfigFileReference()).thenReturn(new AtomicReference<>(ByteBuffer.wrap(new byte[0])));
+ notifierSpy.initialize(testProperties, configurationFileHolder, testNotifier);
+ setMocks();
+
// Invoke the method of interest
notifierSpy.run();
@@ -160,4 +165,11 @@ public class FileChangeIngestorTest {
when(mockWatchService.poll()).thenReturn(watchKey);
}
+
+ private void setMocks() {
+ notifierSpy.setConfigFilePath(Paths.get(TEST_CONFIG_PATH));
+ notifierSpy.setWatchService(mockWatchService);
+ notifierSpy.setDifferentiator(mockDifferentiator);
+ notifierSpy.setConfigurationChangeNotifier(testNotifier);
+ }
}
\ No newline at end of file
diff --git a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorSSLTest.java b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorSSLTest.java
index 1a4df6ff5c..3daa372d05 100644
--- a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorSSLTest.java
+++ b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorSSLTest.java
@@ -17,15 +17,24 @@
package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
+import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PULL_HTTP_BASE_KEY;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.ByteBuffer;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
import org.apache.nifi.minifi.bootstrap.configuration.ingestors.common.PullHttpChangeIngestorCommonTest;
+import org.apache.nifi.minifi.commons.schema.ConfigSchema;
+import org.apache.nifi.minifi.commons.schema.exception.SchemaLoaderException;
+import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.jupiter.api.BeforeAll;
import org.mockito.Mockito;
-import java.util.Properties;
-
public class PullHttpChangeIngestorSSLTest extends PullHttpChangeIngestorCommonTest {
@BeforeAll
@@ -65,7 +74,7 @@ public class PullHttpChangeIngestorSSLTest extends PullHttpChangeIngestorCommonT
}
@Override
- public void pullHttpChangeIngestorInit(Properties properties) {
+ public void pullHttpChangeIngestorInit(Properties properties) throws IOException, SchemaLoaderException {
properties.setProperty(PullHttpChangeIngestor.TRUSTSTORE_LOCATION_KEY, "./src/test/resources/localhost-ts.jks");
properties.setProperty(PullHttpChangeIngestor.TRUSTSTORE_PASSWORD_KEY, "localtest");
properties.setProperty(PullHttpChangeIngestor.TRUSTSTORE_TYPE_KEY, "JKS");
@@ -76,10 +85,18 @@ public class PullHttpChangeIngestorSSLTest extends PullHttpChangeIngestorCommonT
properties.put(PullHttpChangeIngestor.PORT_KEY, String.valueOf(port));
properties.put(PullHttpChangeIngestor.HOST_KEY, "localhost");
properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
+ properties.put(PULL_HTTP_BASE_KEY + ".override.core", "true");
+ ConfigurationFileHolder configurationFileHolder = Mockito.mock(ConfigurationFileHolder.class);
+
+ ConfigSchema configSchema =
+ SchemaLoader.loadConfigSchemaFromYaml(PullHttpChangeIngestorSSLTest.class.getClassLoader().getResourceAsStream("config.yml"));
+ StringWriter writer = new StringWriter();
+ SchemaLoader.toYaml(configSchema, writer);
+ when(configurationFileHolder.getConfigFileReference()).thenReturn(new AtomicReference<>(ByteBuffer.wrap(writer.toString().getBytes())));
pullHttpChangeIngestor = new PullHttpChangeIngestor();
- pullHttpChangeIngestor.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), testNotifier);
+ pullHttpChangeIngestor.initialize(properties, configurationFileHolder, testNotifier);
pullHttpChangeIngestor.setDifferentiator(mockDifferentiator);
}
}
diff --git a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorTest.java b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorTest.java
index a3f05a5b48..3833ea1425 100644
--- a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorTest.java
+++ b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorTest.java
@@ -17,14 +17,23 @@
package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
+import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PULL_HTTP_BASE_KEY;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.ByteBuffer;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
import org.apache.nifi.minifi.bootstrap.configuration.ingestors.common.PullHttpChangeIngestorCommonTest;
+import org.apache.nifi.minifi.commons.schema.ConfigSchema;
+import org.apache.nifi.minifi.commons.schema.exception.SchemaLoaderException;
+import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader;
import org.eclipse.jetty.server.ServerConnector;
import org.junit.jupiter.api.BeforeAll;
import org.mockito.Mockito;
-import java.util.Properties;
-
public class PullHttpChangeIngestorTest extends PullHttpChangeIngestorCommonTest {
@BeforeAll
@@ -50,14 +59,22 @@ public class PullHttpChangeIngestorTest extends PullHttpChangeIngestorCommonTest
@Override
- public void pullHttpChangeIngestorInit(Properties properties) {
+ public void pullHttpChangeIngestorInit(Properties properties) throws IOException, SchemaLoaderException {
port = ((ServerConnector) jetty.getConnectors()[0]).getLocalPort();
properties.put(PullHttpChangeIngestor.PORT_KEY, String.valueOf(port));
properties.put(PullHttpChangeIngestor.HOST_KEY, "localhost");
properties.put(PullHttpChangeIngestor.PULL_HTTP_POLLING_PERIOD_KEY, "30000");
+ properties.put(PULL_HTTP_BASE_KEY + ".override.core", "true");
+ ConfigurationFileHolder configurationFileHolder = Mockito.mock(ConfigurationFileHolder.class);
+
+ ConfigSchema configSchema =
+ SchemaLoader.loadConfigSchemaFromYaml(PullHttpChangeIngestorTest.class.getClassLoader().getResourceAsStream("config.yml"));
+ StringWriter writer = new StringWriter();
+ SchemaLoader.toYaml(configSchema, writer);
+ when(configurationFileHolder.getConfigFileReference()).thenReturn(new AtomicReference<>(ByteBuffer.wrap(writer.toString().getBytes())));
pullHttpChangeIngestor = new PullHttpChangeIngestor();
- pullHttpChangeIngestor.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), testNotifier);
+ pullHttpChangeIngestor.initialize(properties, configurationFileHolder, testNotifier);
pullHttpChangeIngestor.setDifferentiator(mockDifferentiator);
}
}
diff --git a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestorSSLTest.java b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestorSSLTest.java
index 8f3edd1a64..91a56642e9 100644
--- a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestorSSLTest.java
+++ b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestorSSLTest.java
@@ -17,6 +17,8 @@
package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicReference;
import okhttp3.OkHttpClient;
import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
@@ -44,6 +46,7 @@ import java.security.cert.CertificateException;
import java.util.Collections;
import java.util.Properties;
+import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PULL_HTTP_BASE_KEY;
import static org.mockito.Mockito.when;
public class RestChangeIngestorSSLTest extends RestChangeIngestorCommonTest {
@@ -58,6 +61,8 @@ public class RestChangeIngestorSSLTest extends RestChangeIngestorCommonTest {
properties.setProperty(RestChangeIngestor.KEYSTORE_PASSWORD_KEY, "localtest");
properties.setProperty(RestChangeIngestor.KEYSTORE_TYPE_KEY, "JKS");
properties.setProperty(RestChangeIngestor.NEED_CLIENT_AUTH_KEY, "false");
+ properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
+ properties.put(PULL_HTTP_BASE_KEY + ".override.core", "true");
restChangeIngestor = new RestChangeIngestor();
@@ -66,7 +71,10 @@ public class RestChangeIngestorSSLTest extends RestChangeIngestorCommonTest {
when(testListener.getDescriptor()).thenReturn("MockChangeListener");
when(testNotifier.notifyListeners(Mockito.any())).thenReturn(Collections.singleton(new ListenerHandleResult(testListener)));
- restChangeIngestor.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), testNotifier);
+ ConfigurationFileHolder configurationFileHolder = Mockito.mock(ConfigurationFileHolder.class);
+ when(configurationFileHolder.getConfigFileReference()).thenReturn(new AtomicReference<>(ByteBuffer.wrap(new byte[0])));
+
+ restChangeIngestor.initialize(properties, configurationFileHolder, testNotifier);
restChangeIngestor.setDifferentiator(mockDifferentiator);
restChangeIngestor.start();
diff --git a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestorTest.java b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestorTest.java
index cfa934f21c..65a61cb6c5 100644
--- a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestorTest.java
+++ b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestorTest.java
@@ -17,6 +17,11 @@
package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
+import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PULL_HTTP_BASE_KEY;
+import static org.mockito.Mockito.when;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicReference;
import okhttp3.OkHttpClient;
import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
@@ -33,11 +38,16 @@ public class RestChangeIngestorTest extends RestChangeIngestorCommonTest {
@BeforeAll
public static void setUp() throws InterruptedException, MalformedURLException {
Properties properties = new Properties();
+ properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
+ properties.put(PULL_HTTP_BASE_KEY + ".override.core", "true");
restChangeIngestor = new RestChangeIngestor();
testNotifier = Mockito.mock(ConfigurationChangeNotifier.class);
- restChangeIngestor.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), testNotifier);
+ ConfigurationFileHolder configurationFileHolder = Mockito.mock(ConfigurationFileHolder.class);
+ when(configurationFileHolder.getConfigFileReference()).thenReturn(new AtomicReference<>(ByteBuffer.wrap(new byte[0])));
+
+ restChangeIngestor.initialize(properties, configurationFileHolder, testNotifier);
restChangeIngestor.setDifferentiator(mockDifferentiator);
restChangeIngestor.start();
diff --git a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/PullHttpChangeIngestorCommonTest.java b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/PullHttpChangeIngestorCommonTest.java
index 9b1d33e2f6..dfc0c69531 100644
--- a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/PullHttpChangeIngestorCommonTest.java
+++ b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/PullHttpChangeIngestorCommonTest.java
@@ -17,11 +17,26 @@
package org.apache.nifi.minifi.bootstrap.configuration.ingestors.common;
-import org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
+import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PATH_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Properties;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
import org.apache.nifi.minifi.bootstrap.configuration.ListenerHandleResult;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
import org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor;
import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream;
import org.apache.nifi.minifi.commons.schema.ConfigSchema;
@@ -40,23 +55,6 @@ import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.Properties;
-
-import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PATH_KEY;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
public abstract class PullHttpChangeIngestorCommonTest {
public static volatile Server jetty;
@@ -81,7 +79,7 @@ public abstract class PullHttpChangeIngestorCommonTest {
jetty.setHandler(handlerCollection);
}
- public abstract void pullHttpChangeIngestorInit(Properties properties);
+ public abstract void pullHttpChangeIngestorInit(Properties properties) throws IOException, SchemaLoaderException;
@BeforeEach
public void setListeners() {
@@ -97,7 +95,7 @@ public abstract class PullHttpChangeIngestorCommonTest {
}
@Test
- public void testNewUpdate() throws IOException {
+ public void testNewUpdate() throws IOException, SchemaLoaderException {
Properties properties = new Properties();
properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
pullHttpChangeIngestorInit(properties);
@@ -131,7 +129,7 @@ public abstract class PullHttpChangeIngestorCommonTest {
}
@Test
- public void testNoUpdate() throws IOException {
+ public void testNoUpdate() throws IOException, SchemaLoaderException {
Properties properties = new Properties();
properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
pullHttpChangeIngestorInit(properties);
@@ -144,7 +142,7 @@ public abstract class PullHttpChangeIngestorCommonTest {
}
@Test
- public void testUseEtag() throws IOException {
+ public void testUseEtag() throws IOException, SchemaLoaderException {
Properties properties = new Properties();
properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
pullHttpChangeIngestorInit(properties);
@@ -165,7 +163,7 @@ public abstract class PullHttpChangeIngestorCommonTest {
}
@Test
- public void testNewUpdateWithPath() throws IOException {
+ public void testNewUpdateWithPath() throws IOException, SchemaLoaderException {
Properties properties = new Properties();
properties.put(PATH_KEY, "/config.yml");
properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
@@ -179,7 +177,7 @@ public abstract class PullHttpChangeIngestorCommonTest {
}
@Test
- public void testNoUpdateWithPath() throws IOException {
+ public void testNoUpdateWithPath() throws IOException, SchemaLoaderException {
Properties properties = new Properties();
properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
properties.put(PATH_KEY, "/config.yml");
@@ -193,7 +191,7 @@ public abstract class PullHttpChangeIngestorCommonTest {
}
@Test
- public void testUseEtagWithPath() throws IOException {
+ public void testUseEtagWithPath() throws IOException, SchemaLoaderException {
Properties properties = new Properties();
properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
properties.put(PATH_KEY, "/config.yml");
diff --git a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/RestChangeIngestorCommonTest.java b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/RestChangeIngestorCommonTest.java
index 9a5dd0b0be..50e4f21f03 100644
--- a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/RestChangeIngestorCommonTest.java
+++ b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/RestChangeIngestorCommonTest.java
@@ -17,6 +17,13 @@
package org.apache.nifi.minifi.bootstrap.configuration.ingestors.common;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
import okhttp3.Headers;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
@@ -32,15 +39,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
public abstract class RestChangeIngestorCommonTest {
private static final String testString = "This is a test string.";
@@ -50,7 +48,7 @@ public abstract class RestChangeIngestorCommonTest {
public static final MediaType MEDIA_TYPE_MARKDOWN = MediaType.parse("text/x-markdown; charset=utf-8");
public static String url;
public static ConfigurationChangeNotifier testNotifier = Mockito.mock(ConfigurationChangeNotifier.class);
- public static Differentiator<InputStream> mockDifferentiator = Mockito.mock(Differentiator.class);
+ public static Differentiator<ByteBuffer> mockDifferentiator = Mockito.mock(Differentiator.class);
@BeforeEach
public void setListener() {
@@ -81,7 +79,7 @@ public abstract class RestChangeIngestorCommonTest {
@Test
public void testFileUploadNewConfig() throws Exception {
- when(mockDifferentiator.isNew(Mockito.any(InputStream.class))).thenReturn(true);
+ when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(true);
Request request = new Request.Builder()
.url(url)
@@ -105,7 +103,7 @@ public abstract class RestChangeIngestorCommonTest {
@Test
public void testFileUploadSameConfig() throws Exception {
- when(mockDifferentiator.isNew(Mockito.any(InputStream.class))).thenReturn(false);
+ when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(false);
Request request = new Request.Builder()
.url(url)
diff --git a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/service/BootstrapCodecTest.java b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/service/BootstrapCodecTest.java
new file mode 100644
index 0000000000..ccbe6b40a7
--- /dev/null
+++ b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/service/BootstrapCodecTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.stream.Stream;
+import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeCoordinator;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+class BootstrapCodecTest {
+    // Feeds textual commands (PORT, STARTED, SHUTDOWN, RELOAD) to BootstrapCodec.communicate() and checks the reply and RunMiNiFi calls.
+    private static final int VALID_PORT = 1;
+    private static final String SECRET = "secret";
+    private static final String OK = "OK";
+    private static final String EMPTY_STRING = "";
+    private RunMiNiFi runner;
+
+    @BeforeEach
+    void setup() {
+        runner = mock(RunMiNiFi.class);
+    }
+
+    @Test
+    void testCommunicateShouldThrowIOExceptionIfThereIsNoCommand() {
+        InputStream inputStream = new ByteArrayInputStream(new byte[0]);
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
+        // Empty input carries no command: expect an IOException, no reply and no call on the runner
+        assertThrows(IOException.class, bootstrapCodec::communicate);
+        assertEquals(EMPTY_STRING, outputStream.toString().trim());
+        verifyNoInteractions(runner);
+    }
+
+    @Test
+    void testCommunicateShouldInvalidCommandThrowIoException() {
+        String unknown = "unknown";
+        InputStream inputStream = new ByteArrayInputStream(unknown.getBytes(StandardCharsets.UTF_8));
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
+        // A word that is not one of the commands must be rejected exactly like empty input
+        assertThrows(IOException.class, bootstrapCodec::communicate);
+        assertEquals(EMPTY_STRING, outputStream.toString().trim());
+        verifyNoInteractions(runner);
+    }
+
+    @Test
+    void testCommunicateShouldSetMiNiFiParametersAndWriteOk() throws IOException {
+        String command = "PORT " + VALID_PORT + " " + SECRET;
+        InputStream inputStream = new ByteArrayInputStream(command.getBytes(StandardCharsets.UTF_8));
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
+        // A well-formed PORT command must hand port and secret to the runner and be acknowledged with OK
+        bootstrapCodec.communicate();
+
+        verify(runner).setMiNiFiParameters(VALID_PORT, SECRET);
+        assertEquals(OK, outputStream.toString().trim());
+    }
+
+    @ParameterizedTest(name = "{index} => command={0}")
+    @MethodSource("portCommandValidationInputs")
+    void testCommunicateShouldFailWhenReceivesPortCommand(String command) {
+        InputStream inputStream = new ByteArrayInputStream(command.getBytes(StandardCharsets.UTF_8));
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
+        // Every input from portCommandValidationInputs() must fail without a reply and without touching the runner
+        assertThrows(IOException.class, bootstrapCodec::communicate);
+        assertEquals(EMPTY_STRING, outputStream.toString().trim());
+        verifyNoInteractions(runner);
+    }
+
+    private static Stream<Arguments> portCommandValidationInputs() {
+        return Stream.of(
+                Arguments.of("PORT"), // port and secret missing
+                Arguments.of("PORT invalid secretKey"), // port is not numeric
+                Arguments.of("PORT 0 secretKey") // port is zero
+        );
+    }
+
+    @Test
+    void testCommunicateShouldFailIfStartedCommandHasOtherThanOneArg() {
+        InputStream inputStream = new ByteArrayInputStream("STARTED".getBytes(StandardCharsets.UTF_8));
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
+        // STARTED is sent here without any argument
+        assertThrows(IOException.class, bootstrapCodec::communicate);
+        assertEquals(EMPTY_STRING, outputStream.toString().trim());
+        verifyNoInteractions(runner);
+    }
+
+    @Test
+    void testCommunicateShouldFailIfStartedCommandFirstArgIsNotBoolean() {
+        InputStream inputStream = new ByteArrayInputStream("STARTED yes".getBytes(StandardCharsets.UTF_8));
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
+        // "yes" must not be accepted as the STARTED argument
+        assertThrows(IOException.class, bootstrapCodec::communicate);
+        assertEquals(EMPTY_STRING, outputStream.toString().trim());
+        verifyNoInteractions(runner);
+    }
+
+    @Test
+    void testCommunicateShouldHandleStartedCommand() throws IOException {
+        InputStream inputStream = new ByteArrayInputStream("STARTED true".getBytes(StandardCharsets.UTF_8));
+        PeriodicStatusReporterManager periodicStatusReporterManager = mock(PeriodicStatusReporterManager.class);
+        ConfigurationChangeCoordinator configurationChangeCoordinator = mock(ConfigurationChangeCoordinator.class);
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
+        when(runner.getPeriodicStatusReporterManager()).thenReturn(periodicStatusReporterManager);
+        when(runner.getConfigurationChangeCoordinator()).thenReturn(configurationChangeCoordinator);
+
+        bootstrapCodec.communicate();
+        // Expect OK, two reporter-manager lookups, reporters shut down and notifiers started, coordinator started, flag set
+        assertEquals(OK, outputStream.toString().trim());
+        verify(runner, times(2)).getPeriodicStatusReporterManager();
+        verify(periodicStatusReporterManager).shutdownPeriodicStatusReporters();
+        verify(periodicStatusReporterManager).startPeriodicNotifiers();
+        verify(runner).getConfigurationChangeCoordinator();
+        verify(configurationChangeCoordinator).start();
+        verify(runner).setNiFiStarted(true);
+    }
+
+    @Test
+    void testCommunicateShouldHandleShutdownCommand() throws IOException {
+        InputStream inputStream = new ByteArrayInputStream("SHUTDOWN".getBytes(StandardCharsets.UTF_8));
+
+        PeriodicStatusReporterManager periodicStatusReporterManager = mock(PeriodicStatusReporterManager.class);
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
+        when(runner.getPeriodicStatusReporterManager()).thenReturn(periodicStatusReporterManager);
+
+        bootstrapCodec.communicate();
+        // Expect OK, the change notifier shut down and the periodic status reporters shut down
+        assertEquals(OK, outputStream.toString().trim());
+        verify(runner).getPeriodicStatusReporterManager();
+        verify(runner).shutdownChangeNotifier();
+        verify(periodicStatusReporterManager).shutdownPeriodicStatusReporters();
+    }
+
+    @Test
+    void testCommunicateShouldHandleReloadCommand() throws IOException {
+        InputStream inputStream = new ByteArrayInputStream("RELOAD".getBytes(StandardCharsets.UTF_8));
+        // NOTE(review): the reporter manager stubbed below is never verified - confirm whether RELOAD uses it at all
+        PeriodicStatusReporterManager periodicStatusReporterManager = mock(PeriodicStatusReporterManager.class);
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
+        when(runner.getPeriodicStatusReporterManager()).thenReturn(periodicStatusReporterManager);
+
+        bootstrapCodec.communicate();
+
+        assertEquals(OK, outputStream.toString().trim());
+    }
+}
\ No newline at end of file
diff --git a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/service/CurrentPortProviderTest.java b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/service/CurrentPortProviderTest.java
new file mode 100644
index 0000000000..1af256dc7d
--- /dev/null
+++ b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/service/CurrentPortProviderTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.UNINITIALIZED;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import org.apache.nifi.minifi.bootstrap.MiNiFiParameters;
+import org.apache.nifi.minifi.bootstrap.util.ProcessUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class CurrentPortProviderTest {
+    // getCurrentPort(): null when no port is set, or when ping fails and the process is not running; the port otherwise.
+    private static final int PORT = 1;
+    private static final long PID = 1L;
+
+    @Mock
+    private MiNiFiCommandSender miNiFiCommandSender;
+    @Mock
+    private MiNiFiParameters miNiFiParameters;
+    @Mock
+    private ProcessUtils processUtils;
+
+    @InjectMocks
+    private CurrentPortProvider currentPortProvider;
+
+    @Test
+    void testGetCurrentPortShouldReturnNullIfMiNiFiPortIsNotSet() {
+        when(miNiFiParameters.getMiNiFiPort()).thenReturn(UNINITIALIZED);
+
+        Integer reportedPort = currentPortProvider.getCurrentPort();
+        assertNull(reportedPort);
+        verifyNoInteractions(processUtils, miNiFiCommandSender);
+    }
+
+    @Test
+    void testGetCurrentPortShouldReturnNullIfPingIsNotSuccessfulAndProcessIsNotRunning() {
+        when(miNiFiParameters.getMiNiFiPort()).thenReturn(PORT);
+        when(miNiFiParameters.getMinifiPid()).thenReturn(PID);
+        when(miNiFiCommandSender.isPingSuccessful(PORT)).thenReturn(false);
+        when(processUtils.isProcessRunning(PID)).thenReturn(false);
+
+        Integer reportedPort = currentPortProvider.getCurrentPort();
+        assertNull(reportedPort);
+    }
+
+    @Test
+    void testGetCurrentPortShouldReturnPortIfPingIsSuccessful() {
+        when(miNiFiCommandSender.isPingSuccessful(PORT)).thenReturn(true);
+        when(miNiFiParameters.getMiNiFiPort()).thenReturn(PORT);
+
+        // A successful ping is enough: neither the PID nor the process state may be consulted
+        assertEquals(PORT, currentPortProvider.getCurrentPort());
+        verifyNoInteractions(processUtils);
+        verifyNoMoreInteractions(miNiFiParameters);
+    }
+
+    @Test
+    void testGetCurrentPortShouldReturnPortIfPingIsNotSuccessfulButPidIsRunning() {
+        when(miNiFiParameters.getMiNiFiPort()).thenReturn(PORT);
+        when(miNiFiParameters.getMinifiPid()).thenReturn(PID);
+        when(miNiFiCommandSender.isPingSuccessful(PORT)).thenReturn(false);
+        when(processUtils.isProcessRunning(PID)).thenReturn(true);
+
+        // The ping fails, but a running process still yields the configured port
+        assertEquals(PORT, currentPortProvider.getCurrentPort());
+    }
+}
\ No newline at end of file
diff --git a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/service/GracefulShutdownParameterProviderTest.java b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/service/GracefulShutdownParameterProviderTest.java
new file mode 100644
index 0000000000..87f3ecd810
--- /dev/null
+++ b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/service/GracefulShutdownParameterProviderTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static org.apache.nifi.minifi.bootstrap.service.GracefulShutdownParameterProvider.DEFAULT_GRACEFUL_SHUTDOWN_VALUE;
+import static org.apache.nifi.minifi.bootstrap.service.GracefulShutdownParameterProvider.GRACEFUL_SHUTDOWN_PROP;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Properties;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.NullSource;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class GracefulShutdownParameterProviderTest {
+    // getGracefulShutdownSeconds() is driven by GRACEFUL_SHUTDOWN_PROP read from the mocked bootstrap properties.
+    @Mock
+    private BootstrapFileProvider bootstrapFileProvider;
+
+    @InjectMocks
+    private GracefulShutdownParameterProvider gracefulShutdownParameterProvider;
+
+    @ParameterizedTest(name = "{index} => shutdownPropertyValue={0}")
+    @NullSource
+    @ValueSource(strings = {"notAnInteger", "-1"})
+    void testGetBootstrapPropertiesShouldReturnDefaultShutdownPropertyValue(String shutdownProperty) throws IOException {
+        int expectedDefault = Integer.parseInt(DEFAULT_GRACEFUL_SHUTDOWN_VALUE);
+        Properties bootstrapProperties = new Properties();
+        if (shutdownProperty != null) {
+            bootstrapProperties.setProperty(GRACEFUL_SHUTDOWN_PROP, shutdownProperty);
+        }
+        when(bootstrapFileProvider.getBootstrapProperties()).thenReturn(bootstrapProperties);
+
+        assertEquals(expectedDefault, gracefulShutdownParameterProvider.getGracefulShutdownSeconds());
+    }
+
+    @Test
+    void testGetBootstrapPropertiesShouldReturnShutdownPropertyValue() throws IOException {
+        Properties bootstrapProperties = new Properties();
+        bootstrapProperties.setProperty(GRACEFUL_SHUTDOWN_PROP, "1000");
+        when(bootstrapFileProvider.getBootstrapProperties()).thenReturn(bootstrapProperties);
+
+        assertEquals(1000, gracefulShutdownParameterProvider.getGracefulShutdownSeconds());
+    }
+}
\ No newline at end of file
diff --git a/minifi/minifi-integration-tests/src/test/java/org/apache/nifi/minifi/integration/c2/HierarchicalC2IntegrationTest.java b/minifi/minifi-integration-tests/src/test/java/org/apache/nifi/minifi/integration/c2/HierarchicalC2IntegrationTest.java
index 3c7e5e89a7..23d7083b8c 100644
--- a/minifi/minifi-integration-tests/src/test/java/org/apache/nifi/minifi/integration/c2/HierarchicalC2IntegrationTest.java
+++ b/minifi/minifi-integration-tests/src/test/java/org/apache/nifi/minifi/integration/c2/HierarchicalC2IntegrationTest.java
@@ -97,8 +97,8 @@ public class HierarchicalC2IntegrationTest {
certificatesDirectory.resolve("c2-authoritative").resolve("truststore.jks").toFile().getAbsolutePath(),
"badTrustPass",
KeystoreType.JKS);
- healthCheckSocketFactory = trustSslContext.getSocketFactory();
trustSslContext = SslContextFactory.createSslContext(tlsConfiguration);
+ healthCheckSocketFactory = trustSslContext.getSocketFactory();
docker.before();
}
diff --git a/minifi/minifi-integration-tests/src/test/java/org/apache/nifi/minifi/integration/standalone/test/StandaloneXmlTest.java b/minifi/minifi-integration-tests/src/test/java/org/apache/nifi/minifi/integration/standalone/test/StandaloneXmlTest.java
index cbb6dfd29f..34f5452c31 100644
--- a/minifi/minifi-integration-tests/src/test/java/org/apache/nifi/minifi/integration/standalone/test/StandaloneXmlTest.java
+++ b/minifi/minifi-integration-tests/src/test/java/org/apache/nifi/minifi/integration/standalone/test/StandaloneXmlTest.java
@@ -28,7 +28,6 @@ import java.nio.file.Paths;
public class StandaloneXmlTest extends StandaloneYamlTest {
public void setDocker(String version, String name) throws Exception {
- super.setDocker(version, name);
ConfigSchema configSchema;
try (InputStream inputStream = StandaloneXmlTest.class.getClassLoader().getResourceAsStream("./standalone/" + version + "/" + name + "/xml/" + name + ".xml")) {
configSchema = ConfigMain.transformTemplateToSchema(inputStream);
@@ -37,6 +36,7 @@ public class StandaloneXmlTest extends StandaloneYamlTest {
.getParent().toAbsolutePath().resolve(getConfigYml(version, name)))) {
SchemaSaver.saveConfigSchema(configSchema, outputStream);
}
+ super.setDocker(version, name);
}
@Override
diff --git a/minifi/minifi-integration-tests/src/test/resources/conf/nifi.properties b/minifi/minifi-integration-tests/src/test/resources/conf/nifi.properties
new file mode 100644
index 0000000000..e23c5b0d09
--- /dev/null
+++ b/minifi/minifi-integration-tests/src/test/resources/conf/nifi.properties
@@ -0,0 +1,125 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Core Properties #
+nifi.flow.configuration.file=./target/flow.xml.gz
+nifi.flow.configuration.archive.dir=./target/archive/
+nifi.flowcontroller.autoResumeState=true
+nifi.flowcontroller.graceful.shutdown.period=10 sec
+nifi.flowservice.writedelay.interval=2 sec
+nifi.administrative.yield.duration=30 sec
+
+nifi.reporting.task.configuration.file=./target/reporting-tasks.xml
+nifi.controller.service.configuration.file=./target/controller-services.xml
+nifi.templates.directory=./target/templates
+nifi.ui.banner.text=UI Banner Text
+nifi.ui.autorefresh.interval=30 sec
+nifi.nar.library.directory=./target/resources/NiFiProperties/lib/
+nifi.nar.library.directory.alt=./target/resources/NiFiProperties/lib2/
+nifi.nar.working.directory=./target/work/nar/
+
+# H2 Settings
+nifi.database.directory=./target/database_repository
+nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
+
+# FlowFile Repository
+nifi.flowfile.repository.directory=./target/test-repo
+nifi.flowfile.repository.partitions=1
+nifi.flowfile.repository.checkpoint.interval=2 mins
+nifi.queue.swap.threshold=20000
+nifi.swap.storage.directory=./target/test-repo/swap
+nifi.swap.in.period=5 sec
+nifi.swap.in.threads=1
+nifi.swap.out.period=5 sec
+nifi.swap.out.threads=4
+
+# Content Repository
+nifi.content.claim.max.appendable.size=10 MB
+nifi.content.claim.max.flow.files=100
+nifi.content.repository.directory.default=./target/content_repository
+
+# Provenance Repository Properties
+nifi.provenance.repository.storage.directory=./target/provenance_repository
+nifi.provenance.repository.max.storage.time=24 hours
+nifi.provenance.repository.max.storage.size=1 GB
+nifi.provenance.repository.rollover.time=30 secs
+nifi.provenance.repository.rollover.size=100 MB
+
+# Site to Site properties
+nifi.remote.input.socket.port=9990
+nifi.remote.input.secure=true
+
+# web properties #
+nifi.web.war.directory=./target/lib
+nifi.web.http.host=
+nifi.web.http.port=8080
+nifi.web.https.host=
+nifi.web.https.port=
+nifi.web.jetty.working.directory=./target/work/jetty
+
+# security properties #
+nifi.sensitive.props.key=key
+nifi.sensitive.props.algorithm=PBEWITHMD5AND256BITAES-CBC-OPENSSL
+
+nifi.security.keystore=
+nifi.security.keystoreType=
+nifi.security.keystorePasswd=
+nifi.security.keyPasswd=
+nifi.security.truststore=
+nifi.security.truststoreType=
+nifi.security.truststorePasswd=
+nifi.security.user.authorizer=
+
+# cluster common properties (cluster manager and nodes must have same values) #
+nifi.cluster.protocol.heartbeat.interval=5 sec
+nifi.cluster.protocol.is.secure=false
+nifi.cluster.protocol.socket.timeout=30 sec
+nifi.cluster.protocol.connection.handshake.timeout=45 sec
+# if multicast is used, then nifi.cluster.protocol.multicast.xxx properties must be configured #
+nifi.cluster.protocol.use.multicast=false
+nifi.cluster.protocol.multicast.address=
+nifi.cluster.protocol.multicast.port=
+nifi.cluster.protocol.multicast.service.broadcast.delay=500 ms
+nifi.cluster.protocol.multicast.service.locator.attempts=3
+nifi.cluster.protocol.multicast.service.locator.attempts.delay=1 sec
+
+# cluster node properties (only configure for cluster nodes) #
+nifi.cluster.is.node=false
+nifi.cluster.node.address=
+nifi.cluster.node.protocol.port=
+nifi.cluster.node.protocol.threads=2
+# if multicast is not used, nifi.cluster.node.unicast.xxx must have same values as nifi.cluster.manager.xxx #
+nifi.cluster.node.unicast.manager.address=
+nifi.cluster.node.unicast.manager.protocol.port=
+nifi.cluster.node.unicast.manager.authority.provider.port=
+
+# cluster manager properties (only configure for cluster manager) #
+nifi.cluster.is.manager=false
+nifi.cluster.manager.address=
+nifi.cluster.manager.protocol.port=
+nifi.cluster.manager.authority.provider.port=
+nifi.cluster.manager.authority.provider.threads=10
+nifi.cluster.manager.node.firewall.file=
+nifi.cluster.manager.node.event.history.size=10
+nifi.cluster.manager.node.api.connection.timeout=30 sec
+nifi.cluster.manager.node.api.read.timeout=30 sec
+nifi.cluster.manager.node.api.request.threads=10
+nifi.cluster.manager.flow.retrieval.delay=5 sec
+nifi.cluster.manager.protocol.threads=10
+nifi.cluster.manager.safemode.duration=0 sec
+
+# analytics properties #
+nifi.analytics.predict.interval=3 mins
+nifi.analytics.connection.model.implementation=org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares
\ No newline at end of file
diff --git a/minifi/minifi-integration-tests/src/test/resources/logback.xml b/minifi/minifi-integration-tests/src/test/resources/logback.xml
index b0baaf3519..2c8860e536 100644
--- a/minifi/minifi-integration-tests/src/test/resources/logback.xml
+++ b/minifi/minifi-integration-tests/src/test/resources/logback.xml
@@ -58,7 +58,7 @@
<logger name="org.apache.nifi" level="INFO"/>
<logger name="org.apache.nifi.processors" level="WARN"/>
<logger name="org.apache.nifi.processors.standard.LogAttribute" level="INFO"/>
- <logger name="org.apache.nifi.controller.repository.StandardProcessSession" level="INFO" />
+ <logger name="org.apache.nifi.controller.repository.StandardProcessSession" level="DEBUG" />
<logger name="org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor" level="DEBUG" />
<logger name="org.apache.nifi.minifi.c2.cache.filesystem.FileSystemConfigurationCache" level="DEBUG" />
@@ -88,7 +88,7 @@
<logger name="org.apache.nifi.minifi.StdOut" level="INFO" additivity="false">
<appender-ref ref="BOOTSTRAP_FILE" />
</logger>
-
+
<!-- Everything written to MiNiFi's Standard Error will be logged with the logger org.apache.nifi.minifi.StdErr at ERROR level -->
<logger name="org.apache.nifi.minifi.StdErr" level="ERROR" additivity="false">
<appender-ref ref="BOOTSTRAP_FILE" />
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 5c99617e7d..7aefec57e8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -599,7 +599,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
final long flowFileRepoUpdateFinishNanos = System.nanoTime();
final long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos - flowFileRepoUpdateStart;
- if (LOG.isInfoEnabled()) {
+ if (LOG.isDebugEnabled()) {
for (final RepositoryRecord record : checkpoint.records.values()) {
if (record.isMarkedForAbort()) {
final FlowFileRecord flowFile = record.getCurrent();
@@ -651,7 +651,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
}
checkpoint.deleteOnCommit.clear();
- if (LOG.isInfoEnabled()) {
+ if (LOG.isDebugEnabled()) {
final String sessionSummary = summarizeEvents(checkpoint);
if (!sessionSummary.isEmpty()) {
LOG.debug("{} for {}, committed the following events: {}", this, connectableDescription, sessionSummary);