You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/03/28 17:19:09 UTC
[04/16] nifi-minifi-cpp git commit: MINIFI-217: Updates namespaces
and removes use of raw pointers for user facing API.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/Repository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Repository.cpp b/libminifi/src/core/Repository.cpp
new file mode 100644
index 0000000..9a27785
--- /dev/null
+++ b/libminifi/src/core/Repository.cpp
@@ -0,0 +1,65 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <cstdint>
+#include <vector>
+#include <arpa/inet.h>
+#include "io/DataStream.h"
+#include "io/Serializable.h"
+#include "core/Relationship.h"
+#include "core/logging/Logger.h"
+#include "FlowController.h"
+#include "core/Repository.h"
+#include "provenance/Provenance.h"
+#include "core/repository/FlowFileRepository.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+void Repository::start() {
+ if (this->purge_period_ <= 0)
+ return;
+ if (running_)
+ return;
+ thread_ = std::thread(&Repository::threadExecutor, this);
+ thread_.detach();
+ running_ = true;
+ logger_->log_info("%s Repository Monitor Thread Start", name_.c_str());
+}
+
+void Repository::stop() {
+ if (!running_)
+ return;
+ running_ = false;
+ if (thread_.joinable())
+ thread_.join();
+ logger_->log_info("%s Repository Monitor Thread Stop", name_.c_str());
+}
+
+// repoSize
+uint64_t Repository::repoSize() {
+ return repo_size_;
+}
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/RepositoryFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/RepositoryFactory.cpp b/libminifi/src/core/RepositoryFactory.cpp
new file mode 100644
index 0000000..ef0b9ef
--- /dev/null
+++ b/libminifi/src/core/RepositoryFactory.cpp
@@ -0,0 +1,69 @@
+#include "core/Repository.h"
+#include "core/Repository.h"
+
+#ifdef LEVELDB_SUPPORT
+#include "core/repository/FlowFileRepository.h"
+#include "provenance/ProvenanceRepository.h"
+#endif
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+#ifndef LEVELDB_SUPPORT
+ namespace provenance{
+ class ProvenanceRepository;
+ }
+#endif
+namespace core {
+
+#ifndef LEVELDB_SUPPORT
+ class FlowFileRepository;
+#endif
+
+
+ std::shared_ptr<core::Repository> createRepository(
+ const std::string configuration_class_name, bool fail_safe = false) {
+
+ std::string class_name_lc = configuration_class_name;
+ std::transform(class_name_lc.begin(), class_name_lc.end(),
+ class_name_lc.begin(), ::tolower);
+ try {
+ std::shared_ptr<core::Repository> return_obj = nullptr;
+ if (class_name_lc == "flowfilerepository") {
+
+ return_obj = std::shared_ptr<core::Repository>((core::Repository*)instantiate<core::repository::FlowFileRepository>());
+ } else if (class_name_lc == "provenancerepository") {
+
+
+ return_obj = std::shared_ptr<core::Repository>((core::Repository*)instantiate<provenance::ProvenanceRepository>());
+
+ }
+
+ if (return_obj){
+ return return_obj;
+ }
+ if (fail_safe) {
+ return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1,
+ 1, 1);
+ } else {
+ throw std::runtime_error(
+ "Support for the provided configuration class could not be found");
+ }
+ } catch (const std::runtime_error &r) {
+ if (fail_safe) {
+ return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1,
+ 1, 1);
+ }
+ }
+
+ throw std::runtime_error(
+ "Support for the provided configuration class could not be found");
+ }
+
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/logging/BaseLogger.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/logging/BaseLogger.cpp b/libminifi/src/core/logging/BaseLogger.cpp
new file mode 100644
index 0000000..a6b43a8
--- /dev/null
+++ b/libminifi/src/core/logging/BaseLogger.cpp
@@ -0,0 +1,161 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/BaseLogger.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace logging {
+
+// Logger related configuration items.
+const char *BaseLogger::nifi_log_level = "nifi.log.level";
+const char *BaseLogger::nifi_log_appender = "nifi.log.appender";
+
+/**
+ * @brief Log error message
+ * @param format format string ('man printf' for syntax)
+ * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match
+ */
+void BaseLogger::log_error(const char * const format, ...) {
+ if (logger_ == NULL || !logger_->should_log(spdlog::level::level_enum::err))
+ return;
+ FILL_BUFFER
+ log_str(err, buffer);
+}
+/**
+ * @brief Log warn message
+ * @param format format string ('man printf' for syntax)
+ * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match
+ */
+void BaseLogger::log_warn(const char * const format, ...) {
+ if (logger_ == NULL || !logger_->should_log(spdlog::level::level_enum::warn))
+ return;
+ FILL_BUFFER
+ log_str(warn, buffer);
+}
+/**
+ * @brief Log info message
+ * @param format format string ('man printf' for syntax)
+ * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match
+ */
+void BaseLogger::log_info(const char * const format, ...) {
+ if (logger_ == NULL || !logger_->should_log(spdlog::level::level_enum::info))
+ return;
+ FILL_BUFFER
+ log_str(info, buffer);
+}
+/**
+ * @brief Log debug message
+ * @param format format string ('man printf' for syntax)
+ * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match
+ */
+void BaseLogger::log_debug(const char * const format, ...) {
+
+ if (logger_ == NULL || !logger_->should_log(spdlog::level::level_enum::debug))
+ return;
+ FILL_BUFFER
+ log_str(debug, buffer);
+}
+/**
+ * @brief Log trace message
+ * @param format format string ('man printf' for syntax)
+ * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match
+ */
+void BaseLogger::log_trace(const char * const format, ...) {
+
+ if (logger_ == NULL || !logger_->should_log(spdlog::level::level_enum::trace))
+ return;
+ FILL_BUFFER
+ log_str(debug, buffer);
+}
+
+// overridables
+
+/**
+ * @brief Log error message
+ * @param format format string ('man printf' for syntax)
+ * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match
+ */
+void BaseLogger::log_str(LOG_LEVEL_E level, const std::string &buffer) {
+ switch (level) {
+ case err:
+ case critical:
+ if (stderr_ != nullptr) {
+ stderr_->error(buffer);
+ } else {
+ logger_->error(buffer);
+ }
+ break;
+ case warn:
+ logger_->warn(buffer);
+ break;
+ case info:
+ logger_->info(buffer);
+ break;
+ case debug:
+ logger_->debug(buffer);
+ break;
+ case trace:
+ logger_->trace(buffer);
+ break;
+ case off:
+ break;
+ default:
+ logger_->info(buffer);
+ break;
+ }
+
+}
+
+void BaseLogger::setLogLevel(const std::string &level,
+ LOG_LEVEL_E defaultLevel) {
+ std::string logLevel = level;
+ std::transform(logLevel.begin(), logLevel.end(), logLevel.begin(), ::tolower);
+
+ if (logLevel == "trace") {
+ setLogLevel(trace);
+ } else if (logLevel == "debug") {
+ setLogLevel(debug);
+ } else if (logLevel == "info") {
+ setLogLevel(info);
+ } else if (logLevel == "warn") {
+ setLogLevel(warn);
+ } else if (logLevel == "error") {
+ setLogLevel(err);
+ } else if (logLevel == "critical") {
+ setLogLevel(critical);
+ } else if (logLevel == "off") {
+ setLogLevel(off);
+ } else {
+ setLogLevel(defaultLevel);
+ }
+}
+
+void BaseLogger::set_error_logger(std::shared_ptr<spdlog::logger> other) {
+ stderr_ = std::move(other);
+}
+
+} /* namespace logging */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/logging/LogAppenders.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/logging/LogAppenders.cpp b/libminifi/src/core/logging/LogAppenders.cpp
new file mode 100644
index 0000000..5d92334
--- /dev/null
+++ b/libminifi/src/core/logging/LogAppenders.cpp
@@ -0,0 +1,39 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/LogAppenders.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace logging {
+
+const char *OutputStreamAppender::nifi_log_output_stream_error_stderr="nifi.log.outputstream.appender.error.stderr";
+
+const char *RollingAppender::nifi_log_rolling_apender_file = "nifi.log.rolling.appender.file";
+const char *RollingAppender::nifi_log_rolling_appender_max_files = "nifi.log.rolling.appender.max.files";
+const char *RollingAppender::nifi_log_rolling_appender_max_file_size = "nifi.log.rolling.appender.max.file_size";
+
+
+} /* namespace logging */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/logging/Logger.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/logging/Logger.cpp b/libminifi/src/core/logging/Logger.cpp
new file mode 100644
index 0000000..d8cadf3
--- /dev/null
+++ b/libminifi/src/core/logging/Logger.cpp
@@ -0,0 +1,40 @@
+/**
+ * @file Logger.cpp
+ * Logger class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "core/logging/Logger.h"
+
+#include <vector>
+#include <queue>
+#include <map>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace logging {
+
+std::shared_ptr<Logger> Logger::singleton_logger_(nullptr);
+
+} /* namespace logging */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/repository/FlowFileRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/FlowFileRepository.cpp b/libminifi/src/core/repository/FlowFileRepository.cpp
new file mode 100644
index 0000000..c495a67
--- /dev/null
+++ b/libminifi/src/core/repository/FlowFileRepository.cpp
@@ -0,0 +1,109 @@
+#include "core/repository/FlowFileRepository.h"
+#include "FlowFileRecord.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace repository {
+
+void FlowFileRepository::run() {
+ // threshold for purge
+ uint64_t purgeThreshold = max_partition_bytes_ * 3 / 4;
+ while (running_) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_));
+ uint64_t curTime = getTimeMillis();
+ uint64_t size = repoSize();
+ if (size >= purgeThreshold) {
+ std::vector<std::string> purgeList;
+ leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
+
+ for (it->SeekToFirst(); it->Valid(); it->Next()) {
+ std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this());
+ std::string key = it->key().ToString();
+ if (eventRead->DeSerialize((uint8_t *) it->value().data(),
+ (int) it->value().size())) {
+ if ((curTime - eventRead->getEventTime()) > max_partition_millis_)
+ purgeList.push_back(key);
+ } else {
+ logger_->log_debug("NiFi %s retrieve event %s fail", name_.c_str(),
+ key.c_str());
+ purgeList.push_back(key);
+ }
+ }
+ delete it;
+ for (auto eventId : purgeList) {
+ logger_->log_info("Repository Repo %s Purge %s", name_.c_str(),
+ eventId.c_str());
+ Delete(eventId);
+ }
+ }
+ if (size > max_partition_bytes_)
+ repo_full_ = true;
+ else
+ repo_full_ = false;
+ }
+ return;
+}
+
+void FlowFileRepository::loadFlowFileToConnections(std::map<std::string, std::shared_ptr<minifi::Connection>> &connectionMap)
+ {
+
+ std::vector<std::string> purgeList;
+ leveldb::Iterator* it = db_->NewIterator(
+ leveldb::ReadOptions());
+
+ for (it->SeekToFirst(); it->Valid(); it->Next())
+ {
+ std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this());
+ std::string key = it->key().ToString();
+ if (eventRead->DeSerialize((uint8_t *) it->value().data(),
+ (int) it->value().size()))
+ {
+ auto search = connectionMap.find(eventRead->getConnectionUuid());
+ if (search != connectionMap.end())
+ {
+ // we find the connection for the persistent flowfile, create the flowfile and enqueue that
+ std::shared_ptr<core::FlowFile> flow_file_ref = std::static_pointer_cast<core::FlowFile>(eventRead);
+ std::shared_ptr<FlowFileRecord> record = std::make_shared<FlowFileRecord>(shared_from_this(),flow_file_ref);
+ // set store to repo to true so that we do need to persistent again in enqueue
+ record->setStoredToRepository(true);
+ search->second->put(record);
+ }
+ else
+ {
+ if (eventRead->getContentFullPath().length() > 0)
+ {
+ std::remove(eventRead->getContentFullPath().c_str());
+ }
+ purgeList.push_back(key);
+ }
+ }
+ else
+ {
+ purgeList.push_back(key);
+ }
+ }
+
+ delete it;
+ std::vector<std::string>::iterator itPurge;
+ for (itPurge = purgeList.begin(); itPurge != purgeList.end();
+ itPurge++)
+ {
+ std::string eventId = *itPurge;
+ logger_->log_info("Repository Repo %s Purge %s",
+ name_.c_str(),
+ eventId.c_str());
+ Delete(eventId);
+ }
+
+ return;
+}
+
+} /* namespace repository */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/yaml/YamlConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
new file mode 100644
index 0000000..8e59363
--- /dev/null
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -0,0 +1,490 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/yaml/YamlConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(
+ YAML::Node rootFlowNode) {
+ uuid_t uuid;
+
+ std::string flowName = rootFlowNode["name"].as<std::string>();
+ std::string id ;
+
+ try {
+ rootFlowNode["id"].as<std::string>();
+
+ uuid_parse(id.c_str(), uuid);
+ }catch(...)
+ {
+ logger_->log_warn("Generating random ID for root node");
+ uuid_generate(uuid);
+ char uuid_str[37];
+ uuid_unparse(uuid,uuid_str);
+ id = uuid_str;
+ }
+
+ logger_->log_debug("parseRootProcessGroup: id => [%s]", id.c_str());
+ logger_->log_debug("parseRootProcessGroup: name => [%s]", flowName.c_str());
+ std::unique_ptr<core::ProcessGroup> group =
+ FlowConfiguration::createRootProcessGroup(flowName, uuid);
+
+ this->name_ = flowName;
+
+ return group.release();
+}
+
+void YamlConfiguration::parseProcessorNodeYaml(
+ YAML::Node processorsNode, core::ProcessGroup * parentGroup) {
+ int64_t schedulingPeriod = -1;
+ int64_t penalizationPeriod = -1;
+ int64_t yieldPeriod = -1;
+ int64_t runDurationNanos = -1;
+ uuid_t uuid;
+ std::shared_ptr<core::Processor> processor = nullptr;
+
+ if (!parentGroup) {
+ logger_->log_error("parseProcessNodeYaml: no parent group exists");
+ return;
+ }
+
+ if (processorsNode) {
+
+ if (processorsNode.IsSequence()) {
+ // Evaluate sequence of processors
+ int numProcessors = processorsNode.size();
+
+ for (YAML::const_iterator iter = processorsNode.begin();
+ iter != processorsNode.end(); ++iter) {
+ core::ProcessorConfig procCfg;
+ YAML::Node procNode = iter->as<YAML::Node>();
+
+ procCfg.name = procNode["name"].as<std::string>();
+ procCfg.id = procNode["id"].as<std::string>();
+ logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]",
+ procCfg.name.c_str(), procCfg.id.c_str());
+ procCfg.javaClass = procNode["class"].as<std::string>();
+ logger_->log_debug("parseProcessorNode: class => [%s]",
+ procCfg.javaClass.c_str());
+
+ uuid_parse(procCfg.id.c_str(), uuid);
+
+ // Determine the processor name only from the Java class
+ int lastOfIdx = procCfg.javaClass.find_last_of(".");
+ if (lastOfIdx != std::string::npos) {
+ lastOfIdx++; // if a value is found, increment to move beyond the .
+ int nameLength = procCfg.javaClass.length() - lastOfIdx;
+ std::string processorName = procCfg.javaClass.substr(lastOfIdx,
+ nameLength);
+ processor = this->createProcessor(processorName, uuid);
+ }
+
+ if (!processor) {
+ logger_->log_error("Could not create a processor %s with name %s",
+ procCfg.name.c_str(), procCfg.id.c_str());
+ throw std::invalid_argument(
+ "Could not create processor " + procCfg.name);
+ }
+ processor->setName(procCfg.name);
+
+ procCfg.maxConcurrentTasks = procNode["max concurrent tasks"]
+ .as<std::string>();
+ logger_->log_debug("parseProcessorNode: max concurrent tasks => [%s]",
+ procCfg.maxConcurrentTasks.c_str());
+ procCfg.schedulingStrategy = procNode["scheduling strategy"]
+ .as<std::string>();
+ logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]",
+ procCfg.schedulingStrategy.c_str());
+ procCfg.schedulingPeriod =
+ procNode["scheduling period"].as<std::string>();
+ logger_->log_debug("parseProcessorNode: scheduling period => [%s]",
+ procCfg.schedulingPeriod.c_str());
+ procCfg.penalizationPeriod = procNode["penalization period"]
+ .as<std::string>();
+ logger_->log_debug("parseProcessorNode: penalization period => [%s]",
+ procCfg.penalizationPeriod.c_str());
+ procCfg.yieldPeriod = procNode["yield period"].as<std::string>();
+ logger_->log_debug("parseProcessorNode: yield period => [%s]",
+ procCfg.yieldPeriod.c_str());
+ procCfg.yieldPeriod = procNode["run duration nanos"].as<std::string>();
+ logger_->log_debug("parseProcessorNode: run duration nanos => [%s]",
+ procCfg.runDurationNanos.c_str());
+
+ // handle auto-terminated relationships
+ YAML::Node autoTerminatedSequence =
+ procNode["auto-terminated relationships list"];
+ std::vector<std::string> rawAutoTerminatedRelationshipValues;
+ if (autoTerminatedSequence.IsSequence()
+ && !autoTerminatedSequence.IsNull()
+ && autoTerminatedSequence.size() > 0) {
+ for (YAML::const_iterator relIter = autoTerminatedSequence.begin();
+ relIter != autoTerminatedSequence.end(); ++relIter) {
+ std::string autoTerminatedRel = relIter->as<std::string>();
+ rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel);
+ }
+ }
+ procCfg.autoTerminatedRelationships =
+ rawAutoTerminatedRelationshipValues;
+
+ // handle processor properties
+ YAML::Node propertiesNode = procNode["Properties"];
+ parsePropertiesNodeYaml(&propertiesNode, processor);
+
+ // Take care of scheduling
+ core::TimeUnit unit;
+ if (core::Property::StringToTime(procCfg.schedulingPeriod,
+ schedulingPeriod, unit)
+ && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit,
+ schedulingPeriod)) {
+ logger_->log_debug(
+ "convert: parseProcessorNode: schedulingPeriod => [%d] ns",
+ schedulingPeriod);
+ processor->setSchedulingPeriodNano(schedulingPeriod);
+ }
+
+ if (core::Property::StringToTime(procCfg.penalizationPeriod,
+ penalizationPeriod, unit)
+ && core::Property::ConvertTimeUnitToMS(penalizationPeriod, unit,
+ penalizationPeriod)) {
+ logger_->log_debug(
+ "convert: parseProcessorNode: penalizationPeriod => [%d] ms",
+ penalizationPeriod);
+ processor->setPenalizationPeriodMsec(penalizationPeriod);
+ }
+
+ if (core::Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit)
+ && core::Property::ConvertTimeUnitToMS(yieldPeriod, unit,
+ yieldPeriod)) {
+ logger_->log_debug(
+ "convert: parseProcessorNode: yieldPeriod => [%d] ms",
+ yieldPeriod);
+ processor->setYieldPeriodMsec(yieldPeriod);
+ }
+
+ // Default to running
+ processor->setScheduledState(core::RUNNING);
+
+ if (procCfg.schedulingStrategy == "TIMER_DRIVEN") {
+ processor->setSchedulingStrategy(core::TIMER_DRIVEN);
+ logger_->log_debug("setting scheduling strategy as %s",
+ procCfg.schedulingStrategy.c_str());
+ } else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") {
+ processor->setSchedulingStrategy(core::EVENT_DRIVEN);
+ logger_->log_debug("setting scheduling strategy as %s",
+ procCfg.schedulingStrategy.c_str());
+ } else {
+ processor->setSchedulingStrategy(core::CRON_DRIVEN);
+ logger_->log_debug("setting scheduling strategy as %s",
+ procCfg.schedulingStrategy.c_str());
+
+ }
+
+ int64_t maxConcurrentTasks;
+ if (core::Property::StringToInt(procCfg.maxConcurrentTasks,
+ maxConcurrentTasks)) {
+ logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]",
+ maxConcurrentTasks);
+ processor->setMaxConcurrentTasks(maxConcurrentTasks);
+ }
+
+ if (core::Property::StringToInt(procCfg.runDurationNanos,
+ runDurationNanos)) {
+ logger_->log_debug("parseProcessorNode: runDurationNanos => [%d]",
+ runDurationNanos);
+ processor->setRunDurationNano(runDurationNanos);
+ }
+
+ std::set<core::Relationship> autoTerminatedRelationships;
+ for (auto &&relString : procCfg.autoTerminatedRelationships) {
+ core::Relationship relationship(relString, "");
+ logger_->log_debug(
+ "parseProcessorNode: autoTerminatedRelationship => [%s]",
+ relString.c_str());
+ autoTerminatedRelationships.insert(relationship);
+ }
+
+ processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
+
+ parentGroup->addProcessor(processor);
+ }
+ }
+ } else {
+ throw new std::invalid_argument(
+ "Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
+ }
+}
+
+void YamlConfiguration::parseRemoteProcessGroupYaml(
+ YAML::Node *rpgNode, core::ProcessGroup * parentGroup) {
+ uuid_t uuid;
+
+ if (!parentGroup) {
+ logger_->log_error("parseRemoteProcessGroupYaml: no parent group exists");
+ return;
+ }
+
+ if (rpgNode) {
+ if (rpgNode->IsSequence()) {
+ for (YAML::const_iterator iter = rpgNode->begin(); iter != rpgNode->end();
+ ++iter) {
+ YAML::Node rpgNode = iter->as<YAML::Node>();
+
+ auto name = rpgNode["name"].as<std::string>();
+ auto id = rpgNode["id"].as<std::string>();
+
+ logger_->log_debug(
+ "parseRemoteProcessGroupYaml: name => [%s], id => [%s]",
+ name.c_str(), id.c_str());
+
+ std::string url = rpgNode["url"].as<std::string>();
+ logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]",
+ url.c_str());
+
+ std::string timeout = rpgNode["timeout"].as<std::string>();
+ logger_->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]",
+ timeout.c_str());
+
+ std::string yieldPeriod = rpgNode["yield period"].as<std::string>();
+ logger_->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]",
+ yieldPeriod.c_str());
+
+ YAML::Node inputPorts = rpgNode["Input Ports"].as<YAML::Node>();
+ YAML::Node outputPorts = rpgNode["Output Ports"].as<YAML::Node>();
+ core::ProcessGroup *group = NULL;
+
+ uuid_parse(id.c_str(), uuid);
+
+ int64_t timeoutValue = -1;
+ int64_t yieldPeriodValue = -1;
+
+ group = this->createRemoteProcessGroup(name.c_str(), uuid).release();
+ group->setParent(parentGroup);
+ parentGroup->addProcessGroup(group);
+
+ core::TimeUnit unit;
+
+ if (core::Property::StringToTime(yieldPeriod, yieldPeriodValue, unit)
+ && core::Property::ConvertTimeUnitToMS(yieldPeriodValue, unit,
+ yieldPeriodValue) && group) {
+ logger_->log_debug(
+ "parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms",
+ yieldPeriodValue);
+ group->setYieldPeriodMsec(yieldPeriodValue);
+ }
+
+ if (core::Property::StringToTime(timeout, timeoutValue, unit)
+ && core::Property::ConvertTimeUnitToMS(timeoutValue, unit,
+ timeoutValue) && group) {
+ logger_->log_debug(
+ "parseRemoteProcessGroupYaml: timeoutValue => [%d] ms",
+ timeoutValue);
+ group->setTimeOut(timeoutValue);
+ }
+
+ group->setTransmitting(true);
+ group->setURL(url);
+
+ if (inputPorts && inputPorts.IsSequence()) {
+ for (YAML::const_iterator portIter = inputPorts.begin();
+ portIter != inputPorts.end(); ++portIter) {
+ logger_->log_debug("Got a current port, iterating...");
+
+ YAML::Node currPort = portIter->as<YAML::Node>();
+
+ this->parsePortYaml(&currPort, group, SEND);
+ } // for node
+ }
+ if (outputPorts && outputPorts.IsSequence()) {
+ for (YAML::const_iterator portIter = outputPorts.begin();
+ portIter != outputPorts.end(); ++portIter) {
+ logger_->log_debug("Got a current port, iterating...");
+
+ YAML::Node currPort = portIter->as<YAML::Node>();
+
+ this->parsePortYaml(&currPort, group, RECEIVE);
+ } // for node
+ }
+
+ }
+ }
+ }
+}
+
+void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode,
+ core::ProcessGroup *parent) {
+ uuid_t uuid;
+ std::shared_ptr<minifi::Connection> connection = nullptr;
+
+ if (!parent) {
+ logger_->log_error("parseProcessNode: no parent group was provided");
+ return;
+ }
+
+ if (connectionsNode) {
+
+ if (connectionsNode->IsSequence()) {
+ for (YAML::const_iterator iter = connectionsNode->begin();
+ iter != connectionsNode->end(); ++iter) {
+
+ YAML::Node connectionNode = iter->as<YAML::Node>();
+
+ std::string name = connectionNode["name"].as<std::string>();
+ std::string id = connectionNode["id"].as<std::string>();
+ std::string destId = connectionNode["destination id"].as<std::string>();
+
+ uuid_parse(id.c_str(), uuid);
+
+ logger_->log_debug("Created connection with UUID %s and name %s",
+ id.c_str(), name.c_str());
+ connection = this->createConnection(name, uuid);
+ auto rawRelationship = connectionNode["source relationship name"]
+ .as<std::string>();
+ core::Relationship relationship(rawRelationship, "");
+ logger_->log_debug("parseConnection: relationship => [%s]",
+ rawRelationship.c_str());
+ if (connection)
+ connection->setRelationship(relationship);
+ std::string connectionSrcProcId = connectionNode["source id"]
+ .as<std::string>();
+ uuid_t srcUUID;
+ uuid_parse(connectionSrcProcId.c_str(), srcUUID);
+
+ auto srcProcessor = parent->findProcessor(srcUUID);
+
+ if (!srcProcessor) {
+ logger_->log_error(
+ "Could not locate a source with id %s to create a connection",
+ connectionSrcProcId.c_str());
+ throw std::invalid_argument(
+ "Could not locate a source with id %s to create a connection "
+ + connectionSrcProcId);
+ }
+
+ uuid_t destUUID;
+ uuid_parse(destId.c_str(), destUUID);
+ auto destProcessor = parent->findProcessor(destUUID);
+ // If we could not find name, try by UUID
+ if (!destProcessor) {
+ uuid_t destUuid;
+ uuid_parse(destId.c_str(), destUuid);
+ destProcessor = parent->findProcessor(destUuid);
+ }
+ if (destProcessor) {
+ std::string destUuid = destProcessor->getUUIDStr();
+ }
+
+ uuid_t srcUuid;
+ uuid_t destUuid;
+ srcProcessor->getUUID(srcUuid);
+ connection->setSourceUUID(srcUuid);
+ destProcessor->getUUID(destUuid);
+ connection->setDestinationUUID(destUuid);
+
+ if (connection) {
+ parent->addConnection(connection);
+ }
+ }
+ }
+
+ if (connection)
+ parent->addConnection(connection);
+
+ return;
+ }
+}
+
+void YamlConfiguration::parsePortYaml(YAML::Node *portNode,
+ core::ProcessGroup *parent,
+ TransferDirection direction) {
+ uuid_t uuid;
+ std::shared_ptr<core::Processor> processor = NULL;
+ minifi::RemoteProcessorGroupPort *port = NULL;
+
+ if (!parent) {
+ logger_->log_error("parseProcessNode: no parent group existed");
+ return;
+ }
+
+ YAML::Node inputPortsObj = portNode->as<YAML::Node>();
+
+ // generate the random UIID
+ uuid_generate(uuid);
+
+ auto portId = inputPortsObj["id"].as<std::string>();
+ auto nameStr = inputPortsObj["name"].as<std::string>();
+ uuid_parse(portId.c_str(), uuid);
+
+ port = new minifi::RemoteProcessorGroupPort(nameStr.c_str(), uuid);
+
+ processor = (std::shared_ptr<core::Processor>) port;
+ port->setDirection(direction);
+ port->setTimeOut(parent->getTimeOut());
+ port->setTransmitting(true);
+ processor->setYieldPeriodMsec(parent->getYieldPeriodMsec());
+ processor->initialize();
+
+ // handle port properties
+ YAML::Node nodeVal = portNode->as<YAML::Node>();
+ YAML::Node propertiesNode = nodeVal["Properties"];
+
+ parsePropertiesNodeYaml(&propertiesNode, processor);
+
+ // add processor to parent
+ parent->addProcessor(processor);
+ processor->setScheduledState(core::RUNNING);
+ auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"]
+ .as<std::string>();
+ int64_t maxConcurrentTasks;
+ if (core::Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) {
+ processor->setMaxConcurrentTasks(maxConcurrentTasks);
+ }
+ logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]",
+ maxConcurrentTasks);
+ processor->setMaxConcurrentTasks(maxConcurrentTasks);
+
+}
+
+void YamlConfiguration::parsePropertiesNodeYaml(
+ YAML::Node *propertiesNode, std::shared_ptr<core::Processor> processor) {
+ // Treat generically as a YAML node so we can perform inspection on entries to ensure they are populated
+ for (YAML::const_iterator propsIter = propertiesNode->begin();
+ propsIter != propertiesNode->end(); ++propsIter) {
+ std::string propertyName = propsIter->first.as<std::string>();
+ YAML::Node propertyValueNode = propsIter->second;
+ if (!propertyValueNode.IsNull() && propertyValueNode.IsDefined()) {
+ std::string rawValueString = propertyValueNode.as<std::string>();
+ if (!processor->setProperty(propertyName, rawValueString)) {
+ logger_->log_warn(
+ "Received property %s with value %s but is not one of the properties for %s",
+ propertyName.c_str(), rawValueString.c_str(),
+ processor->getName().c_str());
+ }
+ }
+ }
+}
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/io/BaseStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/BaseStream.cpp b/libminifi/src/io/BaseStream.cpp
index cf3fe45..1400a1d 100644
--- a/libminifi/src/io/BaseStream.cpp
+++ b/libminifi/src/io/BaseStream.cpp
@@ -18,6 +18,12 @@
#include "io/BaseStream.h"
#include "io/Serializable.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
/**
* write 4 bytes to stream
* @param base_value non encoded value
@@ -26,7 +32,7 @@
* @return resulting write size
**/
int BaseStream::write(uint32_t base_value, bool is_little_endian) {
- return ::Serializable::write(base_value, (DataStream*) this, is_little_endian);
+ return Serializable::write(base_value, (DataStream*) this, is_little_endian);
}
/**
@@ -37,7 +43,7 @@ int BaseStream::write(uint32_t base_value, bool is_little_endian) {
* @return resulting write size
**/
int BaseStream::write(uint16_t base_value, bool is_little_endian) {
- return ::Serializable::write(base_value, (DataStream*) this, is_little_endian);
+ return Serializable::write(base_value, (DataStream*) this, is_little_endian);
}
/**
@@ -48,7 +54,7 @@ int BaseStream::write(uint16_t base_value, bool is_little_endian) {
* @return resulting write size
**/
int BaseStream::write(uint8_t *value, int len) {
- return ::Serializable::write(value, len, (DataStream*) this);
+ return Serializable::write(value, len, (DataStream*) this);
}
/**
@@ -59,7 +65,7 @@ int BaseStream::write(uint8_t *value, int len) {
* @return resulting write size
**/
int BaseStream::write(uint64_t base_value, bool is_little_endian) {
- return ::Serializable::write(base_value, (DataStream*) this, is_little_endian);
+ return Serializable::write(base_value, (DataStream*) this, is_little_endian);
}
/**
@@ -69,7 +75,7 @@ int BaseStream::write(uint64_t base_value, bool is_little_endian) {
**/
int BaseStream::write(bool value) {
uint8_t v = value;
- return ::Serializable::write(v);
+ return Serializable::write(v);
}
/**
@@ -78,7 +84,7 @@ int BaseStream::write(bool value) {
* @return resulting write size
**/
int BaseStream::writeUTF(std::string str, bool widen) {
- return ::Serializable::writeUTF(str, (DataStream*) this, widen);
+ return Serializable::writeUTF(str, (DataStream*) this, widen);
}
/**
@@ -88,7 +94,7 @@ int BaseStream::writeUTF(std::string str, bool widen) {
* @return resulting read size
**/
int BaseStream::read(uint8_t &value) {
- return ::Serializable::read(value, (DataStream*) this);
+ return Serializable::read(value, (DataStream*) this);
}
/**
@@ -98,7 +104,7 @@ int BaseStream::read(uint8_t &value) {
* @return resulting read size
**/
int BaseStream::read(uint16_t &base_value, bool is_little_endian) {
- return ::Serializable::read(base_value, (DataStream*) this);
+ return Serializable::read(base_value, (DataStream*) this);
}
/**
@@ -108,7 +114,7 @@ int BaseStream::read(uint16_t &base_value, bool is_little_endian) {
* @return resulting read size
**/
int BaseStream::read(char &value) {
- return ::Serializable::read(value, (DataStream*) this);
+ return Serializable::read(value, (DataStream*) this);
}
/**
@@ -119,7 +125,7 @@ int BaseStream::read(char &value) {
* @return resulting read size
**/
int BaseStream::read(uint8_t *value, int len) {
- return ::Serializable::read(value, len, (DataStream*) this);
+ return Serializable::read(value, len, (DataStream*) this);
}
/**
@@ -129,7 +135,7 @@ int BaseStream::read(uint8_t *value, int len) {
* @return resulting read size
**/
int BaseStream::read(uint32_t &value, bool is_little_endian) {
- return ::Serializable::read(value, (DataStream*) this, is_little_endian);
+ return Serializable::read(value, (DataStream*) this, is_little_endian);
}
/**
@@ -139,7 +145,7 @@ int BaseStream::read(uint32_t &value, bool is_little_endian) {
* @return resulting read size
**/
int BaseStream::read(uint64_t &value, bool is_little_endian) {
- return ::Serializable::read(value, (DataStream*) this, is_little_endian);
+ return Serializable::read(value, (DataStream*) this, is_little_endian);
}
/**
@@ -149,5 +155,12 @@ int BaseStream::read(uint64_t &value, bool is_little_endian) {
* @return resulting read size
**/
int BaseStream::readUTF(std::string &str, bool widen) {
- return ::Serializable::readUTF(str, (DataStream*) this, widen);
+ return Serializable::readUTF(str, (DataStream*) this, widen);
}
+
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/io/ClientSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp
index 39b71b4..ad6b04d 100644
--- a/libminifi/src/io/ClientSocket.cpp
+++ b/libminifi/src/io/ClientSocket.cpp
@@ -30,425 +30,424 @@
#include "io/validation.h"
#include "io/ClientSocket.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
std::string Socket::HOSTNAME = Socket::getMyHostName(0);
-
Socket::Socket(const std::string &hostname, const uint16_t port,
- const uint16_t listeners = -1) :
- requested_hostname_(hostname), port_(port), addr_info_(0), socket_file_descriptor_(
- -1), socket_max_(0), listeners_(listeners), canonical_hostname_("") {
- logger_ = Logger::getLogger();
- FD_ZERO(&total_list_);
- FD_ZERO(&read_fds_);
+ const uint16_t listeners = -1)
+ : requested_hostname_(hostname),
+ port_(port),
+ addr_info_(0),
+ socket_file_descriptor_(-1),
+ socket_max_(0),
+ listeners_(listeners),
+ canonical_hostname_("") {
+ logger_ = logging::Logger::getLogger();
+ FD_ZERO(&total_list_);
+ FD_ZERO(&read_fds_);
}
-Socket::Socket(const std::string &hostname, const uint16_t port) :
- ::Socket(hostname, port, 0) {
+Socket::Socket(const std::string &hostname, const uint16_t port)
+ : Socket(hostname, port, 0) {
}
-Socket::Socket(const Socket &&other) :
- requested_hostname_(std::move(other.requested_hostname_)), port_(
- std::move(other.port_)), addr_info_(std::move(other.addr_info_)), socket_file_descriptor_(
- other.socket_file_descriptor_), socket_max_(
- other.socket_max_.load()), listeners_(other.listeners_), total_list_(
- other.total_list_), read_fds_(other.read_fds_), canonical_hostname_(
- std::move(other.canonical_hostname_)) {
- logger_ = Logger::getLogger();
+Socket::Socket(const Socket &&other)
+ : requested_hostname_(std::move(other.requested_hostname_)),
+ port_(std::move(other.port_)),
+ addr_info_(std::move(other.addr_info_)),
+ socket_file_descriptor_(other.socket_file_descriptor_),
+ socket_max_(other.socket_max_.load()),
+ listeners_(other.listeners_),
+ total_list_(other.total_list_),
+ read_fds_(other.read_fds_),
+ canonical_hostname_(std::move(other.canonical_hostname_)) {
+ logger_ = logging::Logger::getLogger();
}
Socket::~Socket() {
- closeStream();
+ closeStream();
}
-void Socket::closeStream()
-{
- if (0 != addr_info_) {
- freeaddrinfo(addr_info_);
- addr_info_=0;
- }
+void Socket::closeStream() {
+ if (0 != addr_info_) {
+ freeaddrinfo(addr_info_);
+ addr_info_ = 0;
+ }
- if (socket_file_descriptor_ >= 0)
- {
- close(socket_file_descriptor_);
- socket_file_descriptor_ = -1;
- }
+ if (socket_file_descriptor_ >= 0) {
+ close(socket_file_descriptor_);
+ socket_file_descriptor_ = -1;
+ }
}
-int8_t Socket::createConnection(const addrinfo *p,in_addr_t &addr) {
- if ((socket_file_descriptor_ = socket(p->ai_family, p->ai_socktype,
- p->ai_protocol)) == -1) {
- logger_->log_error("error while connecting to server socket");
- return -1;
- }
-
- setSocketOptions(socket_file_descriptor_);
-
- if (listeners_ > 0) {
-
- struct sockaddr_in *sa_loc = (struct sockaddr_in*) p->ai_addr;
- sa_loc->sin_family = AF_INET;
- sa_loc->sin_port = htons(port_);
- sa_loc->sin_addr.s_addr = htonl(INADDR_ANY);
-
- if (bind(socket_file_descriptor_, p->ai_addr, p->ai_addrlen) == -1) {
- logger_->log_error("Could not bind to socket", strerror(errno));
- return -1;
- }
- }
- {
- if (listeners_ <=0 )
- {
- struct sockaddr_in *sa_loc = (struct sockaddr_in*) p->ai_addr;
- sa_loc->sin_family = AF_INET;
- //sa_loc->sin_port = htons(port);
- sa_loc->sin_port = htons(port_);
- // use any address if you are connecting to the local machine for testing
- // otherwise we must use the requested hostname
- if (IsNullOrEmpty(requested_hostname_) || requested_hostname_=="localhost")
- {
- sa_loc->sin_addr.s_addr = htonl(INADDR_ANY);
- }
- else
- {
- sa_loc->sin_addr.s_addr = addr;
- }
- if (connect(socket_file_descriptor_, p->ai_addr, p->ai_addrlen) == -1) {
- close(socket_file_descriptor_);
- socket_file_descriptor_ = -1;
- logger_->log_warn("Could not connect to socket, error:%s", strerror(errno));
- return -1;
-
- }
- }
- }
-
- // listen
- if (listeners_ > 0) {
- if (listen(socket_file_descriptor_, listeners_) == -1) {
- logger_->log_warn("attempted connection, saw %s", strerror(errno));
- return -1;
- }
-
- }
- // add the listener to the total set
- FD_SET(socket_file_descriptor_, &total_list_);
- socket_max_ = socket_file_descriptor_;
- return 0;
-}
+int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
+ if ((socket_file_descriptor_ = socket(p->ai_family, p->ai_socktype,
+ p->ai_protocol)) == -1) {
+ logger_->log_error("error while connecting to server socket");
+ return -1;
+ }
-short Socket::initialize() {
+ setSocketOptions(socket_file_descriptor_);
- struct sockaddr_in servAddr;
+ if (listeners_ > 0) {
- addrinfo hints = { sizeof(addrinfo) };
- memset(&hints, 0, sizeof hints); // make sure the struct is empty
- hints.ai_family = AF_UNSPEC;
- hints.ai_socktype = SOCK_STREAM;
- hints.ai_flags = AI_CANONNAME;
- if (listeners_ > 0)
- hints.ai_flags |= AI_PASSIVE;
+ struct sockaddr_in *sa_loc = (struct sockaddr_in*) p->ai_addr;
+ sa_loc->sin_family = AF_INET;
+ sa_loc->sin_port = htons(port_);
+ sa_loc->sin_addr.s_addr = htonl(INADDR_ANY);
- hints.ai_protocol = 0; /* any protocol */
+ if (bind(socket_file_descriptor_, p->ai_addr, p->ai_addrlen) == -1) {
+ logger_->log_error("Could not bind to socket", strerror(errno));
+ return -1;
+ }
+ }
+ {
+ if (listeners_ <= 0) {
+ struct sockaddr_in *sa_loc = (struct sockaddr_in*) p->ai_addr;
+ sa_loc->sin_family = AF_INET;
+ //sa_loc->sin_port = htons(port);
+ sa_loc->sin_port = htons(port_);
+ // use any address if you are connecting to the local machine for testing
+ // otherwise we must use the requested hostname
+ if (IsNullOrEmpty(requested_hostname_)
+ || requested_hostname_ == "localhost") {
+ sa_loc->sin_addr.s_addr = htonl(INADDR_ANY);
+ } else {
+ sa_loc->sin_addr.s_addr = addr;
+ }
+ if (connect(socket_file_descriptor_, p->ai_addr, p->ai_addrlen) == -1) {
+ close(socket_file_descriptor_);
+ socket_file_descriptor_ = -1;
+ logger_->log_warn("Could not connect to socket, error:%s",
+ strerror(errno));
+ return -1;
+
+ }
+ }
+ }
- int errcode = getaddrinfo(requested_hostname_.c_str(), 0, &hints, &addr_info_);
+ // listen
+ if (listeners_ > 0) {
+ if (listen(socket_file_descriptor_, listeners_) == -1) {
+ logger_->log_warn("attempted connection, saw %s", strerror(errno));
+ return -1;
+ }
- if (errcode != 0) {
- logger_->log_error("Saw error during getaddrinfo, error: %s",strerror(errno));
- return -1;
- }
+ }
+ // add the listener to the total set
+ FD_SET(socket_file_descriptor_, &total_list_);
+ socket_max_ = socket_file_descriptor_;
+ return 0;
+}
- socket_file_descriptor_ = -1;
+short Socket::initialize() {
- in_addr_t addr;
- struct hostent *h;
- #ifdef __MACH__
- h = gethostbyname(requested_hostname_.c_str());
- #else
- const char *host;
- uint16_t port;
+ struct sockaddr_in servAddr;
- host = requested_hostname_.c_str();
- port = port_;
- char buf[1024];
- struct hostent he;
- int hh_errno;
- gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno);
- #endif
+ addrinfo hints = { sizeof(addrinfo) };
+ memset(&hints, 0, sizeof hints); // make sure the struct is empty
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_CANONNAME;
+ if (listeners_ > 0)
+ hints.ai_flags |= AI_PASSIVE;
- memcpy((char *) &addr, h->h_addr_list[0], h->h_length);
+ hints.ai_protocol = 0; /* any protocol */
+ int errcode = getaddrinfo(requested_hostname_.c_str(), 0, &hints,
+ &addr_info_);
- auto p = addr_info_;
- for (; p != NULL; p = p->ai_next) {
- if (IsNullOrEmpty(canonical_hostname_)) {
- if (!IsNullOrEmpty(p) && !IsNullOrEmpty(p->ai_canonname))
- canonical_hostname_ = p->ai_canonname;
- }
+ if (errcode != 0) {
+ logger_->log_error("Saw error during getaddrinfo, error: %s",
+ strerror(errno));
+ return -1;
+ }
+ socket_file_descriptor_ = -1;
- //we've successfully connected
- if (port_ > 0 && createConnection(p,addr) >= 0)
- {
- return 0;
- break;
- }
- }
+ in_addr_t addr;
+ struct hostent *h;
+#ifdef __MACH__
+ h = gethostbyname(requested_hostname_.c_str());
+#else
+ const char *host;
+ uint16_t port;
+
+ host = requested_hostname_.c_str();
+ port = port_;
+ char buf[1024];
+ struct hostent he;
+ int hh_errno;
+ gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno);
+#endif
- return -1;
+ memcpy((char *) &addr, h->h_addr_list[0], h->h_length);
+
+ auto p = addr_info_;
+ for (; p != NULL; p = p->ai_next) {
+ if (IsNullOrEmpty(canonical_hostname_)) {
+ if (!IsNullOrEmpty(p) && !IsNullOrEmpty(p->ai_canonname))
+ canonical_hostname_ = p->ai_canonname;
+ }
+
+ //we've successfully connected
+ if (port_ > 0 && createConnection(p, addr) >= 0) {
+ return 0;
+ break;
+ }
+ }
+
+ return -1;
}
short Socket::select_descriptor(const uint16_t msec) {
- struct timeval tv;
- int retval;
-
- read_fds_ = total_list_;
-
- tv.tv_sec = msec / 1000;
- tv.tv_usec = (msec % 1000) * 1000;
-
- std::lock_guard<std::recursive_mutex> guard(selection_mutex_);
-
- if (msec > 0)
- retval = select(socket_max_ + 1, &read_fds_, NULL, NULL, &tv);
- else
- retval = select(socket_max_ + 1, &read_fds_, NULL, NULL, NULL);
-
- if (retval < 0) {
- logger_->log_error("Saw error during selection, error:%i %s",retval,strerror(errno));
- return retval;
- }
-
- for (int i = 0; i <= socket_max_; i++) {
- if (FD_ISSET(i, &read_fds_)) {
-
- if (i == socket_file_descriptor_) {
- if (listeners_ > 0)
- {
- struct sockaddr_storage remoteaddr; // client address
- socklen_t addrlen = sizeof remoteaddr;
- int newfd = accept(socket_file_descriptor_,
- (struct sockaddr *) &remoteaddr, &addrlen);
- FD_SET(newfd, &total_list_); // add to master set
- if (newfd > socket_max_) { // keep track of the max
- socket_max_ = newfd;
- }
- return newfd;
-
-
-
- }
- else{
- return socket_file_descriptor_;
- }
- // we have a new connection
- } else {
- // data to be received on i
- return i;
- }
- }
-
- }
-
- return -1;
+ struct timeval tv;
+ int retval;
+
+ read_fds_ = total_list_;
+
+ tv.tv_sec = msec / 1000;
+ tv.tv_usec = (msec % 1000) * 1000;
+
+ std::lock_guard<std::recursive_mutex> guard(selection_mutex_);
+
+ if (msec > 0)
+ retval = select(socket_max_ + 1, &read_fds_, NULL, NULL, &tv);
+ else
+ retval = select(socket_max_ + 1, &read_fds_, NULL, NULL, NULL);
+
+ if (retval < 0) {
+ logger_->log_error("Saw error during selection, error:%i %s", retval,
+ strerror(errno));
+ return retval;
+ }
+
+ for (int i = 0; i <= socket_max_; i++) {
+ if (FD_ISSET(i, &read_fds_)) {
+
+ if (i == socket_file_descriptor_) {
+ if (listeners_ > 0) {
+ struct sockaddr_storage remoteaddr; // client address
+ socklen_t addrlen = sizeof remoteaddr;
+ int newfd = accept(socket_file_descriptor_,
+ (struct sockaddr *) &remoteaddr, &addrlen);
+ FD_SET(newfd, &total_list_); // add to master set
+ if (newfd > socket_max_) { // keep track of the max
+ socket_max_ = newfd;
+ }
+ return newfd;
+
+ } else {
+ return socket_file_descriptor_;
+ }
+ // we have a new connection
+ } else {
+ // data to be received on i
+ return i;
+ }
+ }
+
+ }
+
+ return -1;
}
short Socket::setSocketOptions(const int sock) {
- int opt = 1;
- bool nagle_off = true;
+ int opt = 1;
+ bool nagle_off = true;
#ifndef __MACH__
- if (nagle_off) {
- if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *) &opt, sizeof(opt))
- < 0) {
- logger_->log_error("setsockopt() TCP_NODELAY failed");
- close(sock);
- return -1;
- }
- if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &opt,
- sizeof(opt)) < 0) {
- logger_->log_error("setsockopt() SO_REUSEADDR failed");
- close(sock);
- return -1;
- }
- }
-
- int sndsize = 256 * 1024;
- if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *) &sndsize,
- (int) sizeof(sndsize)) < 0) {
- logger_->log_error("setsockopt() SO_SNDBUF failed");
- close(sock);
- return -1;
- }
+ if (nagle_off) {
+ if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *) &opt, sizeof(opt))
+ < 0) {
+ logger_->log_error("setsockopt() TCP_NODELAY failed");
+ close(sock);
+ return -1;
+ }
+ if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &opt,
+ sizeof(opt)) < 0) {
+ logger_->log_error("setsockopt() SO_REUSEADDR failed");
+ close(sock);
+ return -1;
+ }
+ }
+
+ int sndsize = 256 * 1024;
+ if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *) &sndsize,
+ (int) sizeof(sndsize)) < 0) {
+ logger_->log_error("setsockopt() SO_SNDBUF failed");
+ close(sock);
+ return -1;
+ }
#else
- if (listeners_ > 0) {
- // lose the pesky "address already in use" error message
- if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &opt,
- sizeof(opt)) < 0) {
- logger_->log_error("setsockopt() SO_REUSEADDR failed");
- close(sock);
- return -1;
- }
- }
+ if (listeners_ > 0) {
+ // lose the pesky "address already in use" error message
+ if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &opt, sizeof(opt))
+ < 0) {
+ logger_->log_error("setsockopt() SO_REUSEADDR failed");
+ close(sock);
+ return -1;
+ }
+ }
#endif
- return 0;
+ return 0;
}
std::string Socket::getHostname() const {
- return canonical_hostname_;
+ return canonical_hostname_;
}
int Socket::writeData(std::vector<uint8_t> &buf, int buflen) {
- if (buf.capacity() < buflen)
- return -1;
- return writeData((uint8_t*) &buf[0], buflen);
+ if (buf.capacity() < buflen)
+ return -1;
+ return writeData((uint8_t*) &buf[0], buflen);
}
// data stream overrides
int Socket::writeData(uint8_t *value, int size) {
- int ret = 0, bytes = 0;
+ int ret = 0, bytes = 0;
- while (bytes < size) {
+ while (bytes < size) {
+ ret = send(socket_file_descriptor_, value + bytes, size - bytes, 0);
+ //check for errors
+ if (ret <= 0) {
+ close(socket_file_descriptor_);
+ logger_->log_error("Could not send to %d, error: %s",
+ socket_file_descriptor_, strerror(errno));
+ return ret;
+ }
+ bytes += ret;
+ }
- ret = send(socket_file_descriptor_, value + bytes, size - bytes, 0);
- //check for errors
- if (ret <= 0) {
- close(socket_file_descriptor_);
- logger_->log_error("Could not send to %d, error: %s",socket_file_descriptor_, strerror(errno));
- return ret;
- }
- bytes += ret;
-
- }
-
- if (ret)
- logger_->log_trace("Send data size %d over socket %d", size,
- socket_file_descriptor_);
+ if (ret)
+ logger_->log_trace("Send data size %d over socket %d", size,
+ socket_file_descriptor_);
- return bytes;
+ return bytes;
}
template<typename T>
inline std::vector<uint8_t> Socket::readBuffer(const T& t) {
- std::vector<uint8_t> buf;
- buf.resize(sizeof t);
- readData((uint8_t*) &buf[0], sizeof(t));
- return buf;
+ std::vector<uint8_t> buf;
+ buf.resize(sizeof t);
+ readData((uint8_t*) &buf[0], sizeof(t));
+ return buf;
}
+int Socket::write(uint64_t base_value, bool is_little_endian) {
-int Socket::write(uint64_t base_value, bool is_little_endian){
-
- return Serializable::write(base_value,this,is_little_endian);
+ return Serializable::write(base_value, this, is_little_endian);
}
-
-int Socket::write(uint32_t base_value, bool is_little_endian){
- return Serializable::write(base_value,this,is_little_endian);
+int Socket::write(uint32_t base_value, bool is_little_endian) {
+ return Serializable::write(base_value, this, is_little_endian);
}
-int Socket::write(uint16_t base_value, bool is_little_endian){
- return Serializable::write(base_value,this,is_little_endian);
+int Socket::write(uint16_t base_value, bool is_little_endian) {
+ return Serializable::write(base_value, this, is_little_endian);
}
-
int Socket::read(uint64_t &value, bool is_little_endian) {
- auto buf = readBuffer(value);
-
- if (is_little_endian) {
- value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48)
- | ((uint64_t) (buf[2] & 255) << 40)
- | ((uint64_t) (buf[3] & 255) << 32)
- | ((uint64_t) (buf[4] & 255) << 24)
- | ((uint64_t) (buf[5] & 255) << 16)
- | ((uint64_t) (buf[6] & 255) << 8)
- | ((uint64_t) (buf[7] & 255) << 0);
- } else {
- value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8)
- | ((uint64_t) (buf[2] & 255) << 16)
- | ((uint64_t) (buf[3] & 255) << 24)
- | ((uint64_t) (buf[4] & 255) << 32)
- | ((uint64_t) (buf[5] & 255) << 40)
- | ((uint64_t) (buf[6] & 255) << 48)
- | ((uint64_t) (buf[7] & 255) << 56);
- }
- return sizeof(value);
+ auto buf = readBuffer(value);
+
+ if (is_little_endian) {
+ value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48)
+ | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32)
+ | ((uint64_t) (buf[4] & 255) << 24) | ((uint64_t) (buf[5] & 255) << 16)
+ | ((uint64_t) (buf[6] & 255) << 8) | ((uint64_t) (buf[7] & 255) << 0);
+ } else {
+ value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8)
+ | ((uint64_t) (buf[2] & 255) << 16) | ((uint64_t) (buf[3] & 255) << 24)
+ | ((uint64_t) (buf[4] & 255) << 32) | ((uint64_t) (buf[5] & 255) << 40)
+ | ((uint64_t) (buf[6] & 255) << 48) | ((uint64_t) (buf[7] & 255) << 56);
+ }
+ return sizeof(value);
}
int Socket::read(uint32_t &value, bool is_little_endian) {
- auto buf = readBuffer(value);
+ auto buf = readBuffer(value);
- if (is_little_endian) {
- value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3];
- } else {
- value = buf[0] | buf[1] << 8 | buf[2] << 16 | buf[3] << 24;
+ if (is_little_endian) {
+ value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3];
+ } else {
+ value = buf[0] | buf[1] << 8 | buf[2] << 16 | buf[3] << 24;
- }
+ }
- return sizeof(value);
+ return sizeof(value);
}
int Socket::read(uint16_t &value, bool is_little_endian) {
- auto buf = readBuffer(value);
+ auto buf = readBuffer(value);
- if (is_little_endian) {
- value = (buf[0] << 8) | buf[1];
- } else {
- value = buf[0] | buf[1] << 8;
+ if (is_little_endian) {
+ value = (buf[0] << 8) | buf[1];
+ } else {
+ value = buf[0] | buf[1] << 8;
- }
- return sizeof(value);
+ }
+ return sizeof(value);
}
int Socket::readData(std::vector<uint8_t> &buf, int buflen) {
- if (buf.capacity() < buflen)
- {
- buf.resize(buflen);
- }
- return readData((uint8_t*) &buf[0], buflen);
+ if (buf.capacity() < buflen) {
+ buf.resize(buflen);
+ }
+ return readData((uint8_t*) &buf[0], buflen);
}
int Socket::readData(uint8_t *buf, int buflen) {
- int total_read = 0;
- while (buflen) {
- short fd = select_descriptor(1000);
- if (fd < 0) {
-
- logger_->log_info("fd close %i",buflen);
- close(socket_file_descriptor_);
- return -1;
- }
-
- int bytes_read = recv(fd, buf, buflen, 0);
- if (bytes_read <= 0) {
- if (bytes_read == 0)
- logger_->log_info("Other side hung up on %d", fd);
- else {
- logger_->log_error("Could not recv on %d, error: %s",fd, strerror(errno));
- }
- return -1;
- }
-
-
- buflen -= bytes_read;
- buf += bytes_read;
- total_read += bytes_read;
- }
-
- return total_read;
+ int total_read = 0;
+ while (buflen) {
+ short fd = select_descriptor(1000);
+ if (fd < 0) {
+
+ logger_->log_info("fd close %i", buflen);
+ close(socket_file_descriptor_);
+ return -1;
+ }
+
+ int bytes_read = recv(fd, buf, buflen, 0);
+ if (bytes_read <= 0) {
+ if (bytes_read == 0)
+ logger_->log_info("Other side hung up on %d", fd);
+ else {
+ logger_->log_error("Could not recv on %d, error: %s", fd,
+ strerror(errno));
+ }
+ return -1;
+ }
+
+ buflen -= bytes_read;
+ buf += bytes_read;
+ total_read += bytes_read;
+ }
+
+ return total_read;
}
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/io/DataStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/DataStream.cpp b/libminifi/src/io/DataStream.cpp
index 3ff0a57..7a10bd9 100644
--- a/libminifi/src/io/DataStream.cpp
+++ b/libminifi/src/io/DataStream.cpp
@@ -27,6 +27,11 @@
#include <algorithm>
#include "io/DataStream.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
int DataStream::writeData(uint8_t *value, int size) {
@@ -126,3 +131,10 @@ int DataStream::readData(uint8_t *buf,int buflen) {
readBuffer += buflen;
return buflen;
}
+
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/io/EndianCheck.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/EndianCheck.cpp b/libminifi/src/io/EndianCheck.cpp
index f1bc98b..1b5020d 100644
--- a/libminifi/src/io/EndianCheck.cpp
+++ b/libminifi/src/io/EndianCheck.cpp
@@ -18,5 +18,16 @@
#include "io/EndianCheck.h"
-
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
bool EndiannessCheck::IS_LITTLE = EndiannessCheck::is_little_endian();
+
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/io/Serializable.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/Serializable.cpp b/libminifi/src/io/Serializable.cpp
index 78dc63c..f8c623a 100644
--- a/libminifi/src/io/Serializable.cpp
+++ b/libminifi/src/io/Serializable.cpp
@@ -22,7 +22,11 @@
#include <arpa/inet.h>
#include "io/DataStream.h"
#include "io/Serializable.h"
-
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
#define htonll_r(x) ((((uint64_t)htonl(x)) << 32) + htonl((x) >> 32))
@@ -216,3 +220,10 @@ int Serializable::writeUTF(std::string str,DataStream *stream, bool widen) {
}
return ret;
}
+
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/io/SocketFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/SocketFactory.cpp b/libminifi/src/io/SocketFactory.cpp
deleted file mode 100644
index cbfdf96..0000000
--- a/libminifi/src/io/SocketFactory.cpp
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "io/SocketFactory.h"
-
-#include <atomic>
-#include <mutex>
-
-std::atomic<SocketFactory*> SocketFactory::context_instance_;
-std::mutex SocketFactory::context_mutex_;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/io/StreamFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/StreamFactory.cpp b/libminifi/src/io/StreamFactory.cpp
new file mode 100644
index 0000000..e3aa290
--- /dev/null
+++ b/libminifi/src/io/StreamFactory.cpp
@@ -0,0 +1,37 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "../../include/io/StreamFactory.h"
+
+#include <atomic>
+#include <mutex>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+std::atomic<StreamFactory*> StreamFactory::context_instance_;
+std::mutex StreamFactory::context_mutex_;
+
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/io/TLSSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/TLSSocket.cpp b/libminifi/src/io/TLSSocket.cpp
deleted file mode 100644
index 1c81f6c..0000000
--- a/libminifi/src/io/TLSSocket.cpp
+++ /dev/null
@@ -1,237 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifdef OPENSSL_SUPPORT
-#include "Property.h"
-#include "Configure.h"
-#include "io/TLSSocket.h"
-#include "utils/StringUtils.h"
-#include <openssl/ssl.h>
-#include <openssl/err.h>
-
-
-std::atomic<TLSContext*> TLSContext::context_instance;
-std::mutex TLSContext::context_mutex;
-
-TLSContext::TLSContext() :
- error_value(0), ctx(0), logger_(Logger::getLogger()), configuration(
- Configure::getConfigure()) {
-
-}
-
-/**
- * The memory barrier is defined by the singleton
- */
-short TLSContext::initialize() {
- if (ctx != 0) {
- return error_value;
- }
- std::string clientAuthStr;
- bool needClientCert = true;
- if (!(configuration->get(Configure::nifi_security_need_ClientAuth,
- clientAuthStr)
- && StringUtils::StringToBool(clientAuthStr, needClientCert))) {
- needClientCert = true;
- }
-
- SSL_library_init();
- const SSL_METHOD *method;
-
- OpenSSL_add_all_algorithms();
- SSL_load_error_strings();
- method = TLSv1_2_client_method();
- ctx = SSL_CTX_new(method);
- if (ctx == NULL) {
- logger_->log_error("Could not create SSL context, error: %s.",
- std::strerror(errno));
- error_value = TLS_ERROR_CONTEXT;
- return error_value;
- }
- if (needClientCert) {
- std::string certificate;
- std::string privatekey;
- std::string passphrase;
- std::string caCertificate;
-
- if (!(configuration->get(Configure::nifi_security_client_certificate,
- certificate)
- && configuration->get(
- Configure::nifi_security_client_private_key, privatekey))) {
- logger_->log_error(
- "Certificate and Private Key PEM file not configured, error: %s.",
- std::strerror(errno));
- error_value = TLS_ERROR_PEM_MISSING;
- return error_value;
- }
- // load certificates and private key in PEM format
- if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(),
- SSL_FILETYPE_PEM) <= 0) {
- logger_->log_error("Could not create load certificate, error : %s",
- std::strerror(errno));
- error_value = TLS_ERROR_CERT_MISSING;
- return error_value;
-
- }
- if (configuration->get(Configure::nifi_security_client_pass_phrase,
- passphrase)) {
- // if the private key has passphase
- SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb);
- }
-
-
- int retp = SSL_CTX_use_PrivateKey_file(ctx, privatekey.c_str(),
- SSL_FILETYPE_PEM);
- if (retp != 1) {
- logger_->log_error("Could not create load private key,%i on %s error : %s",
- retp,privatekey.c_str(),std::strerror(errno));
- error_value = TLS_ERROR_KEY_ERROR;
- return error_value;
- }
- // verify private key
- if (!SSL_CTX_check_private_key(ctx)) {
- logger_->log_error(
- "Private key does not match the public certificate, error : %s",
- std::strerror(errno));
- error_value = TLS_ERROR_KEY_ERROR;
- return error_value;
- }
- // load CA certificates
- if (configuration->get(Configure::nifi_security_client_ca_certificate,
- caCertificate)) {
- retp = SSL_CTX_load_verify_locations(ctx, caCertificate.c_str(), 0);
- if (retp==0) {
- logger_->log_error(
- "Can not load CA certificate, Exiting, error : %s",
- std::strerror(errno));
- error_value = TLS_ERROR_CERT_ERROR;
- return error_value;
- }
- }
-
- logger_->log_info("Load/Verify Client Certificate OK.");
- }
- return 0;
-}
-
-TLSSocket::~TLSSocket()
-{
- if (ssl != 0)
- SSL_free(ssl);
-}
-/**
- * Constructor that accepts host name, port and listeners. With this
- * contructor we will be creating a server socket
- * @param hostname our host name
- * @param port connecting port
- * @param listeners number of listeners in the queue
- */
-TLSSocket::TLSSocket(const std::string &hostname, const uint16_t port,
- const uint16_t listeners) :
- ::Socket(hostname, port, listeners), ssl(0) {
-}
-
-TLSSocket::TLSSocket(const std::string &hostname, const uint16_t port) :
- ::Socket(hostname, port, 0), ssl(0) {
-}
-
-TLSSocket::TLSSocket(const TLSSocket &&d) :
- ::Socket(std::move(d)), ssl(0) {
-}
-
-short TLSSocket::initialize() {
- TLSContext *context = TLSContext::getInstance();
- short ret = context->initialize();
- Socket::initialize();
- if (!ret) {
- // we have s2s secure config
- ssl = SSL_new(context->getContext());
- SSL_set_fd(ssl, socket_file_descriptor_);
- if (SSL_connect(ssl) == -1) {
- logger_->log_error("SSL socket connect failed to %s %d",
- requested_hostname_.c_str(), port_);
- SSL_free(ssl);
- ssl = NULL;
- close(socket_file_descriptor_);
- return -1;
- } else {
- logger_->log_info("SSL socket connect success to %s %d",
- requested_hostname_.c_str(), port_);
- return 0;
- }
- }
- return ret;
-}
-
-short TLSSocket::select_descriptor(const uint16_t msec) {
- if (ssl && SSL_pending(ssl))
- return 1;
- return Socket::select_descriptor(msec);
-}
-
-int TLSSocket::writeData(std::vector< uint8_t>& buf, int buflen)
-{
- return Socket::writeData(buf,buflen);
-}
-
-int TLSSocket::writeData(uint8_t *value, int size) {
- if (IsNullOrEmpty(ssl))
- return -1;
- // for SSL, wait for the TLS IO is completed
- int bytes = 0;
- int sent = 0;
- while (bytes < size) {
-
- sent = SSL_write(ssl, value + bytes, size - bytes);
- //check for errors
- if (sent < 0) {
- logger_->log_error("Site2Site Peer socket %d send failed %s",
- socket_file_descriptor_, strerror(errno));
- return sent;
- }
- bytes += sent;
- }
- return size;
-}
-
-int TLSSocket::readData(uint8_t *buf, int buflen) {
-
- if (IsNullOrEmpty(ssl))
- return -1;
- int total_read = 0;
- int status = 0;
- while (buflen) {
- short fd = select_descriptor(1000);
- if (fd <= 0) {
-
- close(socket_file_descriptor_);
- return -1;
- }
-
- int sslStatus;
- do {
- status = SSL_read(ssl, buf, buflen);
- sslStatus = SSL_get_error(ssl, status);
- } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ);
-
- buflen -= status;
- buf += status;
- total_read += status;
- }
-
- return total_read;
-}
-#endif // #ifdef OPENSSL_SUPPORT
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/io/tls/TLSSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/tls/TLSSocket.cpp b/libminifi/src/io/tls/TLSSocket.cpp
new file mode 100644
index 0000000..b2df394
--- /dev/null
+++ b/libminifi/src/io/tls/TLSSocket.cpp
@@ -0,0 +1,249 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "properties/Configure.h"
+#include "io/tls/TLSSocket.h"
+#include "utils/StringUtils.h"
+
+#include "core/Property.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+#include <openssl/ssl.h>
+#include <openssl/err.h>
+
+std::atomic<TLSContext*> TLSContext::context_instance;
+std::mutex TLSContext::context_mutex;
+
+TLSContext::TLSContext() :
+ error_value(0), ctx(0), logger_(logging::Logger::getLogger()), configuration(
+ Configure::getConfigure()) {
+
+}
+
+/**
+ * The memory barrier is defined by the singleton
+ */
+short TLSContext::initialize() {
+ if (ctx != 0) {
+ return error_value;
+ }
+ std::string clientAuthStr;
+ bool needClientCert = true;
+ if (!(configuration->get(Configure::nifi_security_need_ClientAuth,
+ clientAuthStr)
+ && org::apache::nifi::minifi::utils::StringUtils::StringToBool(clientAuthStr, needClientCert))) {
+ needClientCert = true;
+ }
+
+ SSL_library_init();
+ const SSL_METHOD *method;
+
+ OpenSSL_add_all_algorithms();
+ SSL_load_error_strings();
+ method = TLSv1_2_client_method();
+ ctx = SSL_CTX_new(method);
+ if (ctx == NULL) {
+ logger_->log_error("Could not create SSL context, error: %s.",
+ std::strerror(errno));
+ error_value = TLS_ERROR_CONTEXT;
+ return error_value;
+ }
+ if (needClientCert) {
+ std::string certificate;
+ std::string privatekey;
+ std::string passphrase;
+ std::string caCertificate;
+
+ if (!(configuration->get(Configure::nifi_security_client_certificate,
+ certificate)
+ && configuration->get(
+ Configure::nifi_security_client_private_key, privatekey))) {
+ logger_->log_error(
+ "Certificate and Private Key PEM file not configured, error: %s.",
+ std::strerror(errno));
+ error_value = TLS_ERROR_PEM_MISSING;
+ return error_value;
+ }
+ // load certificates and private key in PEM format
+ if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(),
+ SSL_FILETYPE_PEM) <= 0) {
+ logger_->log_error("Could not create load certificate, error : %s",
+ std::strerror(errno));
+ error_value = TLS_ERROR_CERT_MISSING;
+ return error_value;
+
+ }
+ if (configuration->get(Configure::nifi_security_client_pass_phrase,
+ passphrase)) {
+ // if the private key has passphase
+ SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb);
+ }
+
+
+ int retp = SSL_CTX_use_PrivateKey_file(ctx, privatekey.c_str(),
+ SSL_FILETYPE_PEM);
+ if (retp != 1) {
+ logger_->log_error("Could not create load private key,%i on %s error : %s",
+ retp,privatekey.c_str(),std::strerror(errno));
+ error_value = TLS_ERROR_KEY_ERROR;
+ return error_value;
+ }
+ // verify private key
+ if (!SSL_CTX_check_private_key(ctx)) {
+ logger_->log_error(
+ "Private key does not match the public certificate, error : %s",
+ std::strerror(errno));
+ error_value = TLS_ERROR_KEY_ERROR;
+ return error_value;
+ }
+ // load CA certificates
+ if (configuration->get(Configure::nifi_security_client_ca_certificate,
+ caCertificate)) {
+ retp = SSL_CTX_load_verify_locations(ctx, caCertificate.c_str(), 0);
+ if (retp==0) {
+ logger_->log_error(
+ "Can not load CA certificate, Exiting, error : %s",
+ std::strerror(errno));
+ error_value = TLS_ERROR_CERT_ERROR;
+ return error_value;
+ }
+ }
+
+ logger_->log_info("Load/Verify Client Certificate OK.");
+ }
+ return 0;
+}
+
+TLSSocket::~TLSSocket()
+{
+ if (ssl != 0)
+ SSL_free(ssl);
+}
+/**
+ * Constructor that accepts host name, port and listeners. With this
+ * contructor we will be creating a server socket
+ * @param hostname our host name
+ * @param port connecting port
+ * @param listeners number of listeners in the queue
+ */
+TLSSocket::TLSSocket(const std::string &hostname, const uint16_t port,
+ const uint16_t listeners) :
+ Socket(hostname, port, listeners), ssl(0) {
+}
+
+TLSSocket::TLSSocket(const std::string &hostname, const uint16_t port) :
+ Socket(hostname, port, 0), ssl(0) {
+}
+
+TLSSocket::TLSSocket(const TLSSocket &&d) :
+ Socket(std::move(d)), ssl(0) {
+}
+
+short TLSSocket::initialize() {
+ TLSContext *context = TLSContext::getInstance();
+ short ret = context->initialize();
+ Socket::initialize();
+ if (!ret) {
+ // we have s2s secure config
+ ssl = SSL_new(context->getContext());
+ SSL_set_fd(ssl, socket_file_descriptor_);
+ if (SSL_connect(ssl) == -1) {
+ logger_->log_error("SSL socket connect failed to %s %d",
+ requested_hostname_.c_str(), port_);
+ SSL_free(ssl);
+ ssl = NULL;
+ close(socket_file_descriptor_);
+ return -1;
+ } else {
+ logger_->log_info("SSL socket connect success to %s %d",
+ requested_hostname_.c_str(), port_);
+ return 0;
+ }
+ }
+ return ret;
+}
+
+short TLSSocket::select_descriptor(const uint16_t msec) {
+ if (ssl && SSL_pending(ssl))
+ return 1;
+ return Socket::select_descriptor(msec);
+}
+
+int TLSSocket::writeData(std::vector< uint8_t>& buf, int buflen)
+{
+ return Socket::writeData(buf,buflen);
+}
+
+int TLSSocket::writeData(uint8_t *value, int size) {
+ if (IsNullOrEmpty(ssl))
+ return -1;
+ // for SSL, wait for the TLS IO is completed
+ int bytes = 0;
+ int sent = 0;
+ while (bytes < size) {
+
+ sent = SSL_write(ssl, value + bytes, size - bytes);
+ //check for errors
+ if (sent < 0) {
+ logger_->log_error("Site2Site Peer socket %d send failed %s",
+ socket_file_descriptor_, strerror(errno));
+ return sent;
+ }
+ bytes += sent;
+ }
+ return size;
+}
+
+int TLSSocket::readData(uint8_t *buf, int buflen) {
+
+ if (IsNullOrEmpty(ssl))
+ return -1;
+ int total_read = 0;
+ int status = 0;
+ while (buflen) {
+ short fd = select_descriptor(1000);
+ if (fd <= 0) {
+
+ close(socket_file_descriptor_);
+ return -1;
+ }
+
+ int sslStatus;
+ do {
+ status = SSL_read(ssl, buf, buflen);
+ sslStatus = SSL_get_error(ssl, status);
+ } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ);
+
+ buflen -= status;
+ buf += status;
+ total_read += status;
+ }
+
+ return total_read;
+}
+
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/processors/AppendHostInfo.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/AppendHostInfo.cpp b/libminifi/src/processors/AppendHostInfo.cpp
new file mode 100644
index 0000000..24ccc9a
--- /dev/null
+++ b/libminifi/src/processors/AppendHostInfo.cpp
@@ -0,0 +1,124 @@
+/**
+ * @file AppendHostInfo.cpp
+ * AppendHostInfo class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <set>
+#include <sys/time.h>
+#include <string.h>
+#include "processors/AppendHostInfo.h"
+#include "core/ProcessContext.h"
+#include "core/Property.h"
+#include "core/ProcessSession.h"
+
+#include <netdb.h>
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <sys/ioctl.h>
+#include <net/if.h>
+#include <arpa/inet.h>
+
+#include "../../include/core/FlowFile.h"
+#include "io/ClientSocket.h"
+
+#define __USE_POSIX
+#include <limits.h>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+#ifndef HOST_NAME_MAX
+#define HOST_NAME_MAX 255
+#endif
+
+const std::string AppendHostInfo::ProcessorName("AppendHostInfo");
+core::Property AppendHostInfo::InterfaceName(
+ "Network Interface Name",
+ "Network interface from which to read an IP v4 address", "eth0");
+core::Property AppendHostInfo::HostAttribute(
+ "Hostname Attribute",
+ "Flowfile attribute to used to record the agent's hostname",
+ "source.hostname");
+core::Property AppendHostInfo::IPAttribute(
+ "IP Attribute",
+ "Flowfile attribute to used to record the agent's IP address",
+ "source.ipv4");
+core::Relationship AppendHostInfo::Success(
+ "success", "success operational on the flow record");
+
+void AppendHostInfo::initialize() {
+ // Set the supported properties
+ std::set<core::Property> properties;
+ properties.insert(InterfaceName);
+ properties.insert(HostAttribute);
+ properties.insert(IPAttribute);
+ setSupportedProperties(properties);
+
+ // Set the supported relationships
+ std::set<core::Relationship> relationships;
+ relationships.insert(Success);
+ setSupportedRelationships(relationships);
+}
+
+void AppendHostInfo::onTrigger(
+ core::ProcessContext *context,
+ core::ProcessSession *session) {
+ std::shared_ptr<core::FlowFile> flow =
+ session->get();
+ if (!flow)
+ return;
+
+ //Get Hostname
+
+ std::string hostAttribute = "";
+ context->getProperty(HostAttribute.getName(), hostAttribute);
+ flow->addAttribute(hostAttribute.c_str(),
+ org::apache::nifi::minifi::io::Socket::getMyHostName());
+
+ //Get IP address for the specified interface
+ std::string iface;
+ context->getProperty(InterfaceName.getName(), iface);
+ //Confirm the specified interface name exists on this device
+ if (if_nametoindex(iface.c_str()) != 0) {
+ struct ifreq ifr;
+ int fd = socket(AF_INET, SOCK_DGRAM, 0);
+ //Type of address to retrieve - IPv4 IP address
+ ifr.ifr_addr.sa_family = AF_INET;
+ //Copy the interface name in the ifreq structure
+ strncpy(ifr.ifr_name, iface.c_str(), IFNAMSIZ - 1);
+ ioctl(fd, SIOCGIFADDR, &ifr);
+ close(fd);
+
+ std::string ipAttribute;
+ context->getProperty(IPAttribute.getName(), ipAttribute);
+ flow->addAttribute(
+ ipAttribute.c_str(),
+ inet_ntoa(((struct sockaddr_in *) &ifr.ifr_addr)->sin_addr));
+ }
+
+ // Transfer to the relationship
+ session->transfer(flow, Success);
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */