You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2019/08/13 12:46:50 UTC

[nifi-minifi-cpp] branch master updated: minificpp-34 Implemented.

This is an automated email from the ASF dual-hosted git repository.

phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 11cefc8  minificpp-34 Implemented.
11cefc8 is described below

commit 11cefc8342aefcd843d48faeef01e025db8c5b9f
Author: amarmer <am...@AMARMER-5530-85>
AuthorDate: Wed Aug 7 13:43:07 2019 -0700

    minificpp-34 Implemented.
    
    minificpp-34 Formatting.
    
    minificpp-34 Removed unused variable.
    
    minificpp-34 1. Increased wait time for exe termination to 5sec. 2. Small refactoring.
    
    minificpp-34 Small refactoring.
    
    minificpp-34 Small refactoring.
    
    minificpp-34 Refactoring.
    
    minificpp-34 No changes, to re-trigger appveyor.
    
    This closes #622.
    
    Signed-off-by: Marc Parisi <ph...@apache.org>
---
 CMakeLists.txt                |   4 +
 main/CMakeLists.txt           |   3 +-
 main/MiNiFiMain.cpp           |  17 ++
 main/MiNiFiWindowsService.cpp | 372 ++++++++++++++++++++++++++++++++++++++++++
 main/MiNiFiWindowsService.h   |  11 ++
 5 files changed, 406 insertions(+), 1 deletion(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 4965605..f87f074 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -149,6 +149,10 @@ endif()
 
 endif()
 
+# Windows only: define the SERVICE_NAME macro (the string "MiNiFi") for all targets.
+# main/MiNiFiWindowsService.cpp uses it as the name passed to the service dispatcher and control handler.
+if (WIN32)
+	add_definitions(-DSERVICE_NAME="MiNiFi")
+endif()
+
 #set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall")
 
 if (NOT PORTABLE)
diff --git a/main/CMakeLists.txt b/main/CMakeLists.txt
index 062449b..ecd5f6b 100644
--- a/main/CMakeLists.txt
+++ b/main/CMakeLists.txt
@@ -55,7 +55,8 @@ endif()
 
 endif()
 
-add_executable(minifiexe MiNiFiMain.cpp)
+add_executable(minifiexe MiNiFiMain.cpp MiNiFiWindowsService.cpp)
+
 if (NOT USE_SHARED_LIBS)
 	if (LIBC_STATIC)
 		set_target_properties(minifiexe PROPERTIES LINK_SEARCH_START_STATIC 1)
diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp
index 6c39b5e..5028eb2 100644
--- a/main/MiNiFiMain.cpp
+++ b/main/MiNiFiMain.cpp
@@ -27,6 +27,7 @@
 #include <WinSock2.h>
 #include <WS2tcpip.h>
 #include <Windows.h>
+#include "MiNiFiWindowsService.h"
 #pragma comment(lib, "Ws2_32.lib")
 #pragma comment(lib, "legacy_stdio_definitions.lib")
 #ifdef ENABLE_JNI
@@ -53,6 +54,7 @@
 #include "core/RepositoryFactory.h"
 #include "FlowController.h"
 #include "Main.h"
+
  // Variables that allow us to avoid a timed wait.
 sem_t *running;
 //! Flow Controller
@@ -79,7 +81,12 @@ BOOL WINAPI consoleSignalHandler(DWORD signal) {
 
 	return TRUE;
 }
+
+// Posts the global `running` semaphore.
+// Declared and called from MiNiFiWindowsService.cpp (service termination thread) to ask this process to exit.
+// NOTE(review): presumably main() blocks on `running` and shuts down once it is posted - the wait itself is
+// not visible in this hunk; confirm.
+// NOTE(review): `running` is a plain pointer; verify it is already initialized by the time the termination
+// thread can call this function.
+void SignalExitProcess() {
+  sem_post(running);
+}
 #endif
+
 void sigHandler(int signal) {
 	if (signal == SIGINT || signal == SIGTERM) {
 		// avoid stopping the controller here.
@@ -88,8 +95,18 @@ void sigHandler(int signal) {
 }
 
 int main(int argc, char **argv) {
+#ifdef WIN32
+  CheckRunAsService();
+#endif
+
 	std::shared_ptr<logging::Logger> logger = logging::LoggerConfiguration::getConfiguration().getLogger("main");
 
+#ifdef WIN32
+  if (!CreateServiceTerminationThread(logger)) {
+    return -1;
+  }
+#endif
+
 	uint16_t stop_wait_time = STOP_WAIT_TIME_MS;
 
 	// initialize static functions that were defined apriori
diff --git a/main/MiNiFiWindowsService.cpp b/main/MiNiFiWindowsService.cpp
new file mode 100644
index 0000000..be0e8db
--- /dev/null
+++ b/main/MiNiFiWindowsService.cpp
@@ -0,0 +1,372 @@
+#ifdef WIN32 
+
+#include "MiNiFiWindowsService.h"
+
+#include <Windows.h>
+#include <Strsafe.h>
+#include <process.h>
+#include <tlhelp32.h>
+
+#include <cerrno>
+#include <memory>
+#include <tuple>
+
+#include "core/FlowConfiguration.h"
+
+// Uncomment to send the service diagnostics below to OutputDebug() instead of the logger returned by Log().
+//#define DEBUG_SERVICE
+
+// Logging shims used by CheckRunAsService().
+// The non-debug variants expand to calls on `Log()`, so they can only be used where a callable named `Log`
+// that returns a logger pointer is in scope.
+// LOG_LASTERROR: `str` must be a string literal because it is concatenated with " lastError %x";
+// the value printed is GetLastError() evaluated at the point of the log call.
+#ifdef DEBUG_SERVICE
+  #define LOG_INFO(...)       OutputDebug(__VA_ARGS__)
+  #define LOG_ERROR(...)      OutputDebug(__VA_ARGS__)
+  #define LOG_LASTERROR(str)  OutputDebug(str " lastError %x", GetLastError())
+#else
+  #define LOG_INFO(...)       Log()->log_info(__VA_ARGS__)
+  #define LOG_ERROR(...)      Log()->log_error(__VA_ARGS__)
+  #define LOG_LASTERROR(str)  Log()->log_error(str " lastError %x", GetLastError())
+#endif
+
+// DEBUG_SERVICE is only needed to select the macro set above.
+#undef DEBUG_SERVICE
+
+// Implemented in MiNiFiMain.cpp
+void SignalExitProcess();
+
+// Name of the manual-reset event shared between the service (which creates and sets it on stop)
+// and the child exe (which opens it in CreateServiceTerminationThread and waits on it).
+// A string literal must not be bound to a mutable char*, hence const char* const.
+static const char* const SERVICE_TERMINATION_EVENT_NAME = "Global\\MiNiFiServiceTermination";
+
+/**
+ * printf-style helper that sends "<SERVICE_NAME>: <message>" to the debugger via OutputDebugStringA.
+ * Referenced by the LOG_* macros only when DEBUG_SERVICE is defined.
+ *
+ * @param format printf-style format string, followed by its arguments.
+ *
+ * Output that does not fit the internal buffers is truncated.
+ */
+static void OutputDebug(const char* format, ...) {
+  // Expand the caller's format first. Splicing `format` into a second format string (as done previously)
+  // made sprintf_s raise an invalid-parameter error for long formats and would misinterpret a '%' in SERVICE_NAME.
+  char message[1024];
+
+  va_list args;
+  va_start(args, format);
+  StringCbVPrintfA(message, sizeof(message), format, args);
+  va_end(args);
+
+  // Room for the message plus the "<SERVICE_NAME>: " prefix; StringCb* truncates and null-terminates on overflow.
+  char out[sizeof(message) + 64];
+  StringCbPrintfA(out, sizeof(out), "%s: %s", SERVICE_NAME, message);
+
+  OutputDebugStringA(out);
+}
+
+/**
+ * Entry point for running this executable under the Windows Service Control Manager (SCM).
+ *
+ * Calls StartServiceCtrlDispatcher for SERVICE_NAME:
+ *  - If that fails with ERROR_FAILED_SERVICE_CONTROLLER_CONNECT, the function returns and the caller
+ *    continues as a regular console application.
+ *  - In every other case the function does not return: it ends the process with ExitProcess
+ *    (exit code 1 if the dispatcher could not be started, 0 after the dispatcher has returned).
+ *
+ * While running as a service, the service main function starts this same executable as a child process and
+ * restarts it WAIT_TIME_EXE_RESTART ms after it exits. On SERVICE_CONTROL_STOP the control handler sets the
+ * SERVICE_TERMINATION_EVENT_NAME event; the service main function then gives the child
+ * WAIT_TIME_EXE_TERMINATION ms to exit, terminates it otherwise, and reports SERVICE_STOPPED.
+ */
+void CheckRunAsService() {
+  static const int WAIT_TIME_EXE_TERMINATION = 5000;
+  static const int WAIT_TIME_EXE_RESTART = 60000;
+
+  // The lambdas below must stay capture-less (they are converted to plain function pointers),
+  // so all state they share lives in function-local statics.
+  static SERVICE_STATUS s_serviceStatus;
+  static SERVICE_STATUS_HANDLE s_statusHandle;
+  static HANDLE s_hEvent;
+
+  // SERVICE_TABLE_ENTRY::lpServiceName is a non-const pointer, so give it writable storage
+  // instead of pointing it at a string literal.
+  static char s_serviceName[] = SERVICE_NAME;
+
+  static auto Log = []() {
+    static std::shared_ptr<logging::Logger> s_logger = logging::LoggerConfiguration::getConfiguration().getLogger("service");
+    return s_logger;
+  };
+
+  // Reports SERVICE_STOPPED with the given Win32 exit code.
+  static auto ReportStopped = [](DWORD win32ExitCode) {
+    s_serviceStatus.dwControlsAccepted = 0;
+    s_serviceStatus.dwCurrentState = SERVICE_STOPPED;
+    s_serviceStatus.dwWin32ExitCode = win32ExitCode;
+
+    if (!SetServiceStatus(s_statusHandle, &s_serviceStatus)) {
+      LOG_LASTERROR("!SetServiceStatus SERVICE_STOPPED");
+    }
+  };
+
+  SERVICE_TABLE_ENTRY serviceTable[] =
+  {
+    {
+      s_serviceName,
+      [](DWORD argc, LPTSTR *argv)
+      {
+        LOG_INFO("ServiceCtrlDispatcher");
+
+        s_hEvent = CreateEvent(nullptr, TRUE, FALSE, SERVICE_TERMINATION_EVENT_NAME);
+        if (!s_hEvent) {
+          LOG_LASTERROR("!CreateEvent");
+          return;
+        }
+
+        s_statusHandle = RegisterServiceCtrlHandler(
+          SERVICE_NAME,
+          [](DWORD ctrlCode) {
+            LOG_INFO("ServiceCtrlHandler ctrlCode %d", ctrlCode);
+
+            if (SERVICE_CONTROL_STOP != ctrlCode) {
+              return;
+            }
+
+            LOG_INFO("ServiceCtrlHandler ctrlCode = SERVICE_CONTROL_STOP");
+
+            // Set service status SERVICE_STOP_PENDING.
+            s_serviceStatus.dwControlsAccepted = 0;
+            s_serviceStatus.dwCurrentState = SERVICE_STOP_PENDING;
+            s_serviceStatus.dwWin32ExitCode = 0;
+
+            if (!SetServiceStatus(s_statusHandle, &s_serviceStatus)) {
+              LOG_LASTERROR("!SetServiceStatus SERVICE_STOP_PENDING");
+            }
+
+            // Only signal here. The service main thread owns the child process handle and performs
+            // the wait/terminate, so this handler never touches a handle that may already be closed.
+            if (!SetEvent(s_hEvent)) {
+              LOG_LASTERROR("!SetEvent");
+            }
+          }
+        );
+
+        if (!s_statusHandle) {
+          LOG_LASTERROR("!RegisterServiceCtrlHandler");
+          return;
+        }
+
+        // Set service status SERVICE_START_PENDING.
+        ZeroMemory(&s_serviceStatus, sizeof(s_serviceStatus));
+        s_serviceStatus.dwServiceType = SERVICE_WIN32_OWN_PROCESS;
+        s_serviceStatus.dwControlsAccepted = 0;
+        s_serviceStatus.dwCurrentState = SERVICE_START_PENDING;
+        s_serviceStatus.dwWin32ExitCode = 0;
+        s_serviceStatus.dwServiceSpecificExitCode = 0;
+
+        if (!SetServiceStatus(s_statusHandle, &s_serviceStatus)) {
+          LOG_LASTERROR("!SetServiceStatus SERVICE_START_PENDING");
+          return;
+        }
+
+        // Set service status SERVICE_RUNNING.
+        s_serviceStatus.dwControlsAccepted = SERVICE_ACCEPT_STOP;
+        s_serviceStatus.dwCurrentState = SERVICE_RUNNING;
+        s_serviceStatus.dwWin32ExitCode = 0;
+
+        if (!SetServiceStatus(s_statusHandle, &s_serviceStatus)) {
+          // Read the error code before logging: the logging call may overwrite the thread's last error.
+          const DWORD lastError = GetLastError();
+          LOG_ERROR("!SetServiceStatus SERVICE_RUNNING lastError %x", lastError);
+
+          ReportStopped(lastError);
+          return;
+        }
+
+        char filePath[MAX_PATH];
+        if (!GetModuleFileName(nullptr, filePath, _countof(filePath))) {
+          const DWORD lastError = GetLastError();
+          LOG_ERROR("!GetModuleFileName lastError %x", lastError);
+
+          // Do not leave the service reported as running when nothing can be started.
+          ReportStopped(lastError);
+          return;
+        }
+
+        for (;;) {
+          LOG_INFO("Start exe path %s", filePath);
+
+          STARTUPINFO startupInfo{};
+          startupInfo.cb = sizeof(startupInfo);
+
+          PROCESS_INFORMATION processInformation{};
+
+          // No command line, default security, no handle inheritance, no creation flags,
+          // inherited environment and current directory.
+          if (!CreateProcess(filePath, nullptr, nullptr, nullptr, FALSE, 0, nullptr, nullptr, &startupInfo, &processInformation)) {
+            const DWORD lastError = GetLastError();
+            LOG_ERROR("!CreateProcess lastError %x", lastError);
+
+            ReportStopped(lastError);
+            return;
+          }
+
+          LOG_INFO("%s started", filePath);
+
+          // Wake up either when the exe exits on its own or when the control handler requests a stop.
+          bool stopRequested = false;
+          HANDLE waitHandles[] = { processInformation.hProcess, s_hEvent };
+          const auto waitRes = WaitForMultipleObjects(_countof(waitHandles), waitHandles, FALSE, INFINITE);
+
+          if (WAIT_OBJECT_0 + 1 == waitRes) {
+            stopRequested = true;
+
+            LOG_INFO("Wait for exe termination");
+            const auto exitRes = WaitForSingleObject(processInformation.hProcess, WAIT_TIME_EXE_TERMINATION);
+            if (WAIT_OBJECT_0 == exitRes) {
+              LOG_INFO("Exe terminated");
+            } else {
+              if (WAIT_TIMEOUT == exitRes) {
+                LOG_ERROR("WaitForSingleObject timeout %d", WAIT_TIME_EXE_TERMINATION);
+              } else {
+                LOG_ERROR("!WaitForSingleObject return %d", exitRes);
+              }
+
+              LOG_INFO("TerminateProcess");
+              if (!TerminateProcess(processInformation.hProcess, 0)) {
+                LOG_LASTERROR("!TerminateProcess");
+              }
+            }
+          } else if (WAIT_FAILED == waitRes) {
+            LOG_LASTERROR("!WaitForMultipleObjects hProcess");
+          } else if (WAIT_OBJECT_0 != waitRes) {
+            LOG_ERROR("!WaitForMultipleObjects hProcess return %d", waitRes);
+          }
+
+          CloseHandle(processInformation.hProcess);
+          CloseHandle(processInformation.hThread);
+
+          if (stopRequested) {
+            break;
+          }
+
+          LOG_INFO("Sleep %d sec before restarting exe", WAIT_TIME_EXE_RESTART/1000);
+          const auto restartRes = WaitForSingleObject(s_hEvent, WAIT_TIME_EXE_RESTART);
+
+          if (WAIT_OBJECT_0 == restartRes) {
+            LOG_INFO("Service was stopped, exe won't be restarted");
+            break;
+          }
+
+          if (WAIT_FAILED == restartRes) {
+            LOG_LASTERROR("!WaitForSingleObject s_hEvent");
+          } else if (WAIT_TIMEOUT != restartRes) {
+            LOG_ERROR("!WaitForSingleObject s_hEvent return %d", restartRes);
+          }
+        }
+
+        ReportStopped(0);
+      }
+    },
+    {nullptr, nullptr}
+  };
+
+  if (!StartServiceCtrlDispatcher(serviceTable)) {
+    if (ERROR_FAILED_SERVICE_CONTROLLER_CONNECT == GetLastError()) {
+      // Run this exe as console.
+      return;
+    }
+
+    LOG_LASTERROR("!StartServiceCtrlDispatcher");
+
+    ExitProcess(1);
+  }
+
+  LOG_INFO("Service exit");
+
+  ExitProcess(0);
+}
+
+/**
+ * Makes an exe that was started by the service exit when the service asks it to, or when the service process dies.
+ *
+ * Opens the SERVICE_TERMINATION_EVENT_NAME event. If the event did not exist before this call, nothing is
+ * monitored and the function returns true. Otherwise the parent process is looked up; its image name must equal
+ * this exe's image name. A background thread is then started that waits for either the event or the end of the
+ * parent process and calls SignalExitProcess().
+ *
+ * @param logger logger used for diagnostics, also shared with the background thread.
+ * @return true if no monitoring is needed or the monitoring thread was started; false on any failure.
+ */
+bool CreateServiceTerminationThread(std::shared_ptr<logging::Logger> logger) {
+  HANDLE hEvent = CreateEvent(nullptr, TRUE, FALSE, SERVICE_TERMINATION_EVENT_NAME);
+  if (!hEvent) {
+    logger->log_error("!CreateEvent lastError %x", GetLastError());
+    return false;
+  }
+
+  // The event was created by this call, i.e. nobody else (the service) had created it: nothing to monitor.
+  if (GetLastError() != ERROR_ALREADY_EXISTS) {
+    CloseHandle(hEvent);
+    return true;
+  }
+
+  // Get hService and monitor it - if service is terminated, then terminate current exe, otherwise the exe becomes unmanageable when service is restarted.
+  auto hService = [&logger]() -> HANDLE {
+    auto hSnapShot = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0);
+    if (INVALID_HANDLE_VALUE == hSnapShot) {
+      logger->log_error("!CreateToolhelp32Snapshot lastError %x", GetLastError());
+      return nullptr;
+    }
+
+    // Looks up `processId` in the snapshot; on success yields its parent id and image name,
+    // otherwise leaves the outputs as 0 / empty.
+    auto getProcessInfo = [&logger, &hSnapShot](DWORD processId, DWORD& parentId, std::string& processName) {
+      parentId = 0;
+      processName.clear();
+
+      PROCESSENTRY32 procentry{};
+      procentry.dwSize = sizeof(procentry);
+
+      if (!Process32First(hSnapShot, &procentry)) {
+        logger->log_error("!Process32First lastError %x", GetLastError());
+        return;
+      }
+
+      do {
+        if (processId == procentry.th32ProcessID) {
+          parentId = procentry.th32ParentProcessID;
+          processName = procentry.szExeFile;
+          return;
+        }
+      } while (Process32Next(hSnapShot, &procentry));
+    };
+
+    // Find current process info, which contains parentProcessId.
+    DWORD parentProcessId{};
+    std::string currentProcessName;
+    getProcessInfo(GetCurrentProcessId(), parentProcessId, currentProcessName);
+
+    // Find parent process info (the service which started current process), which contains service name.
+    DWORD grandParentProcessId{};
+    std::string parentProcessName;
+    getProcessInfo(parentProcessId, grandParentProcessId, parentProcessName);
+
+    CloseHandle(hSnapShot);
+
+    // Just in case check that service name == current process name.
+    char filePath[MAX_PATH];
+    if (!GetModuleFileName(nullptr, filePath, _countof(filePath))) {
+      logger->log_error("!GetModuleFileName lastError %x", GetLastError());
+      return nullptr;
+    }
+
+    const auto pSlash = strrchr(filePath, '\\');
+    if (!pSlash) {
+      logger->log_error("Invalid filePath %s", filePath);
+      return nullptr;
+    }
+    const std::string fileName = pSlash + 1;
+
+    if (_stricmp(fileName.c_str(), parentProcessName.c_str())) {
+      logger->log_error("Parent process %s != current process %s", parentProcessName.c_str(), fileName.c_str());
+      return nullptr;
+    }
+
+    // SYNCHRONIZE is all that is needed to wait for the parent to exit.
+    const auto hParentProcess = OpenProcess(SYNCHRONIZE, FALSE, parentProcessId);
+    if (!hParentProcess) {
+      logger->log_error("!OpenProcess lastError %x", GetLastError());
+      return nullptr;
+    }
+
+    return hParentProcess;
+  }();
+  if (!hService) {
+    CloseHandle(hEvent);
+    return false;
+  }
+
+  using ThreadInfo = std::tuple<std::shared_ptr<logging::Logger>, HANDLE, HANDLE>;
+  std::unique_ptr<ThreadInfo> threadInfo(new ThreadInfo(logger, hEvent, hService));
+
+  const auto hThread = reinterpret_cast<HANDLE>(_beginthreadex(
+    nullptr, 0,
+    [](void* pPar) {
+      // The thread owns the ThreadInfo and both handles from here on.
+      const std::unique_ptr<ThreadInfo> info(static_cast<ThreadInfo*>(pPar));
+      const auto logger = std::get<0>(*info);
+      const auto hEvent = std::get<1>(*info);
+      const auto hService = std::get<2>(*info);
+
+      HANDLE arHandle[] = { hEvent, hService };
+      switch (const auto res = WaitForMultipleObjects(_countof(arHandle), arHandle, FALSE, INFINITE))
+      {
+        case WAIT_FAILED:
+          logger->log_error("!WaitForMultipleObjects lastError %x", GetLastError());
+        break;
+
+        case WAIT_OBJECT_0:
+          logger->log_info("Service event received");
+        break;
+
+        case WAIT_OBJECT_0 + 1:
+          logger->log_info("Service is terminated");
+        break;
+
+        default:
+          logger->log_info("WaitForMultipleObjects return %d", res);
+      }
+
+      CloseHandle(hEvent);
+      CloseHandle(hService);
+
+      // Whatever ended the wait (including a failure), ask the process to exit.
+      SignalExitProcess();
+
+      return 0U;
+    },
+    threadInfo.get(),
+    0, nullptr));
+  if (!hThread) {
+    // _beginthreadex reports failures through errno, not GetLastError().
+    logger->log_error("!_beginthreadex errno %d", errno);
+
+    // The thread never started, so everything is still owned here (threadInfo is freed by its unique_ptr).
+    CloseHandle(hService);
+    CloseHandle(hEvent);
+
+    return false;
+  }
+
+  // Ownership of the ThreadInfo has passed to the running thread; only give up the pointer, do not delete it.
+  threadInfo.release();
+
+  // The thread is never joined, so its handle is not needed.
+  CloseHandle(hThread);
+
+  return true;
+}
+
+#endif
diff --git a/main/MiNiFiWindowsService.h b/main/MiNiFiWindowsService.h
new file mode 100644
index 0000000..ed7e186
--- /dev/null
+++ b/main/MiNiFiWindowsService.h
@@ -0,0 +1,11 @@
+#pragma once
+
+#ifdef WIN32
+
+#include <memory>
+#include "core/Core.h"
+
+// Tries to run the process as the SERVICE_NAME Windows service via StartServiceCtrlDispatcher.
+// Returns to the caller only when the dispatcher fails with ERROR_FAILED_SERVICE_CONTROLLER_CONNECT
+// (the exe then continues as a console application); in all other cases it ends the process with ExitProcess.
+void CheckRunAsService();
+// If the service termination event already exists, starts a background thread that waits for that event or
+// for the end of the parent process and then calls SignalExitProcess(). If the event did not exist, nothing
+// is started. Returns false when any step fails, true otherwise. `logger` is used for diagnostics.
+bool CreateServiceTerminationThread(std::shared_ptr<logging::Logger> logger);
+
+#endif