You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2019/03/11 14:48:09 UTC
[nifi-minifi-cpp] branch master updated: MINIFICPP-757: Implement
controller service integration
This is an automated email from the ASF dual-hosted git repository.
aldrin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 0c77c75 MINIFICPP-757: Implement controller service integration
0c77c75 is described below
commit 0c77c75dfb3d92a0963c713c9bc9895f83ba812e
Author: Marc Parisi <ph...@apache.org>
AuthorDate: Wed Mar 6 22:04:29 2019 -0500
MINIFICPP-757: Implement controller service integration
MINIFICPP-752: update docker image
This closes #502.
Signed-off-by: Aldrin Piri <al...@apache.org>
---
CMakeLists.txt | 2 +
docker/DockerVerify.sh | 2 +
docker/Dockerfile | 14 ++-
docker/test/integration/minifi/__init__.py | 24 ++++-
docker/test/test_hash_content.py | 35 +++++++
extensions/jni/ClassRegistrar.h | 80 ++++++++++++++++
extensions/jni/ExecuteJavaControllerService.cpp | 26 ++++--
extensions/jni/ExecuteJavaControllerService.h | 53 +++++++++--
extensions/jni/ExecuteJavaProcessor.cpp | 22 ++++-
extensions/jni/ExecuteJavaProcessor.h | 73 ++++++++-------
extensions/jni/JNILoader.h | 3 +
extensions/jni/JavaException.h | 4 +
extensions/jni/jvm/JVMLoader.h | 13 ++-
extensions/jni/jvm/JavaControllerService.h | 5 +
extensions/jni/jvm/JavaDefs.h | 2 +
extensions/jni/jvm/JavaServicer.h | 1 +
.../{JavaServicer.h => JniConfigurationContext.h} | 40 +++++---
extensions/jni/jvm/JniControllerServiceLookup.cpp | 78 ++++++++++++++++
...ocessContext.h => JniControllerServiceLookup.h} | 45 ++++-----
extensions/jni/jvm/JniInitializationContext.cpp | 101 +++++++++++++++++++++
...ProcessContext.h => JniInitializationContext.h} | 47 +++++-----
extensions/jni/jvm/JniProcessContext.cpp | 14 ++-
extensions/jni/jvm/JniProcessContext.h | 23 +++--
extensions/jni/jvm/JniReferenceObjects.h | 2 +
extensions/jni/jvm/NarClassLoader.h | 6 +-
.../org/apache/nifi/processor/JniClassLoader.java | 17 ++--
.../nifi/processor/JniConfigurationContext.java | 100 ++++++++++++++++++++
.../nifi/processor/JniControllerServiceLookup.java | 35 +++++++
.../nifi/processor/JniInitializationContext.java | 14 +--
.../apache/nifi/processor/JniProcessContext.java | 44 ++++-----
libminifi/include/FlowController.h | 2 +-
libminifi/include/core/FlowConfiguration.h | 3 +-
.../core/controller/ControllerServiceProvider.h | 3 +-
.../controller/StandardControllerServiceProvider.h | 11 ++-
libminifi/src/FlowController.cpp | 4 +-
libminifi/src/core/FlowConfiguration.cpp | 5 +-
libminifi/src/core/yaml/YamlConfiguration.cpp | 4 +-
37 files changed, 773 insertions(+), 184 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 06de1b2..058b992 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -55,6 +55,7 @@ endif()
if(NOT WIN32)
if (ENABLE_JNI)
+ if (NOT DISABLE_JEMALLOC)
set(BASE_DIR "${CMAKE_CURRENT_BINARY_DIR}/thirdparty/jemalloc")
set(DIR "${BASE_DIR}/extensions/jemalloc-src")
@@ -86,6 +87,7 @@ if(NOT WIN32)
set(JEMALLOC_LIBRARY jemalloc CACHE STRING "" FORCE)
set(JEMALLOC_LIBRARY jemalloc CACHE STRING "" FORCE)
+ endif()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DENABLE_JNI")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DENABLE_JNI")
diff --git a/docker/DockerVerify.sh b/docker/DockerVerify.sh
index 92e706c..ae48e33 100755
--- a/docker/DockerVerify.sh
+++ b/docker/DockerVerify.sh
@@ -46,6 +46,8 @@ pip install --upgrade \
PyYAML \
m2crypto \
watchdog
+export JAVA_HOME="/usr/lib/jvm/default-jvm"
+export PATH="$PATH:/usr/lib/jvm/default-jvm/bin"
export MINIFI_VERSION=0.6.0
export PYTHONPATH="${PYTHONPATH}:${docker_dir}/test/integration"
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 2279f5f..cf0a5ab 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -33,6 +33,11 @@ RUN apk --update --no-cache upgrade && apk --update --no-cache add gcc \
bison \
flex \
flex-dev \
+ maven \
+ openjdk8-jre-base \
+ openjdk8 \
+ autoconf \
+ libtool \
wget \
gdb \
musl-dev \
@@ -55,6 +60,8 @@ ENV MINIFI_BASE_DIR /opt/minifi
# Setup minificpp user
RUN addgroup -g $GID $USER && adduser -u $UID -D -G $USER -g "" $USER
RUN mkdir -p $MINIFI_BASE_DIR
+ENV JAVA_HOME /usr/lib/jvm/default-jvm
+ENV PATH $PATH:/usr/lib/jvm/default-jvm/bin
ADD $MINIFI_SOURCE_CODE $MINIFI_BASE_DIR
RUN chown -R $USER:$USER $MINIFI_BASE_DIR
@@ -67,7 +74,7 @@ ENV MINIFI_HOME $MINIFI_BASE_DIR/nifi-minifi-cpp-$MINIFI_VERSION
RUN cd $MINIFI_BASE_DIR \
&& mkdir build \
&& cd build \
- && cmake -DOPENSSL_FORCE_SHARED=true -DSKIP_TESTS=true .. \
+ && cmake -DOPENSSL_FORCE_SHARED=true -DDISABLE_JEMALLOC=ON -DSKIP_TESTS=true -DENABLE_JNI=ON .. \
&& make -j8 package \
&& tar -xzvf $MINIFI_BASE_DIR/build/nifi-minifi-cpp-$MINIFI_VERSION-bin.tar.gz -C $MINIFI_BASE_DIR
@@ -88,6 +95,8 @@ RUN apk --update --no-cache upgrade && apk add --update --no-cache \
curl \
unzip \
gpsd \
+ openjdk8-jre-base \
+ openjdk8 \
libressl \
python \
zlib
@@ -96,7 +105,8 @@ RUN apk --update --no-cache upgrade && apk add --update --no-cache \
ENV USER minificpp
ENV MINIFI_BASE_DIR /opt/minifi
ENV MINIFI_HOME $MINIFI_BASE_DIR/nifi-minifi-cpp-$MINIFI_VERSION
-
+ENV JAVA_HOME /usr/lib/jvm/default-jvm
+ENV PATH $PATH:/usr/lib/jvm/default-jvm/bin
RUN addgroup -g $GID $USER && adduser -u $UID -D -G $USER -g "" $USER
RUN mkdir -p $MINIFI_BASE_DIR
diff --git a/docker/test/integration/minifi/__init__.py b/docker/test/integration/minifi/__init__.py
index 7b6752f..a13ba2f 100644
--- a/docker/test/integration/minifi/__init__.py
+++ b/docker/test/integration/minifi/__init__.py
@@ -415,13 +415,31 @@ class LogAttribute(Processor):
def __init__(self, ):
super(LogAttribute, self).__init__('LogAttribute',
auto_terminate=['success'])
-
+
+
+class DebugFlow(Processor):
+ def __init__(self, ):
+ super(DebugFlow, self).__init__('DebugFlow')
+
+class HashAttribute(Processor):
+ def __init__(self, attributename):
+ super(HashAttribute, self).__init__('HashAttribute',
+ properties={'Hash Value Attribute Key': attributename},
+ auto_terminate=['failure'])
+
+class AttributesToJSON(Processor):
+ def __init__(self, destination, attributes):
+ super(AttributesToJSON, self).__init__('AttributesToJSON',
+ properties={'Destination': destination, 'Attributes List': attributes},
+ schedule={'scheduling period': '0 sec'},
+ auto_terminate=['failure'])
+
class GetFile(Processor):
def __init__(self, input_dir):
super(GetFile, self).__init__('GetFile',
- properties={'Input Directory': input_dir},
- schedule={'scheduling period': '0 sec'},
+ properties={'Input Directory': input_dir, 'Keep Source File': 'true'},
+ schedule={'scheduling period': '2 sec'},
auto_terminate=['success'])
class GenerateFlowFile(Processor):
diff --git a/docker/test/test_hash_content.py b/docker/test/test_hash_content.py
new file mode 100644
index 0000000..5dab499
--- /dev/null
+++ b/docker/test/test_hash_content.py
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from minifi import *
+from minifi.test import *
+
+
+def test_hash_invoke():
+ """
+ Verify that files passed through HashAttribute are sent using InvokeHTTP to a receiver using ListenHTTP.
+ """
+
+ invoke_flow = (GetFile('/tmp/input') >> HashAttribute('hash')
+ >> InvokeHTTP('http://minifi-listen:8080/contentListener', method='POST'))
+
+ listen_flow = ListenHTTP(8080) >> LogAttribute() >> PutFile('/tmp/output')
+
+ with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
+ cluster.put_test_data('test')
+ cluster.deploy_flow(listen_flow, name='minifi-listen')
+ cluster.deploy_flow(invoke_flow, name='minifi-invoke')
+
+ assert cluster.check_output()
diff --git a/extensions/jni/ClassRegistrar.h b/extensions/jni/ClassRegistrar.h
new file mode 100644
index 0000000..6d4b6b4
--- /dev/null
+++ b/extensions/jni/ClassRegistrar.h
@@ -0,0 +1,80 @@
+/**
+ * ClassRegistrar class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_JNI_CLASSREGISTRAR_H_
+#define EXTENSIONS_JNI_CLASSREGISTRAR_H_
+
+#include <memory>
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Property.h"
+#include "core/Resource.h"
+#include "concurrentqueue.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "jvm/JavaControllerService.h"
+#include "jvm/JniProcessContext.h"
+#include "utils/Id.h"
+#include "jvm/NarClassLoader.h"
+#include "jvm/JniLogger.h"
+#include "jvm/JniReferenceObjects.h"
+#include "jvm/JniControllerServiceLookup.h"
+#include "jvm/JniInitializationContext.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace jni {
+
+class ClassRegistrar {
+ public:
+ static ClassRegistrar &getRegistrar() {
+ static ClassRegistrar registrar;
+ // do nothing.
+ return registrar;
+ }
+
+ bool registerClasses(JNIEnv *env, std::shared_ptr<controllers::JavaControllerService> servicer, const std::string &className, JavaSignatures &signatures) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ // load class before insertion.
+ if (registered_classes_.find(className) == std::end(registered_classes_)) {
+ auto cls = servicer->loadClass(className);
+ cls.registerMethods(env, signatures);
+ registered_classes_.insert(className);
+ return true;
+ }
+ return false;
+ }
+ private:
+ ClassRegistrar() {
+ }
+
+ std::mutex mutex_;
+ std::set<std::string> registered_classes_;
+};
+
+} /* namespace jni */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* EXTENSIONS_JNI_CLASSREGISTRAR_H_ */
diff --git a/extensions/jni/ExecuteJavaControllerService.cpp b/extensions/jni/ExecuteJavaControllerService.cpp
index cafb0e4..50210ea 100644
--- a/extensions/jni/ExecuteJavaControllerService.cpp
+++ b/extensions/jni/ExecuteJavaControllerService.cpp
@@ -49,8 +49,6 @@ namespace minifi {
namespace jni {
namespace controllers {
-core::Property ExecuteJavaControllerService::JVMControllerService(
- core::PropertyBuilder::createProperty("JVM Controller Service")->withDescription("Name of controller service defined within this flow")->isRequired(false)->withDefaultValue<std::string>("")->build());
core::Property ExecuteJavaControllerService::NiFiControllerService(
core::PropertyBuilder::createProperty("NiFi Controller Service")->withDescription("Name of NiFi Controller Service to load and run")->isRequired(true)->withDefaultValue<std::string>("")->build());
@@ -59,12 +57,17 @@ const char *ExecuteJavaControllerService::ProcessorName = "ExecuteJavaController
void ExecuteJavaControllerService::initialize() {
logger_->log_info("Initializing ExecuteJavaControllerService");
// Set the supported properties
+ std::string existingValue;
+ getProperty(NiFiControllerService.getName(), existingValue);
std::set<core::Property> properties;
- properties.insert(JVMControllerService);
properties.insert(NiFiControllerService);
setSupportedProperties(properties);
setAcceptAllProperties();
+ if (!existingValue.empty()) {
+ setProperty(NiFiControllerService, existingValue);
+ }
+
}
ExecuteJavaControllerService::~ExecuteJavaControllerService() {
@@ -73,22 +76,31 @@ ExecuteJavaControllerService::~ExecuteJavaControllerService() {
void ExecuteJavaControllerService::onEnable() {
std::string controller_service_name;
- auto env = java_servicer_->attach();
-
auto serv_cs = JVMLoader::getInstance()->getBaseServicer();
java_servicer_ = std::static_pointer_cast<controllers::JavaControllerService>(serv_cs);
+ if (serv_cs == nullptr)
+ throw std::runtime_error("Could not load controller service");
if (!getProperty(NiFiControllerService.getName(), class_name_)) {
throw std::runtime_error("NiFi Controller Service must be defined");
}
+ auto env = java_servicer_->attach();
+
+ ClassRegistrar::getRegistrar().registerClasses(env, java_servicer_, "org/apache/nifi/processor/JniConfigurationContext", getJniConfigurationContext());
+ ClassRegistrar::getRegistrar().registerClasses(env, java_servicer_, "org/apache/nifi/processor/JniInitializationContext", getJniInitializationContextSignatures());
+ config_context_.service_reference_ = shared_from_this();
+
+ contextInstance = java_servicer_->newInstance("org.apache.nifi.processor.JniConfigurationContext");
+
+ java_servicer_->setReference<minifi::jni::JniConfigurationContext>(env, contextInstance, &config_context_);
+
clazzInstance = java_servicer_->newInstance(class_name_);
auto onEnabledName = java_servicer_->getAnnotation(class_name_, "OnEnabled");
current_cs_class = java_servicer_->getObjectClass(class_name_, clazzInstance);
- // attempt to schedule here
try {
- current_cs_class.callVoidMethod(env, clazzInstance, onEnabledName.first.c_str(), onEnabledName.second);
+ current_cs_class.callVoidMethod(env, clazzInstance, onEnabledName.first.c_str(), onEnabledName.second, contextInstance);
} catch (std::runtime_error &re) {
// this can be ignored.
}
diff --git a/extensions/jni/ExecuteJavaControllerService.h b/extensions/jni/ExecuteJavaControllerService.h
index 85a7d93..9dd854d 100644
--- a/extensions/jni/ExecuteJavaControllerService.h
+++ b/extensions/jni/ExecuteJavaControllerService.h
@@ -31,10 +31,11 @@
#include "concurrentqueue.h"
#include "core/logging/LoggerConfiguration.h"
#include "jvm/JavaControllerService.h"
-#include "jvm/JniProcessContext.h"
+#include "jvm/JniConfigurationContext.h"
+#include "jvm/JniInitializationContext.h"
#include "utils/Id.h"
#include "jvm/NarClassLoader.h"
-
+#include "ClassRegistrar.h"
namespace org {
namespace apache {
namespace nifi {
@@ -52,7 +53,7 @@ namespace controllers {
* controller service within the execute java process.
*
*/
-class ExecuteJavaControllerService : public core::controller::ControllerService {
+class ExecuteJavaControllerService : public ConfigurationContext, public std::enable_shared_from_this<ConfigurationContext> {
public:
// Constructor
@@ -60,21 +61,22 @@ class ExecuteJavaControllerService : public core::controller::ControllerService
* Create a new processor
*/
explicit ExecuteJavaControllerService(std::string name, utils::Identifier uuid = utils::Identifier())
- : core::controller::ControllerService(name, uuid),
+ : ConfigurationContext(name, uuid),
clazzInstance(nullptr),
+ contextInstance(nullptr),
logger_(logging::LoggerFactory<ExecuteJavaControllerService>::getLogger()) {
}
explicit ExecuteJavaControllerService(const std::string &name, const std::string &id)
- : core::controller::ControllerService(name, id),
+ : ConfigurationContext(name, id),
clazzInstance(nullptr),
+ contextInstance(nullptr),
logger_(logging::LoggerFactory<ExecuteJavaControllerService>::getLogger()) {
}
// Destructor
virtual ~ExecuteJavaControllerService();
// Processor Name
static const char *ProcessorName;
- static core::Property JVMControllerService;
static core::Property NiFiControllerService;
// Supported Relationships
@@ -102,16 +104,53 @@ class ExecuteJavaControllerService : public core::controller::ControllerService
// attempt to schedule here
try {
- current_cs_class.callVoidMethod(env, clazzInstance, onEnabledName.first.c_str(), onEnabledName.second);
+ if (!onEnabledName.first.empty())
+ current_cs_class.callVoidMethod(env, clazzInstance, onEnabledName.first.c_str(), onEnabledName.second);
} catch (std::runtime_error &re) {
// this is avoidable.
}
+
+ if (clazzInstance)
+ env->DeleteGlobalRef(clazzInstance);
+ if (contextInstance)
+ env->DeleteGlobalRef(contextInstance);
+ }
+
+ virtual jobject getClassInstance() override {
+ return clazzInstance;
+ }
+
+ static JavaSignatures &getJniConfigurationContext() {
+ static JavaSignatures methodSignatures;
+ if (methodSignatures.empty()) {
+ methodSignatures.addSignature( { "getName", "()Ljava/lang/String;", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniConfigurationContext_getName) });
+ methodSignatures.addSignature( { "getComponent", "()Lorg/apache/nifi/components/AbstractConfigurableComponent;",
+ reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniConfigurationContext_getComponent) });
+ methodSignatures.addSignature( { "getPropertyNames", "()Ljava/util/List;", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniConfigurationContext_getPropertyNames) });
+ methodSignatures.addSignature(
+ { "getPropertyValue", "(Ljava/lang/String;)Ljava/lang/String;", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniConfigurationContext_getPropertyValue) });
+ }
+ return methodSignatures;
+ }
+
+ static JavaSignatures &getJniInitializationContextSignatures() {
+ static JavaSignatures methodSignatures;
+ if (methodSignatures.empty()) {
+ methodSignatures.addSignature( { "getControllerServiceLookup", "()Lorg/apache/nifi/controller/ControllerServiceLookup;",
+ reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniInitializationContext_getControllerServiceLookup) });
+ methodSignatures.addSignature( { "getIdentifier", "()Ljava/lang/String;", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniInitializationContext_getIdentifier) });
+ }
+ return methodSignatures;
}
protected:
private:
+ minifi::jni::JniConfigurationContext config_context_;
+
+ jobject contextInstance;
+
JavaClass current_cs_class;
jobject clazzInstance;
diff --git a/extensions/jni/ExecuteJavaProcessor.cpp b/extensions/jni/ExecuteJavaProcessor.cpp
index eb6e5da..12f7e39 100644
--- a/extensions/jni/ExecuteJavaProcessor.cpp
+++ b/extensions/jni/ExecuteJavaProcessor.cpp
@@ -106,15 +106,29 @@ void ExecuteJavaProcessor::onSchedule(const std::shared_ptr<core::ProcessContext
auto env = java_servicer_->attach();
java_servicer_->putNativeFunctionMapping<minifi::jni::JniProcessContext>(env, spn);
+ ClassRegistrar::getRegistrar().registerClasses(env, java_servicer_, "org/apache/nifi/processor/JniInitializationContext", getJniInitializationContextSignatures());
+
init = java_servicer_->loadClass("org/apache/nifi/processor/JniInitializationContext");
+ ClassRegistrar::getRegistrar().registerClasses(env, java_servicer_, "org/apache/nifi/processor/JniControllerServiceLookup", getJniControllerServiceLookupSignatures());
+
if (context_instance_ != nullptr) {
java_servicer_->attach()->DeleteGlobalRef(context_instance_);
+ } else {
+ init_context_.identifier_ = getUUIDStr();
+ init_context_.lookup_ = &csl_;
+ csl_.cs_lookup_reference_ = context;
+
+ init_context_.lookup_ref_ = java_servicer_->newInstance("org.apache.nifi.processor.JniControllerServiceLookup");
+
+ java_servicer_->setReference<minifi::jni::JniControllerServiceLookup>(env, init_context_.lookup_ref_, &csl_);
}
context_instance_ = spn.newInstance(env);
auto initializer = init.newInstance(env);
+ java_servicer_->setReference<minifi::jni::JniInitializationContext>(env, initializer, &init_context_);
+
ClassRegistrar::getRegistrar().registerClasses(env, java_servicer_, "org/apache/nifi/processor/JniLogger", getLoggerSignatures());
jni_logger_ref_.clazz_ = jni_logger_class_.getReference();
@@ -128,6 +142,7 @@ void ExecuteJavaProcessor::onSchedule(const std::shared_ptr<core::ProcessContext
auto onScheduledName = java_servicer_->getAnnotation(class_name_, "OnScheduled");
current_processor_class = java_servicer_->getObjectClass(class_name_, clazzInstance);
// attempt to schedule here
+
ClassRegistrar::getRegistrar().registerClasses(env, java_servicer_, "org/apache/nifi/processor/JniProcessContext", getProcessContextSignatures());
init.callVoidMethod(env, initializer, "setLogger", "(Lorg/apache/nifi/processor/JniLogger;)V", logger_instance_);
@@ -138,6 +153,7 @@ void ExecuteJavaProcessor::onSchedule(const std::shared_ptr<core::ProcessContext
jpc.context_ = context;
jpc.clazz_ = spn.getReference();
jpc.processor_ = shared_from_this();
+ jpc.cslookup_ = init_context_.lookup_ref_;
java_servicer_->setReference<minifi::jni::JniProcessContext>(env, context_instance_, &jpc);
@@ -200,8 +216,12 @@ void ExecuteJavaProcessor::onTrigger(const std::shared_ptr<core::ProcessContext>
current_processor_class.callVoidMethod(env, clazzInstance, "onTrigger", "(Lorg/apache/nifi/processor/ProcessContext;Lorg/apache/nifi/processor/ProcessSessionFactory;)V", context_instance_,
java_process_session_factory);
} catch (const JavaException &je) {
- logger_->log_error(" Java occurred during onTrigger, reason: %s", je.what());
+ // clear the java exception so we don't continually wrap it
+ env->ExceptionClear();
+ logger_->log_error(" Java Exception occurred during onTrigger, reason: %s", je.what());
} catch (const std::exception &e) {
+ // clear the java exception so we don't continually wrap it
+ env->ExceptionClear();
logger_->log_error(" Exception occurred during onTrigger, reason: %s", e.what());
}
}
diff --git a/extensions/jni/ExecuteJavaProcessor.h b/extensions/jni/ExecuteJavaProcessor.h
index 308c48d..941ef69 100644
--- a/extensions/jni/ExecuteJavaProcessor.h
+++ b/extensions/jni/ExecuteJavaProcessor.h
@@ -35,7 +35,9 @@
#include "jvm/NarClassLoader.h"
#include "jvm/JniLogger.h"
#include "jvm/JniReferenceObjects.h"
-
+#include "jvm/JniControllerServiceLookup.h"
+#include "jvm/JniInitializationContext.h"
+#include "ClassRegistrar.h"
namespace org {
namespace apache {
namespace nifi {
@@ -43,33 +45,6 @@ namespace minifi {
namespace jni {
namespace processors {
-class ClassRegistrar {
- public:
- static ClassRegistrar &getRegistrar() {
- static ClassRegistrar registrar;
- // do nothing.
- return registrar;
- }
-
- bool registerClasses(JNIEnv *env, std::shared_ptr<controllers::JavaControllerService> servicer, const std::string &className, JavaSignatures &signatures) {
- std::lock_guard<std::mutex> lock(mutex_);
- // load class before insertion.
- if (registered_classes_.find(className) == std::end(registered_classes_)) {
- auto cls = servicer->loadClass(className);
- cls.registerMethods(env, signatures);
- registered_classes_.insert(className);
- return true;
- }
- return false;
- }
- private:
- ClassRegistrar() {
- }
-
- std::mutex mutex_;
- std::set<std::string> registered_classes_;
-};
-
/**
* Purpose and Justification: Executes a java NiFi Processor
*
@@ -129,9 +104,12 @@ class ExecuteJavaProcessor : public core::Processor {
static JavaSignatures &getProcessContextSignatures() {
static JavaSignatures methodSignatures;
if (methodSignatures.empty()) {
- methodSignatures.addSignature( { "getProcessor", "()Lorg/apache/nifi/processor/Processor;", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniProcessContext_getProcessor) });
+ methodSignatures.addSignature( { "getComponent", "()Lorg/apache/nifi/components/AbstractConfigurableComponent;", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniProcessContext_getComponent) });
methodSignatures.addSignature( { "getPropertyNames", "()Ljava/util/List;", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniProcessContext_getPropertyNames) });
methodSignatures.addSignature( { "getPropertyValue", "(Ljava/lang/String;)Ljava/lang/String;", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniProcessContext_getPropertyValue) });
+ methodSignatures.addSignature( { "getName", "()Ljava/lang/String;", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniProcessContext_getName) });
+ methodSignatures.addSignature( { "getControllerServiceLookup", "()Lorg/apache/nifi/controller/ControllerServiceLookup;",
+ reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniProcessContext_getControllerServiceLookup) });
}
return methodSignatures;
}
@@ -191,6 +169,31 @@ class ExecuteJavaProcessor : public core::Processor {
return methodSignatures;
}
+ static JavaSignatures &getJniInitializationContextSignatures() {
+ static JavaSignatures methodSignatures;
+ if (methodSignatures.empty()) {
+ methodSignatures.addSignature( { "getControllerServiceLookup", "()Lorg/apache/nifi/controller/ControllerServiceLookup;",
+ reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniInitializationContext_getControllerServiceLookup) });
+ methodSignatures.addSignature( { "getIdentifier", "()Ljava/lang/String;", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniInitializationContext_getIdentifier) });
+ }
+ return methodSignatures;
+ }
+
+ static JavaSignatures &getJniControllerServiceLookupSignatures() {
+ static JavaSignatures methodSignatures;
+ if (methodSignatures.empty()) {
+ methodSignatures.addSignature( { "getControllerService", "(Ljava/lang/String;)Lorg/apache/nifi/controller/ControllerService;",
+ reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniControllerServiceLookup_getControllerService) });
+ methodSignatures.addSignature( { "isControllerServiceEnabled", "(Ljava/lang/String;)Z",
+ reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniControllerServiceLookup_isControllerServiceEnabled) });
+ methodSignatures.addSignature( { "isControllerServiceEnabling", "(Ljava/lang/String;)Z",
+ reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniControllerServiceLookup_isControllerServiceEnabling) });
+ methodSignatures.addSignature( { "getControllerServiceName", "(Ljava/lang/String;)Ljava/lang/String;",
+ reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniControllerServiceLookup_getControllerServiceName) });
+ }
+ return methodSignatures;
+ }
+
static JavaSignatures &getProcessSessionFactorySignatures() {
static JavaSignatures methodSignatures;
if (methodSignatures.empty()) {
@@ -223,10 +226,14 @@ class ExecuteJavaProcessor : public core::Processor {
// delete the reference to the jni process session
if (logger_instance_) {
- localEnv->DeleteLocalRef(logger_instance_);
+ localEnv->DeleteGlobalRef(logger_instance_);
logger_instance_ = nullptr;
}
+ if (init_context_.lookup_ref_) {
+ localEnv->DeleteGlobalRef(init_context_.lookup_ref_);
+ }
+
}
private:
@@ -281,7 +288,11 @@ class ExecuteJavaProcessor : public core::Processor {
std::shared_ptr<logging::Logger> logger_;
std::shared_ptr<logging::Logger> nifi_logger_;
- ;
+
+ JniControllerServiceLookup csl_;
+
+ JniInitializationContext init_context_;
+
};
REGISTER_RESOURCE(ExecuteJavaProcessor, "ExecuteJavaClass runs NiFi processors given a provided system path ")
diff --git a/extensions/jni/JNILoader.h b/extensions/jni/JNILoader.h
index 69d6ef9..a8bc1ef 100644
--- a/extensions/jni/JNILoader.h
+++ b/extensions/jni/JNILoader.h
@@ -20,6 +20,7 @@
#include "core/ClassLoader.h"
#include "ExecuteJavaProcessor.h"
+#include "ExecuteJavaControllerService.h"
#include "JVMCreator.h"
#include "jvm/JavaControllerService.h"
#include "utils/StringUtils.h"
@@ -60,6 +61,8 @@ class JNIFactory : public core::ObjectFactory {
return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::jni::processors::ExecuteJavaProcessor>());
} else if (utils::StringUtils::equalsIgnoreCase(class_name, "JavaControllerService")) {
return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::jni::controllers::JavaControllerService>());
+ } else if (utils::StringUtils::equalsIgnoreCase(class_name, "ExecuteJavaControllerService")) {
+ return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::jni::controllers::ExecuteJavaControllerService>());
} else if (utils::StringUtils::equalsIgnoreCase(class_name, "JVMCreator")) {
return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::jni::JVMCreator>());
}
diff --git a/extensions/jni/JavaException.h b/extensions/jni/JavaException.h
index ec701b0..11d42ca 100644
--- a/extensions/jni/JavaException.h
+++ b/extensions/jni/JavaException.h
@@ -70,6 +70,10 @@ static std::string getMessage(JNIEnv *env, jthrowable throwable) {
return "";
}
jstring message = (jstring) env->CallObjectMethod(throwable, getMessage);
+ if (env->ExceptionOccurred()) {
+ env->ExceptionClear();
+ return JVM_ERROR_MSG;
+ }
if (message) {
// do whatever with mstr
std::string excp = JniStringToUTF(env, message);
diff --git a/extensions/jni/jvm/JVMLoader.h b/extensions/jni/jvm/JVMLoader.h
index 326d313..53c4b79 100644
--- a/extensions/jni/jvm/JVMLoader.h
+++ b/extensions/jni/jvm/JVMLoader.h
@@ -100,6 +100,7 @@ class JVMLoader {
JNIEnv *attach(const std::string &name = "") {
JNIEnv* jenv;
jint ret = jvm_->GetEnv((void**) &jenv, JNI_VERSION_1_8);
+
if (ret == JNI_EDETACHED) {
ret = jvm_->AttachCurrentThread((void**) &jenv, NULL);
if (ret != JNI_OK || jenv == NULL) {
@@ -110,6 +111,10 @@ class JVMLoader {
return jenv;
}
+ void detach(){
+ jvm_->DetachCurrentThread();
+ }
+
/**
* Returns a reference to an instantiated class loader
* @return class loader.
@@ -439,7 +444,9 @@ class JVMLoader {
jclass c = env->FindClass(name);
jclass c_global = (jclass) env->NewGlobalRef(c);
if (!c) {
- throw std::runtime_error("Could not find ");
+ std::stringstream ss;
+ ss << "Could not find " << name;
+ throw std::runtime_error(ss.str());
}
return c_global;
}
@@ -468,7 +475,9 @@ class JVMLoader {
jclass classClass = env_->GetObjectClass(randomClass);
auto classLoaderClass = find_class_global(env_, "java/lang/ClassLoader");
auto getClassLoaderMethod = env_->GetMethodID(classClass, "getClassLoader", "()Ljava/lang/ClassLoader;");
- gClassLoader = env_->NewGlobalRef(env_->CallObjectMethod(randomClass, getClassLoaderMethod));
+ auto refclazz = env_->CallObjectMethod(randomClass, getClassLoaderMethod);
+ minifi::jni::ThrowIf(env_);
+ gClassLoader = env_->NewGlobalRef(refclazz);
gFindClassMethod = env_->GetMethodID(classLoaderClass, "findClass", "(Ljava/lang/String;)Ljava/lang/Class;");
minifi::jni::ThrowIf(env_);
initialized_ = true;
diff --git a/extensions/jni/jvm/JavaControllerService.h b/extensions/jni/jvm/JavaControllerService.h
index c497bc0..9721440 100644
--- a/extensions/jni/jvm/JavaControllerService.h
+++ b/extensions/jni/jvm/JavaControllerService.h
@@ -105,6 +105,11 @@ class JavaControllerService : public core::controller::ControllerService, public
virtual JNIEnv *attach() override {
return loader->attach();
}
+ virtual void detach() override {
+ loader->detach();
+ }
+
+
virtual jobject getClassLoader() override {
return loader->getClassLoader();
diff --git a/extensions/jni/jvm/JavaDefs.h b/extensions/jni/jvm/JavaDefs.h
index d9dadde..28f9fad 100644
--- a/extensions/jni/jvm/JavaDefs.h
+++ b/extensions/jni/jvm/JavaDefs.h
@@ -23,5 +23,7 @@
#define NO_FF_OBJECT "Calling function on null flow file"
+#define JVM_ERROR_MSG "Error calling toString on callable. JVM error likely"
+
#endif /* EXTENSIONS_JNI_JVM_JAVADEFS_H_ */
diff --git a/extensions/jni/jvm/JavaServicer.h b/extensions/jni/jvm/JavaServicer.h
index 1e80973..4c8521a 100644
--- a/extensions/jni/jvm/JavaServicer.h
+++ b/extensions/jni/jvm/JavaServicer.h
@@ -39,6 +39,7 @@ class JavaServicer {
}
virtual JNIEnv *attach() = 0;
+ virtual void detach() = 0;
virtual jobject getClassLoader() = 0;
virtual JavaClass loadClass(const std::string &class_name_) = 0;
};
diff --git a/extensions/jni/jvm/JavaServicer.h b/extensions/jni/jvm/JniConfigurationContext.h
similarity index 55%
copy from extensions/jni/jvm/JavaServicer.h
copy to extensions/jni/jvm/JniConfigurationContext.h
index 1e80973..66f0c47 100644
--- a/extensions/jni/jvm/JavaServicer.h
+++ b/extensions/jni/jvm/JniConfigurationContext.h
@@ -16,11 +16,10 @@
* limitations under the License.
*/
-#ifndef EXTENSIONS_JNI_JVM_JAVASERVICER_H_
-#define EXTENSIONS_JNI_JVM_JAVASERVICER_H_
+#ifndef EXTENSIONS_JNI_JVM_JNICONFIGURATIONCONTEXT_H_
+#define EXTENSIONS_JNI_JVM_JNICONFIGURATIONCONTEXT_H_
-#include <jni.h>
-#include "JavaClass.h"
+#include "core/controller/ControllerService.h"
namespace org {
namespace apache {
@@ -28,19 +27,30 @@ namespace nifi {
namespace minifi {
namespace jni {
-/**
- * Purpose: Java servicer provides a shared construct to be used by classes
- * within the JNI extension to attach the JNI environment, get the class loader
- * and load a class.
- */
-class JavaServicer {
+class ConfigurationContext : public core::controller::ControllerService {
public:
- virtual ~JavaServicer() {
+
+ // Constructor
+ /*!
+ * Create a new configuration context (a controller service)
+ */
+ explicit ConfigurationContext(std::string name, utils::Identifier uuid = utils::Identifier())
+ : core::controller::ControllerService(name, uuid) {
+ }
+
+ explicit ConfigurationContext(const std::string &name, const std::string &id)
+ : core::controller::ControllerService(name, id) {
+ }
+
+ virtual ~ConfigurationContext() {
}
- virtual JNIEnv *attach() = 0;
- virtual jobject getClassLoader() = 0;
- virtual JavaClass loadClass(const std::string &class_name_) = 0;
+ virtual jobject getClassInstance() = 0;
+};
+
+struct JniConfigurationContext {
+
+ std::shared_ptr<ConfigurationContext> service_reference_;
};
} /* namespace jni */
@@ -49,4 +59,4 @@ class JavaServicer {
} /* namespace apache */
} /* namespace org */
-#endif /* EXTENSIONS_JNI_JVM_JAVASERVICER_H_ */
+#endif /* EXTENSIONS_JNI_JVM_JNICONFIGURATIONCONTEXT_H_ */
diff --git a/extensions/jni/jvm/JniControllerServiceLookup.cpp b/extensions/jni/jvm/JniControllerServiceLookup.cpp
new file mode 100644
index 0000000..e7658e0
--- /dev/null
+++ b/extensions/jni/jvm/JniControllerServiceLookup.cpp
@@ -0,0 +1,78 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "JniControllerServiceLookup.h"
+
+#include <string>
+#include <memory>
+#include <algorithm>
+#include <iterator>
+#include <set>
+#include "core/Property.h"
+#include "io/validation.h"
+#include "core/FlowFile.h"
+#include "utils/StringUtils.h"
+#include "utils/file/FileUtils.h"
+#include "properties/Configure.h"
+#include "JVMLoader.h"
+#include "../JavaException.h"
+#include "../JNIUtil.h"
+#include "JniReferenceObjects.h"
+#include "JavaDefs.h"
+#include "core/controller/ControllerService.h"
+#include "core/controller/ControllerServiceLookup.h"
+#include "../ExecuteJavaControllerService.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Native side of JniControllerServiceLookup.getControllerService(String).
+ *
+ * Looks up the controller service named by `cs` through the native lookup bound
+ * to `obj` and, when that service is an ExecuteJavaControllerService, returns
+ * the object produced by its getClassInstance().
+ *
+ * @param env JNI environment of the calling thread.
+ * @param obj Java JniControllerServiceLookup instance.
+ * @param cs  controller service identifier.
+ * @return the service's Java instance, or nullptr when an argument or the
+ *         native reference is missing, the service is unknown, or the service
+ *         is not an ExecuteJavaControllerService.
+ */
+jobject Java_org_apache_nifi_processor_JniControllerServiceLookup_getControllerService(JNIEnv *env, jobject obj, jstring cs) {
+  // A null handle would otherwise be converted / dereferenced below.
+  if (obj == nullptr || cs == nullptr) {
+    return nullptr;
+  }
+  minifi::jni::JniControllerServiceLookup *ptr = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniControllerServiceLookup>(env, obj);
+  if (ptr == nullptr || ptr->cs_lookup_reference_ == nullptr) {
+    return nullptr;
+  }
+  auto str = JniStringToUTF(env, cs);
+  auto controller_service = ptr->cs_lookup_reference_->getControllerService(str);
+  if (nullptr == controller_service) {
+    return nullptr;
+  }
+  // Only Java-backed services carry a Java object that can be handed back.
+  auto ecs = std::dynamic_pointer_cast<minifi::jni::controllers::ExecuteJavaControllerService>(controller_service);
+  if (nullptr == ecs) {
+    return nullptr;
+  }
+  return ecs->getClassInstance();
+}
+
+/**
+ * Native side of JniControllerServiceLookup.isControllerServiceEnabled(String).
+ *
+ * @param env JNI environment of the calling thread.
+ * @param obj Java JniControllerServiceLookup instance.
+ * @param cs  controller service identifier.
+ * @return the native lookup's answer for `cs`; JNI_FALSE when an argument or
+ *         the native reference is missing.
+ */
+jboolean Java_org_apache_nifi_processor_JniControllerServiceLookup_isControllerServiceEnabled(JNIEnv *env, jobject obj, jstring cs) {
+  if (obj == nullptr || cs == nullptr) {
+    return JNI_FALSE;
+  }
+  minifi::jni::JniControllerServiceLookup *ptr = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniControllerServiceLookup>(env, obj);
+  // Without a bound native lookup there is nothing to query.
+  if (ptr == nullptr || ptr->cs_lookup_reference_ == nullptr) {
+    return JNI_FALSE;
+  }
+  auto str = JniStringToUTF(env, cs);
+  return ptr->cs_lookup_reference_->isControllerServiceEnabled(str);
+}
+
+/**
+ * Native side of JniControllerServiceLookup.isControllerServiceEnabling(String).
+ *
+ * @param env JNI environment of the calling thread.
+ * @param obj Java JniControllerServiceLookup instance.
+ * @param cs  controller service identifier.
+ * @return the native lookup's answer for `cs`; JNI_FALSE when an argument or
+ *         the native reference is missing.
+ */
+jboolean Java_org_apache_nifi_processor_JniControllerServiceLookup_isControllerServiceEnabling(JNIEnv *env, jobject obj, jstring cs) {
+  if (obj == nullptr || cs == nullptr) {
+    return JNI_FALSE;
+  }
+  minifi::jni::JniControllerServiceLookup *ptr = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniControllerServiceLookup>(env, obj);
+  // Without a bound native lookup there is nothing to query.
+  if (ptr == nullptr || ptr->cs_lookup_reference_ == nullptr) {
+    return JNI_FALSE;
+  }
+  auto str = JniStringToUTF(env, cs);
+  return ptr->cs_lookup_reference_->isControllerServiceEnabling(str);
+}
+
+/**
+ * Native side of JniControllerServiceLookup.getControllerServiceName(String).
+ *
+ * @param env JNI environment of the calling thread.
+ * @param obj Java JniControllerServiceLookup instance.
+ * @param cs  controller service identifier.
+ * @return a new Java string holding the name reported by the native lookup, or
+ *         nullptr when an argument or the native reference is missing.
+ */
+jstring Java_org_apache_nifi_processor_JniControllerServiceLookup_getControllerServiceName(JNIEnv *env, jobject obj, jstring cs) {
+  if (obj == nullptr || cs == nullptr) {
+    return nullptr;
+  }
+  minifi::jni::JniControllerServiceLookup *ptr = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniControllerServiceLookup>(env, obj);
+  // Without a bound native lookup there is nothing to query.
+  if (ptr == nullptr || ptr->cs_lookup_reference_ == nullptr) {
+    return nullptr;
+  }
+  auto str = JniStringToUTF(env, cs);
+  return env->NewStringUTF(ptr->cs_lookup_reference_->getControllerServiceName(str).c_str());
+}
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/extensions/jni/jvm/JniProcessContext.h b/extensions/jni/jvm/JniControllerServiceLookup.h
similarity index 56%
copy from extensions/jni/jvm/JniProcessContext.h
copy to extensions/jni/jvm/JniControllerServiceLookup.h
index 2086c18..5dd1294 100644
--- a/extensions/jni/jvm/JniProcessContext.h
+++ b/extensions/jni/jvm/JniControllerServiceLookup.h
@@ -15,8 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef EXTENSIONS_JNIPROCESSCONTEXT_H
-#define EXTENSIONS_JNIPROCESSCONTEXT_H
+
+#ifndef EXTENSIONS_JNI_JVM_JNICONTROLLERSERVICELOOKUP_H_
+#define EXTENSIONS_JNI_JVM_JNICONTROLLERSERVICELOOKUP_H_
#include <string>
#include <vector>
@@ -33,17 +34,15 @@ namespace nifi {
namespace minifi {
namespace jni {
-struct JniProcessContext {
- JniProcessContext()
- : nifi_processor_(nullptr),
- processor_(nullptr),
- context_(nullptr) {
- }
+class JniControllerService {
+ public:
- jclass clazz_;
- jobject nifi_processor_;
- std::shared_ptr<core::Processor> processor_;
- std::shared_ptr<core::ProcessContext> context_;
+ std::shared_ptr<core::controller::ControllerService> cs_reference_;
+};
+
+class JniControllerServiceLookup {
+ public:
+ std::shared_ptr<core::controller::ControllerServiceLookup> cs_lookup_reference_;
};
} /* namespace jni */
@@ -52,29 +51,21 @@ struct JniProcessContext {
} /* namespace apache */
} /* namespace org */
+
#ifdef __cplusplus
extern "C" {
#endif
-JNIEXPORT jstring JNICALL Java_org_apache_nifi_processor_JniProcessContext_getPropertyValue(JNIEnv *env, jobject obj, jstring propertyName);
+JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniControllerServiceLookup_getControllerService(JNIEnv *env, jobject obj, jstring cs);
+
+JNIEXPORT jboolean JNICALL Java_org_apache_nifi_processor_JniControllerServiceLookup_isControllerServiceEnabled(JNIEnv *env, jobject obj, jstring cs);
-JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniProcessContext_getPropertyNames(JNIEnv *env, jobject obj);
+JNIEXPORT jboolean JNICALL Java_org_apache_nifi_processor_JniControllerServiceLookup_isControllerServiceEnabling(JNIEnv *env, jobject obj, jstring cs);
-JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniProcessContext_getProcessor(JNIEnv *env, jobject obj);
+JNIEXPORT jstring JNICALL Java_org_apache_nifi_processor_JniControllerServiceLookup_getControllerServiceName(JNIEnv *env, jobject obj, jstring cs);
#ifdef __cplusplus
}
#endif
-/*
- class JniProcessContext {
- public:
- JniProcessContext(const std::shared_ptr<core::ProcessContext> &ctx)
- : ctx_(ctx) {
-
- }
- private:
- std::shared_ptr<core::ProcessContext> ctx_;
- };*/
-
-#endif /* EXTENSIONS_JNIPROCESSCONTEXT_H */
+#endif /* EXTENSIONS_JNI_JVM_JNICONTROLLERSERVICELOOKUP_H_ */
diff --git a/extensions/jni/jvm/JniInitializationContext.cpp b/extensions/jni/jvm/JniInitializationContext.cpp
new file mode 100644
index 0000000..e3cffa7
--- /dev/null
+++ b/extensions/jni/jvm/JniInitializationContext.cpp
@@ -0,0 +1,101 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "JniInitializationContext.h"
+
+#include "JniConfigurationContext.h"
+#include <string>
+#include <memory>
+#include <algorithm>
+#include <iterator>
+#include <set>
+#include "core/Property.h"
+#include "io/validation.h"
+#include "utils/StringUtils.h"
+#include "utils/file/FileUtils.h"
+#include "properties/Configure.h"
+#include "JVMLoader.h"
+
+/**
+ * Native side of JniInitializationContext.getIdentifier().
+ *
+ * @param env JNI environment of the calling thread.
+ * @param obj Java JniInitializationContext instance.
+ * @return a new Java string copied from the native context's identifier_, or
+ *         nullptr when no native context is bound to `obj`.
+ */
+jstring Java_org_apache_nifi_processor_JniInitializationContext_getIdentifier(JNIEnv *env, jobject obj) {
+  minifi::jni::JniInitializationContext *ptr = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniInitializationContext>(env, obj);
+  // Avoid dereferencing a missing native reference.
+  if (ptr == nullptr) {
+    return nullptr;
+  }
+  return env->NewStringUTF(ptr->identifier_.c_str());
+}
+
+/**
+ * Native side of JniInitializationContext.getControllerServiceLookup().
+ *
+ * On first use this instantiates org.apache.nifi.processor.JniControllerServiceLookup,
+ * binds it to the native lookup_ held by the context, and caches the Java
+ * object in lookup_ref_; later calls return the cached object.
+ *
+ * @param env JNI environment of the calling thread.
+ * @param obj Java JniInitializationContext instance.
+ * @return the Java lookup object, or nullptr when no native context is bound
+ *         to `obj`.
+ */
+jobject Java_org_apache_nifi_processor_JniInitializationContext_getControllerServiceLookup(JNIEnv *env, jobject obj) {
+  minifi::jni::JniInitializationContext *ptr = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniInitializationContext>(env, obj);
+  // Avoid dereferencing a missing native reference.
+  if (ptr == nullptr) {
+    return nullptr;
+  }
+  if (ptr->lookup_ref_ == nullptr) {
+    auto csl = minifi::jni::JVMLoader::getInstance()->load_class("org/apache/nifi/processor/JniControllerServiceLookup", env);
+    // NOTE(review): lookup_ref_ outlives this call. If newInstance() hands back a
+    // JNI local reference it must be promoted with NewGlobalRef before caching;
+    // confirm against JavaClass::newInstance.
+    ptr->lookup_ref_ = csl.newInstance(env);
+    minifi::jni::ThrowIf(env);
+    minifi::jni::JVMLoader::getInstance()->setReference(ptr->lookup_ref_, env, ptr->lookup_);
+  }
+  return ptr->lookup_ref_;
+}
+
+
+
+/**
+ * Native side of JniConfigurationContext.getPropertyValue(String).
+ *
+ * Resolves `propertyName` against the bound controller service, trying the
+ * regular properties first and the dynamic properties second.
+ *
+ * @param env          JNI environment of the calling thread.
+ * @param obj          Java JniConfigurationContext instance.
+ * @param propertyName name of the property to read.
+ * @return a new Java string with the value, or nullptr when an argument is
+ *         null, no service is bound, or neither lookup finds the property.
+ */
+jstring Java_org_apache_nifi_processor_JniConfigurationContext_getPropertyValue(JNIEnv *env, jobject obj, jstring propertyName) {
+  if (nullptr == obj || nullptr == propertyName) {
+    return nullptr;
+  }
+
+  minifi::jni::JniConfigurationContext *ctx = minifi::jni::JVMLoader::getPtr<minifi::jni::JniConfigurationContext>(env, obj);
+  if (nullptr == ctx || nullptr == ctx->service_reference_) {
+    return nullptr;
+  }
+
+  std::string property_key = JniStringToUTF(env, propertyName);
+  std::string resolved;
+  // Short-circuit keeps the original order: dynamic properties are consulted only on a miss.
+  const bool found = ctx->service_reference_->getProperty(property_key, resolved) || ctx->service_reference_->getDynamicProperty(property_key, resolved);
+  if (!found) {
+    return nullptr;
+  }
+  return env->NewStringUTF(resolved.c_str());
+}
+
+/**
+ * Native side of JniConfigurationContext.getPropertyNames().
+ *
+ * Builds a java.util.ArrayList holding the name of every property of the bound
+ * controller service for which isTransient() is true.
+ *
+ * @param env JNI environment of the calling thread.
+ * @param obj Java JniConfigurationContext instance.
+ * @return the populated ArrayList; an empty ArrayList when no service is bound.
+ */
+jobject Java_org_apache_nifi_processor_JniConfigurationContext_getPropertyNames(JNIEnv *env, jobject obj) {
+  minifi::jni::JniConfigurationContext *context = minifi::jni::JVMLoader::getPtr<minifi::jni::JniConfigurationContext>(env, obj);
+  jclass arraylist = env->FindClass("java/util/ArrayList");
+  jmethodID init_method = env->GetMethodID(arraylist, "<init>", "(I)V");
+  jmethodID add_method = env->GetMethodID(arraylist, "add", "(Ljava/lang/Object;)Z");
+  // Hand Java an empty list rather than dereferencing a missing context.
+  if (context == nullptr || context->service_reference_ == nullptr) {
+    return env->NewObject(arraylist, init_method, static_cast<jint>(0));
+  }
+  auto keys = context->service_reference_->getProperties();
+  // The constructor signature is (I)V: the variadic argument must be a jint, not a size_t.
+  jobject result = env->NewObject(arraylist, init_method, static_cast<jint>(keys.size()));
+  for (const auto &s : keys) {
+    if (s.second.isTransient()) {
+      jstring element = env->NewStringUTF(s.first.c_str());
+      env->CallBooleanMethod(result, add_method, element);
+      minifi::jni::ThrowIf(env);
+      // Release the local reference eagerly so long property lists do not pile them up.
+      env->DeleteLocalRef(element);
+    }
+  }
+  return result;
+}
+
+/**
+ * Native side of JniConfigurationContext.getComponent().
+ *
+ * @param env JNI environment of the calling thread.
+ * @param obj Java JniConfigurationContext instance.
+ * @return the object returned by the bound service's getClassInstance(), or
+ *         nullptr when no service is bound.
+ */
+jobject Java_org_apache_nifi_processor_JniConfigurationContext_getComponent(JNIEnv *env, jobject obj) {
+  minifi::jni::JniConfigurationContext *context = minifi::jni::JVMLoader::getPtr<minifi::jni::JniConfigurationContext>(env, obj);
+  minifi::jni::ThrowIf(env);
+  // Avoid dereferencing a missing context or service.
+  if (context == nullptr || context->service_reference_ == nullptr) {
+    return nullptr;
+  }
+  return context->service_reference_->getClassInstance();
+}
+
+/**
+ * Native side of JniConfigurationContext.getName().
+ *
+ * @param env JNI environment of the calling thread.
+ * @param obj Java JniConfigurationContext instance.
+ * @return a new Java string with the bound service's name, or nullptr when no
+ *         service is bound.
+ */
+jstring Java_org_apache_nifi_processor_JniConfigurationContext_getName(JNIEnv *env, jobject obj) {
+  minifi::jni::JniConfigurationContext *context = minifi::jni::JVMLoader::getPtr<minifi::jni::JniConfigurationContext>(env, obj);
+  minifi::jni::ThrowIf(env);
+  // Avoid dereferencing a missing context or service.
+  if (context == nullptr || context->service_reference_ == nullptr) {
+    return nullptr;
+  }
+  return env->NewStringUTF(context->service_reference_->getName().c_str());
+}
diff --git a/extensions/jni/jvm/JniProcessContext.h b/extensions/jni/jvm/JniInitializationContext.h
similarity index 51%
copy from extensions/jni/jvm/JniProcessContext.h
copy to extensions/jni/jvm/JniInitializationContext.h
index 2086c18..aae970d 100644
--- a/extensions/jni/jvm/JniProcessContext.h
+++ b/extensions/jni/jvm/JniInitializationContext.h
@@ -15,8 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef EXTENSIONS_JNIPROCESSCONTEXT_H
-#define EXTENSIONS_JNIPROCESSCONTEXT_H
+#ifndef EXTENSIONS_JNIINITIALIZATIONCONTEXT_H
+#define EXTENSIONS_JNIINITIALIZATIONCONTEXT_H
#include <string>
#include <vector>
@@ -26,6 +26,7 @@
#include <jni.h>
#include "core/Processor.h"
#include "core/ProcessSession.h"
+#include "JniControllerServiceLookup.h"
namespace org {
namespace apache {
@@ -33,17 +34,15 @@ namespace nifi {
namespace minifi {
namespace jni {
-struct JniProcessContext {
- JniProcessContext()
- : nifi_processor_(nullptr),
- processor_(nullptr),
- context_(nullptr) {
+struct JniInitializationContext {
+ JniInitializationContext()
+ : lookup_(nullptr),
+ lookup_ref_(nullptr) {
}
- jclass clazz_;
- jobject nifi_processor_;
- std::shared_ptr<core::Processor> processor_;
- std::shared_ptr<core::ProcessContext> context_;
+ std::string identifier_;
+ JniControllerServiceLookup *lookup_;
+ jobject lookup_ref_;
};
} /* namespace jni */
@@ -56,25 +55,23 @@ struct JniProcessContext {
extern "C" {
#endif
-JNIEXPORT jstring JNICALL Java_org_apache_nifi_processor_JniProcessContext_getPropertyValue(JNIEnv *env, jobject obj, jstring propertyName);
+JNIEXPORT jstring JNICALL Java_org_apache_nifi_processor_JniInitializationContext_getIdentifier(JNIEnv *env, jobject obj);
-JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniProcessContext_getPropertyNames(JNIEnv *env, jobject obj);
+JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniInitializationContext_getControllerServiceLookup(JNIEnv *env, jobject obj);
+
+// configuration context
+JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniConfigurationContext_getPropertyNames(JNIEnv *env, jobject obj);
+
+JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniConfigurationContext_getComponent(JNIEnv *env, jobject obj);
+
+JNIEXPORT jstring JNICALL Java_org_apache_nifi_processor_JniConfigurationContext_getName(JNIEnv *env, jobject obj);
+
+JNIEXPORT jstring JNICALL Java_org_apache_nifi_processor_JniConfigurationContext_getPropertyValue(JNIEnv *env, jobject obj, jstring property);
-JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniProcessContext_getProcessor(JNIEnv *env, jobject obj);
#ifdef __cplusplus
}
#endif
-/*
- class JniProcessContext {
- public:
- JniProcessContext(const std::shared_ptr<core::ProcessContext> &ctx)
- : ctx_(ctx) {
-
- }
- private:
- std::shared_ptr<core::ProcessContext> ctx_;
- };*/
-#endif /* EXTENSIONS_JNIPROCESSCONTEXT_H */
+#endif /* EXTENSIONS_JNIINITIALIZATIONCONTEXT_H */
diff --git a/extensions/jni/jvm/JniProcessContext.cpp b/extensions/jni/jvm/JniProcessContext.cpp
index 288e1b5..15c2d34 100644
--- a/extensions/jni/jvm/JniProcessContext.cpp
+++ b/extensions/jni/jvm/JniProcessContext.cpp
@@ -69,8 +69,20 @@ jobject Java_org_apache_nifi_processor_JniProcessContext_getPropertyNames(JNIEnv
return result;
}
-jobject Java_org_apache_nifi_processor_JniProcessContext_getProcessor(JNIEnv *env, jobject obj) {
+jobject Java_org_apache_nifi_processor_JniProcessContext_getComponent(JNIEnv *env, jobject obj) {
minifi::jni::JniProcessContext *context = minifi::jni::JVMLoader::getPtr<minifi::jni::JniProcessContext>(env, obj);
minifi::jni::ThrowIf(env);
return context->nifi_processor_;
}
+
+/**
+ * Native side of JniProcessContext.getName().
+ *
+ * @param env JNI environment of the calling thread.
+ * @param obj Java JniProcessContext instance.
+ * @return a new Java string with the name of the processor node behind the
+ *         bound process context, or nullptr when the context or its processor
+ *         node is missing.
+ */
+jstring Java_org_apache_nifi_processor_JniProcessContext_getName(JNIEnv *env, jobject obj) {
+  minifi::jni::JniProcessContext *context = minifi::jni::JVMLoader::getPtr<minifi::jni::JniProcessContext>(env, obj);
+  minifi::jni::ThrowIf(env);
+  // context_ is default-initialised to nullptr, so it may not have been bound yet.
+  if (context == nullptr || context->context_ == nullptr) {
+    return nullptr;
+  }
+  auto node = context->context_->getProcessorNode();
+  if (node == nullptr) {
+    return nullptr;
+  }
+  return env->NewStringUTF(node->getName().c_str());
+}
+
+/**
+ * Native side of JniProcessContext.getControllerServiceLookup().
+ *
+ * @param env JNI environment of the calling thread.
+ * @param obj Java JniProcessContext instance.
+ * @return the cslookup_ object stored on the native context (may itself be
+ *         nullptr), or nullptr when no native context is bound to `obj`.
+ */
+jobject Java_org_apache_nifi_processor_JniProcessContext_getControllerServiceLookup(JNIEnv *env, jobject obj) {
+  minifi::jni::JniProcessContext *context = minifi::jni::JVMLoader::getPtr<minifi::jni::JniProcessContext>(env, obj);
+  minifi::jni::ThrowIf(env);
+  // Avoid dereferencing a missing native context.
+  if (context == nullptr) {
+    return nullptr;
+  }
+  return context->cslookup_;
+}
diff --git a/extensions/jni/jvm/JniProcessContext.h b/extensions/jni/jvm/JniProcessContext.h
index 2086c18..60f0a2b 100644
--- a/extensions/jni/jvm/JniProcessContext.h
+++ b/extensions/jni/jvm/JniProcessContext.h
@@ -26,6 +26,7 @@
#include <jni.h>
#include "core/Processor.h"
#include "core/ProcessSession.h"
+#include "JniControllerServiceLookup.h"
namespace org {
namespace apache {
@@ -36,12 +37,14 @@ namespace jni {
struct JniProcessContext {
JniProcessContext()
: nifi_processor_(nullptr),
+ cslookup_(nullptr),
processor_(nullptr),
context_(nullptr) {
}
jclass clazz_;
jobject nifi_processor_;
+ jobject cslookup_;
std::shared_ptr<core::Processor> processor_;
std::shared_ptr<core::ProcessContext> context_;
};
@@ -60,21 +63,17 @@ JNIEXPORT jstring JNICALL Java_org_apache_nifi_processor_JniProcessContext_getPr
JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniProcessContext_getPropertyNames(JNIEnv *env, jobject obj);
-JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniProcessContext_getProcessor(JNIEnv *env, jobject obj);
+JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniProcessContext_getComponent(JNIEnv *env, jobject obj);
+
+JNIEXPORT jstring JNICALL Java_org_apache_nifi_processor_JniProcessContext_getName(JNIEnv *env, jobject obj);
+
+JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniProcessContext_getControllerServiceLookup(JNIEnv *env, jobject obj);
+
+//getname
+//getControllerservicelookup
#ifdef __cplusplus
}
#endif
-/*
- class JniProcessContext {
- public:
- JniProcessContext(const std::shared_ptr<core::ProcessContext> &ctx)
- : ctx_(ctx) {
-
- }
- private:
- std::shared_ptr<core::ProcessContext> ctx_;
- };*/
-
#endif /* EXTENSIONS_JNIPROCESSCONTEXT_H */
diff --git a/extensions/jni/jvm/JniReferenceObjects.h b/extensions/jni/jvm/JniReferenceObjects.h
index c5355fb..0dfcd93 100644
--- a/extensions/jni/jvm/JniReferenceObjects.h
+++ b/extensions/jni/jvm/JniReferenceObjects.h
@@ -365,6 +365,8 @@ class JniSessionFactory : public core::WeakReference {
};
+
+
} /* namespace jni */
} /* namespace minifi */
} /* namespace nifi */
diff --git a/extensions/jni/jvm/NarClassLoader.h b/extensions/jni/jvm/NarClassLoader.h
index 223caed..a647059 100644
--- a/extensions/jni/jvm/NarClassLoader.h
+++ b/extensions/jni/jvm/NarClassLoader.h
@@ -134,7 +134,11 @@ class NarClassLoader {
auto clazz_name = env->NewStringUTF(requested_name.c_str());
- jobject obj = env->NewGlobalRef(env->CallObjectMethod(class_loader_, mthd, clazz_name));
+ auto newref = env->CallObjectMethod(class_loader_, mthd, clazz_name);
+
+ ThrowIf(env);
+
+ jobject obj = env->NewGlobalRef(newref);
ThrowIf(env);
diff --git a/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniClassLoader.java b/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniClassLoader.java
index 4b61a26..cc6416a 100644
--- a/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniClassLoader.java
+++ b/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniClassLoader.java
@@ -3,6 +3,8 @@ package org.apache.nifi.processor;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.DynamicRelationship;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleDetails;
@@ -343,16 +345,9 @@ public class JniClassLoader {
if (cs != null) {
List<PropertyDescriptor> descriptors = cs.getPropertyDescriptors();
final String description = getDescription(cs.getClass());
- final DynamicProperty dynProperty = getDynamicPropertyAnnotation(cs.getClass());
- final DynamicRelationship dynRelationShip = getDynamicRelationshipAnnotation(cs.getClass());
classes.put(cs.getClass().getCanonicalName(),cs.getClass());
JniComponent.JniComponentBuilder builder = JniComponent.JniComponentBuilder.create(cs.getClass().getCanonicalName()).addProperties(descriptors).addDescription(description).setIsControllerService();
- if (dynProperty != null) {
- builder.setDynamicProperties();
- }
- if (dynRelationShip != null) {
- builder.setDynamicRelationships();
- }
+ builder.setDynamicProperties();
components.add(builder.build());
}
@@ -489,6 +484,12 @@ public class JniClassLoader {
} else {
List<Method> methods = getAnnotatedMethods(clazz, OnScheduled.class);
methods.stream().forEach(mthd -> onScheduledMethod.put(new AbstractMap.SimpleImmutableEntry<>(className, "OnScheduled"), mthd));
+
+ methods = getAnnotatedMethods(clazz, OnEnabled.class);
+ methods.stream().forEach(mthd -> onScheduledMethod.put(new AbstractMap.SimpleImmutableEntry<>(className, "OnEnabled"), mthd));
+
+ methods = getAnnotatedMethods(clazz, OnDisabled.class);
+ methods.stream().forEach(mthd -> onScheduledMethod.put(new AbstractMap.SimpleImmutableEntry<>(className, "OnDisabled"), mthd));
}
return clazz.newInstance();
diff --git a/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniConfigurationContext.java b/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniConfigurationContext.java
new file mode 100644
index 0000000..1736ec7
--- /dev/null
+++ b/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniConfigurationContext.java
@@ -0,0 +1,100 @@
+package org.apache.nifi.processor;
+
+import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
+import org.apache.nifi.components.AbstractConfigurableComponent;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ControllerServiceLookup;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * {@link ConfigurationContext} whose property data, component and name are
+ * supplied by native (JNI) code. The {@link ControllerServiceLookup} half of the
+ * contract is stubbed out: this context never resolves other services itself.
+ */
+public class JniConfigurationContext implements ConfigurationContext, ControllerServiceLookup{
+
+    // Handle used by the native side; presumably the address of the native
+    // JniConfigurationContext. TODO confirm against the C++ loader.
+    long nativePtr;
+
+
+    /** @return the property names reported by the native side. */
+    private native List<String> getPropertyNames();
+
+    /** @return the Java component backing this context, as held by the native side. */
+    private native AbstractConfigurableComponent getComponent();
+
+    /**
+     * Maps every native property name to its descriptor and current value.
+     *
+     * @return a mutable map; values are null for properties the native side
+     *         cannot resolve. Empty when the names or the component are missing.
+     */
+    @Override
+    public Map<PropertyDescriptor, String> getProperties() {
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        final List<String> propertyNames = getPropertyNames();
+        final AbstractConfigurableComponent component = getComponent();
+        if (propertyNames == null || component == null) {
+            return properties;
+        }
+        // Plain loop on purpose: Collectors.toMap throws NullPointerException on
+        // null values, and getPropertyValue returns null for unresolved properties.
+        for (final String propertyName : propertyNames) {
+            properties.put(component.getPropertyDescriptor(propertyName), getPropertyValue(propertyName));
+        }
+        return properties;
+    }
+
+    /** @return the fixed placeholder period "1". */
+    @Override
+    public String getSchedulingPeriod() {
+        return "1";
+    }
+
+    /** @return the fixed placeholder period 1, regardless of {@code timeUnit}. */
+    @Override
+    public Long getSchedulingPeriod(TimeUnit timeUnit) {
+        return 1L;
+    }
+
+    @Override
+    public native String getName();
+
+    /**
+     * @param propertyName name of the property to read
+     * @return the value held by the native side, or null when it has none
+     */
+    public native String getPropertyValue(final String propertyName);
+
+    /**
+     * Resolves a property, falling back to the descriptor's default when the
+     * native side has no value (null or the literal string "null").
+     */
+    @Override
+    public PropertyValue getProperty(PropertyDescriptor descriptor) {
+        String value = getPropertyValue(descriptor.getName());
+        if (value == null || "null".equals(value)) {
+            value = descriptor.getDefaultValue();
+        }
+        return new StandardPropertyValue(value,this);
+    }
+
+    /** @return {@link #getProperties()} re-keyed by property name. */
+    @Override
+    public Map<String, String> getAllProperties() {
+        final Map<String,String> newProps = new HashMap<>();
+        getProperties().forEach((descriptor, value) -> newProps.put(descriptor.getName(), value));
+        return newProps;
+    }
+
+
+    /** Stub: this context does not resolve controller services. */
+    @Override
+    public ControllerService getControllerService(String s) {
+        return null;
+    }
+
+    /** Stub: always false. */
+    @Override
+    public boolean isControllerServiceEnabled(String s) {
+        return false;
+    }
+
+    /** Stub: always false. */
+    @Override
+    public boolean isControllerServiceEnabling(String s) {
+        return false;
+    }
+
+    /** Stub: always false. */
+    @Override
+    public boolean isControllerServiceEnabled(ControllerService controllerService) {
+        return false;
+    }
+
+    /** Stub: returns an empty set so callers can iterate without a null check. */
+    @Override
+    public Set<String> getControllerServiceIdentifiers(Class<? extends ControllerService> aClass) throws IllegalArgumentException {
+        return new java.util.HashSet<>();
+    }
+
+    /** Stub: no names are known to this context. */
+    @Override
+    public String getControllerServiceName(String s) {
+        return null;
+    }
+}
diff --git a/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniControllerServiceLookup.java b/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniControllerServiceLookup.java
new file mode 100644
index 0000000..d0b2b81
--- /dev/null
+++ b/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniControllerServiceLookup.java
@@ -0,0 +1,35 @@
+package org.apache.nifi.processor;
+
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ControllerServiceLookup;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * {@link ControllerServiceLookup} whose identifier-based queries are answered
+ * by native (JNI) code.
+ */
+public class JniControllerServiceLookup implements ControllerServiceLookup {
+
+    // Handle used by the native side; presumably the address of the native
+    // lookup object. TODO confirm against the C++ loader.
+    long nativePtr;
+
+    @Override
+    public native ControllerService getControllerService(String s);
+
+    @Override
+    public native boolean isControllerServiceEnabled(String s);
+
+    @Override
+    public native boolean isControllerServiceEnabling(String s);
+
+    /**
+     * Delegates to the identifier-based check.
+     *
+     * @return false for a null service instead of throwing
+     */
+    @Override
+    public boolean isControllerServiceEnabled(ControllerService controllerService){
+        return controllerService != null && isControllerServiceEnabled(controllerService.getIdentifier());
+    }
+
+    /** Not backed by native code: always returns a fresh, empty set. */
+    @Override
+    public Set<String> getControllerServiceIdentifiers(Class<? extends ControllerService> aClass) throws IllegalArgumentException
+    {
+        return new HashSet<>();
+    }
+
+    @Override
+    public native String getControllerServiceName(String s);
+}
diff --git a/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniInitializationContext.java b/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniInitializationContext.java
index 365a92d..343e3bd 100644
--- a/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniInitializationContext.java
+++ b/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniInitializationContext.java
@@ -1,13 +1,14 @@
package org.apache.nifi.processor;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.logging.ComponentLog;
import java.io.File;
-public class JniInitializationContext implements ProcessorInitializationContext {
-
+public class JniInitializationContext implements ProcessorInitializationContext, ControllerServiceInitializationContext {
private long nativePtr;
@@ -15,9 +16,7 @@ public class JniInitializationContext implements ProcessorInitializationContext
@Override
- public String getIdentifier() {
- return null;
- }
+ public native String getIdentifier();
/**
@@ -35,11 +34,14 @@ public class JniInitializationContext implements ProcessorInitializationContext
}
@Override
- public ControllerServiceLookup getControllerServiceLookup() {
+ public StateManager getStateManager() {
return null;
}
@Override
+ public native ControllerServiceLookup getControllerServiceLookup();
+
+ @Override
public NodeTypeProvider getNodeTypeProvider() {
return null;
}
diff --git a/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniProcessContext.java b/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniProcessContext.java
index c0fbef9..b136a9b 100644
--- a/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniProcessContext.java
+++ b/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniProcessContext.java
@@ -5,53 +5,51 @@ import org.apache.nifi.components.AbstractConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-public class JniProcessContext implements ProcessContext, ControllerServiceLookup {
+public class JniProcessContext implements ProcessContext, ControllerServiceLookup{
private long nativePtr;
@Override
public ControllerService getControllerService(String serviceIdentifier) {
- return null;
+ return getControllerServiceLookup().getControllerService(serviceIdentifier);
}
@Override
public boolean isControllerServiceEnabled(String serviceIdentifier) {
- return false;
+ return getControllerServiceLookup().isControllerServiceEnabled(serviceIdentifier);
}
@Override
public boolean isControllerServiceEnabling(String serviceIdentifier) {
- return false;
+ return getControllerServiceLookup().isControllerServiceEnabling(serviceIdentifier);
}
@Override
public boolean isControllerServiceEnabled(ControllerService service) {
- return false;
+ return getControllerServiceLookup().isControllerServiceEnabled(service);
}
@Override
public Set<String> getControllerServiceIdentifiers(Class<? extends ControllerService> serviceType) throws IllegalArgumentException {
- return null;
+ return getControllerServiceLookup().getControllerServiceIdentifiers(serviceType);
}
@Override
public String getControllerServiceName(String serviceIdentifier) {
- return null;
+ return getControllerServiceLookup().getControllerServiceName(serviceIdentifier);
}
@Override
public PropertyValue getProperty(String propertyName) {
String value = getPropertyValue(propertyName);
- System.out.println("for " + propertyName + " got " + value);
return new StandardPropertyValue(value,this);
}
@@ -82,20 +80,20 @@ public class JniProcessContext implements ProcessContext, ControllerServiceLooku
@Override
public Map<PropertyDescriptor, String> getProperties() {
List<String> propertyNames = getPropertyNames();
- Processor processor = getProcessor();
- if (processor instanceof AbstractConfigurableComponent) {
- AbstractConfigurableComponent process = AbstractConfigurableComponent.class.cast(getProcessor());
+ AbstractConfigurableComponent process = getComponent();
+
+
if (process != null) {
return propertyNames.stream().collect(Collectors.toMap(process::getPropertyDescriptor, this::getPropertyValue));
}
- }
+
return null;
}
private native List<String> getPropertyNames();
- private native Processor getProcessor();
+ private native AbstractConfigurableComponent getComponent();
@Override
public String encrypt(String unencrypted) {
@@ -108,13 +106,11 @@ public class JniProcessContext implements ProcessContext, ControllerServiceLooku
}
@Override
- public ControllerServiceLookup getControllerServiceLookup() {
- return null;
- }
+ public native ControllerServiceLookup getControllerServiceLookup();
@Override
public Set<Relationship> getAvailableRelationships() {
- return null;
+ return new HashSet<>();
}
@Override
@@ -134,7 +130,7 @@ public class JniProcessContext implements ProcessContext, ControllerServiceLooku
@Override
public boolean isExpressionLanguagePresent(PropertyDescriptor property) {
- return false;
+ return property.isExpressionLanguageSupported();
}
@Override
@@ -143,9 +139,7 @@ public class JniProcessContext implements ProcessContext, ControllerServiceLooku
}
@Override
- public String getName() {
- return null;
- }
+ public native String getName();
@Override
public PropertyValue getProperty(PropertyDescriptor descriptor) {
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index 5818d88..8a5c7d3 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -206,7 +206,7 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
* @param id service identifier
* @param firstTimeAdded first time this CS was added
*/
- virtual std::shared_ptr<core::controller::ControllerServiceNode> createControllerService(const std::string &type, const std::string &id, bool firstTimeAdded);
+ virtual std::shared_ptr<core::controller::ControllerServiceNode> createControllerService(const std::string &type, const std::string &fullType, const std::string &id, bool firstTimeAdded);
/**
* controller service provider
diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h
index df5978e..2b560a5 100644
--- a/libminifi/include/core/FlowConfiguration.h
+++ b/libminifi/include/core/FlowConfiguration.h
@@ -88,7 +88,8 @@ class FlowConfiguration : public CoreComponent {
std::unique_ptr<core::ProcessGroup> createRootProcessGroup(std::string name, utils::Identifier & uuid, int version);
- std::shared_ptr<core::controller::ControllerServiceNode> createControllerService(const std::string &class_name, const std::string &name, utils::Identifier & uuid);
+ std::shared_ptr<core::controller::ControllerServiceNode> createControllerService(const std::string &class_name, const std::string &full_class_name, const std::string &name,
+ utils::Identifier & uuid);
// Create Remote Processor Group
std::unique_ptr<core::ProcessGroup> createRemoteProcessGroup(std::string name, utils::Identifier & uuid);
diff --git a/libminifi/include/core/controller/ControllerServiceProvider.h b/libminifi/include/core/controller/ControllerServiceProvider.h
index 651bceb..18ac5de 100644
--- a/libminifi/include/core/controller/ControllerServiceProvider.h
+++ b/libminifi/include/core/controller/ControllerServiceProvider.h
@@ -71,8 +71,9 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo
* @param id controller service identifier.
* @return shared pointer to the controller service node.
*/
- virtual std::shared_ptr<ControllerServiceNode> createControllerService(const std::string &type, const std::string &id,
+ virtual std::shared_ptr<ControllerServiceNode> createControllerService(const std::string &type,const std::string &longType, const std::string &id,
bool firstTimeAdded) = 0;
+
/**
* Gets a controller service node wrapping the controller service
*
diff --git a/libminifi/include/core/controller/StandardControllerServiceProvider.h b/libminifi/include/core/controller/StandardControllerServiceProvider.h
index 678f3f5..cc1d51e 100644
--- a/libminifi/include/core/controller/StandardControllerServiceProvider.h
+++ b/libminifi/include/core/controller/StandardControllerServiceProvider.h
@@ -79,12 +79,19 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
agent_ = agent;
}
- std::shared_ptr<ControllerServiceNode> createControllerService(const std::string &type, const std::string &id, bool firstTimeAdded) {
+ std::shared_ptr<ControllerServiceNode> createControllerService(const std::string &type, const std::string &fullType, const std::string &id, bool firstTimeAdded) {
std::shared_ptr<ControllerService> new_controller_service = extension_loader_.instantiate<ControllerService>(type, id);
if (nullptr == new_controller_service) {
- return nullptr;
+
+ new_controller_service = extension_loader_.instantiate<ControllerService>("ExecuteJavaControllerService", id);
+ if (new_controller_service != nullptr) {
+ new_controller_service->initialize();
+ new_controller_service->setProperty("NiFi Controller Service", fullType);
+ } else {
+ return nullptr;
+ }
}
std::shared_ptr<ControllerServiceNode> new_service_node = std::make_shared<StandardControllerServiceNode>(new_controller_service,
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 21beceb..0e55f67 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -740,8 +740,8 @@ void FlowController::loadC2ResponseConfiguration() {
* @param id service identifier
* @param firstTimeAdded first time this CS was added
*/
-std::shared_ptr<core::controller::ControllerServiceNode> FlowController::createControllerService(const std::string &type, const std::string &id, bool firstTimeAdded) {
- return controller_service_provider_->createControllerService(type, id, firstTimeAdded);
+std::shared_ptr<core::controller::ControllerServiceNode> FlowController::createControllerService(const std::string &type, const std::string &fullType, const std::string &id, bool firstTimeAdded) {
+ return controller_service_provider_->createControllerService(type, fullType, id, firstTimeAdded);
}
/**
diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp
index 0325e30..ec1348d 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -146,8 +146,9 @@ std::shared_ptr<minifi::Connection> FlowConfiguration::createConnection(std::str
return std::make_shared<minifi::Connection>(flow_file_repo_, content_repo_, name, uuid);
}
-std::shared_ptr<core::controller::ControllerServiceNode> FlowConfiguration::createControllerService(const std::string &class_name, const std::string &name, utils::Identifier & uuid) {
- std::shared_ptr<core::controller::ControllerServiceNode> controllerServicesNode = service_provider_->createControllerService(class_name, name, true);
+std::shared_ptr<core::controller::ControllerServiceNode> FlowConfiguration::createControllerService(const std::string &class_name, const std::string &full_class_name, const std::string &name,
+ utils::Identifier & uuid) {
+ std::shared_ptr<core::controller::ControllerServiceNode> controllerServicesNode = service_provider_->createControllerService(class_name, full_class_name, name, true);
if (nullptr != controllerServicesNode)
controllerServicesNode->setUUID(uuid);
return controllerServicesNode;
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index cbbc6e5..5ebc74e 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -462,7 +462,7 @@ void YamlConfiguration::parseControllerServices(YAML::Node *controllerServicesNo
type = controllerServiceNode["type"].as<std::string>();
logger_->log_debug("Using type %s for controller service node", type);
}
-
+ std::string fullType = type;
auto lastOfIdx = type.find_last_of(".");
if (lastOfIdx != std::string::npos) {
lastOfIdx++; // if a value is found, increment to move beyond the .
@@ -475,7 +475,7 @@ void YamlConfiguration::parseControllerServices(YAML::Node *controllerServicesNo
utils::Identifier uuid;
uuid = id;
- auto controller_service_node = createControllerService(type, name, uuid);
+ auto controller_service_node = createControllerService(type, fullType, name, uuid);
if (nullptr != controller_service_node) {
logger_->log_debug("Created Controller Service with UUID %s and name %s", id, name);
controller_service_node->initialize();