You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/04/05 16:11:46 UTC
[2/4] nifi-minifi-cpp git commit: MINIFI-254: Incremental update for
linter changes
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/Record.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Record.cpp b/libminifi/src/core/Record.cpp
deleted file mode 100644
index 6f33300..0000000
--- a/libminifi/src/core/Record.cpp
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Copyright 2017 <copyright holder> <email>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "core/FlowFile.h"
-#include "core/logging/Logger.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-
-FlowFile::FlowFile()
- : size_(0),
- id_(0),
- stored(false),
- offset_(0),
- last_queue_date_(0),
- penaltyExpiration_ms_(0),
- claim_(nullptr),
- marked_delete_(false),
- connection_(nullptr),
- original_connection_() {
- entry_date_ = getTimeMillis();
- lineage_start_date_ = entry_date_;
-
- char uuidStr[37];
-
- // Generate the global UUID for the flow record
- uuid_generate(uuid_);
-
- uuid_unparse_lower(uuid_, uuidStr);
- uuid_str_ = uuidStr;
-
- logger_ = logging::Logger::getLogger();
-
-}
-
-FlowFile::~FlowFile() {
-
-}
-
-FlowFile& FlowFile::operator=(const FlowFile& other) {
-
- uuid_copy(uuid_, other.uuid_);
- stored = other.stored;
- marked_delete_ = other.marked_delete_;
- entry_date_ = other.entry_date_;
- lineage_start_date_ = other.lineage_start_date_;
- lineage_Identifiers_ = other.lineage_Identifiers_;
- last_queue_date_ = other.last_queue_date_;
- size_ = other.size_;
- penaltyExpiration_ms_ = other.penaltyExpiration_ms_;
- attributes_ = other.attributes_;
- claim_ = other.claim_;
- if (claim_ != nullptr)
- this->claim_->increaseFlowFileRecordOwnedCount();
- uuid_str_ = other.uuid_str_;
- connection_ = other.connection_;
- original_connection_ = other.original_connection_;
-
- return *this;
-}
-
-/**
- * Returns whether or not this flow file record
- * is marked as deleted.
- * @return marked deleted
- */
-bool FlowFile::isDeleted() {
- return marked_delete_;
-}
-
-/**
- * Sets whether to mark this flow file record
- * as deleted
- * @param deleted deleted flag
- */
-void FlowFile::setDeleted(const bool deleted) {
- marked_delete_ = deleted;
-}
-
-std::shared_ptr<ResourceClaim> FlowFile::getResourceClaim() {
- return claim_;
-}
-
-void FlowFile::clearResourceClaim() {
- claim_ = nullptr;
-}
-void FlowFile::setResourceClaim(std::shared_ptr<ResourceClaim> &claim) {
- claim_ = claim;
-}
-
-// ! Get Entry Date
-uint64_t FlowFile::getEntryDate() {
- return entry_date_;
-}
-uint64_t FlowFile::getEventTime() {
- return event_time_;
-}
-// ! Get Lineage Start Date
-uint64_t FlowFile::getlineageStartDate() {
- return lineage_start_date_;
-}
-
-std::set<std::string> &FlowFile::getlineageIdentifiers() {
- return lineage_Identifiers_;
-}
-
-bool FlowFile::getAttribute(std::string key, std::string &value) {
- auto it = attributes_.find(key);
- if (it != attributes_.end()) {
- value = it->second;
- return true;
- } else {
- return false;
- }
-}
-
-// Get Size
-uint64_t FlowFile::getSize() {
- return size_;
-}
-// ! Get Offset
-uint64_t FlowFile::getOffset() {
- return offset_;
-}
-
-bool FlowFile::removeAttribute(const std::string key) {
- auto it = attributes_.find(key);
- if (it != attributes_.end()) {
- attributes_.erase(key);
- return true;
- } else {
- return false;
- }
-}
-
-bool FlowFile::updateAttribute(const std::string key, const std::string value) {
- auto it = attributes_.find(key);
- if (it != attributes_.end()) {
- attributes_[key] = value;
- return true;
- } else {
- return false;
- }
-}
-
-bool FlowFile::addAttribute(const std::string &key, const std::string &value) {
-
-
- auto it = attributes_.find(key);
- if (it != attributes_.end()) {
- // attribute already there in the map
- return false;
- } else {
- attributes_[key] = value;
- return true;
- }
-}
-
-void FlowFile::setLineageStartDate(const uint64_t date) {
- lineage_start_date_ = date;
-}
-
-/**
- * Sets the original connection with a shared pointer.
- * @param connection shared connection.
- */
-void FlowFile::setOriginalConnection(
- std::shared_ptr<core::Connectable> &connection) {
- original_connection_ = connection;
-}
-
-/**
- * Sets the connection with a shared pointer.
- * @param connection shared connection.
- */
-void FlowFile::setConnection(std::shared_ptr<core::Connectable> &connection) {
- connection_ = connection;
-}
-
-/**
- * Sets the connection with a shared pointer.
- * @param connection shared connection.
- */
-void FlowFile::setConnection(std::shared_ptr<core::Connectable> &&connection) {
- connection_ = connection;
-}
-
-/**
- * Returns the connection referenced by this record.
- * @return shared connection pointer.
- */
-std::shared_ptr<core::Connectable> FlowFile::getConnection() {
- return connection_;
-}
-
-/**
- * Returns the original connection referenced by this record.
- * @return shared original connection pointer.
- */
-std::shared_ptr<core::Connectable> FlowFile::getOriginalConnection() {
- return original_connection_;
-}
-
-}
-}
-}
-}
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/Repository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Repository.cpp b/libminifi/src/core/Repository.cpp
index 9a27785..50e8cd2 100644
--- a/libminifi/src/core/Repository.cpp
+++ b/libminifi/src/core/Repository.cpp
@@ -15,15 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+#include "core/Repository.h"
+#include <arpa/inet.h>
#include <cstdint>
#include <vector>
-#include <arpa/inet.h>
#include "io/DataStream.h"
#include "io/Serializable.h"
#include "core/Relationship.h"
#include "core/logging/Logger.h"
#include "FlowController.h"
-#include "core/Repository.h"
#include "provenance/Provenance.h"
#include "core/repository/FlowFileRepository.h"
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/RepositoryFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/RepositoryFactory.cpp b/libminifi/src/core/RepositoryFactory.cpp
index 9bdc7c3..c24a2af 100644
--- a/libminifi/src/core/RepositoryFactory.cpp
+++ b/libminifi/src/core/RepositoryFactory.cpp
@@ -1,6 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "core/RepositoryFactory.h"
+#include <memory>
+#include <string>
+#include <algorithm>
#include "core/Repository.h"
-#include "core/Repository.h"
-
#ifdef LEVELDB_SUPPORT
#include "core/repository/FlowFileRepository.h"
#include "provenance/ProvenanceRepository.h"
@@ -11,57 +29,51 @@ namespace apache {
namespace nifi {
namespace minifi {
#ifndef LEVELDB_SUPPORT
- namespace provenance{
- class ProvenanceRepository;
- }
+namespace provenance {
+class ProvenanceRepository;
+}
#endif
namespace core {
#ifndef LEVELDB_SUPPORT
- class FlowFileRepository;
+class FlowFileRepository;
#endif
-
- std::shared_ptr<core::Repository> createRepository(
- const std::string configuration_class_name, bool fail_safe = false) {
-
- std::string class_name_lc = configuration_class_name;
- std::transform(class_name_lc.begin(), class_name_lc.end(),
- class_name_lc.begin(), ::tolower);
- try {
- std::shared_ptr<core::Repository> return_obj = nullptr;
- if (class_name_lc == "flowfilerepository") {
-
- return_obj = instantiate<core::repository::FlowFileRepository>();
- } else if (class_name_lc == "provenancerepository") {
-
-
- return_obj = instantiate<provenance::ProvenanceRepository>();//std::shared_ptr<core::Repository>((core::Repository*)instantiate<provenance::ProvenanceRepository>());
-
- }
-
- if (return_obj){
- return return_obj;
- }
- if (fail_safe) {
- return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1,
- 1, 1);
- } else {
- throw std::runtime_error(
- "Support for the provided configuration class could not be found");
- }
- } catch (const std::runtime_error &r) {
- if (fail_safe) {
- return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1,
- 1, 1);
- }
+std::shared_ptr<core::Repository> createRepository(
+ const std::string configuration_class_name, bool fail_safe) {
+ // Class name is matched case-insensitively; unknown names yield the fail_safe repository or throw.
+ std::string class_name_lc = configuration_class_name;
+ std::transform(class_name_lc.begin(), class_name_lc.end(),
+ class_name_lc.begin(), ::tolower);
+ try {
+ std::shared_ptr<core::Repository> return_obj = nullptr;
+ if (class_name_lc == "flowfilerepository") {
+ return_obj = instantiate<core::repository::FlowFileRepository>();
+ } else if (class_name_lc == "provenancerepository") {
+ return_obj = instantiate<provenance::ProvenanceRepository>();
}
- throw std::runtime_error(
- "Support for the provided configuration class could not be found");
+ if (return_obj) {
+ return return_obj;
+ }
+ if (fail_safe) {
+ return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, 1,
+ 1);
+ } else {
+ throw std::runtime_error(
+ "Support for the provided configuration class could not be found");
+ }
+ } catch (const std::runtime_error &r) {
+ if (fail_safe) {
+ return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, 1,
+ 1);
+ }
}
-
+ throw std::runtime_error(
+ "Support for the provided configuration class could not be found");
+}
+
} /* namespace core */
} /* namespace minifi */
} /* namespace nifi */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/logging/BaseLogger.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/logging/BaseLogger.cpp b/libminifi/src/core/logging/BaseLogger.cpp
index a6b43a8..d4774df 100644
--- a/libminifi/src/core/logging/BaseLogger.cpp
+++ b/libminifi/src/core/logging/BaseLogger.cpp
@@ -17,6 +17,10 @@
*/
#include "core/logging/BaseLogger.h"
+#include <utility>
+#include <memory>
+#include <algorithm>
+#include <string>
namespace org {
namespace apache {
@@ -68,7 +72,6 @@ void BaseLogger::log_info(const char * const format, ...) {
* @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match
*/
void BaseLogger::log_debug(const char * const format, ...) {
-
if (logger_ == NULL || !logger_->should_log(spdlog::level::level_enum::debug))
return;
FILL_BUFFER
@@ -80,7 +83,6 @@ void BaseLogger::log_debug(const char * const format, ...) {
* @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match
*/
void BaseLogger::log_trace(const char * const format, ...) {
-
if (logger_ == NULL || !logger_->should_log(spdlog::level::level_enum::trace))
return;
FILL_BUFFER
@@ -122,7 +124,6 @@ void BaseLogger::log_str(LOG_LEVEL_E level, const std::string &buffer) {
logger_->info(buffer);
break;
}
-
}
void BaseLogger::setLogLevel(const std::string &level,
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/logging/LogAppenders.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/logging/LogAppenders.cpp b/libminifi/src/core/logging/LogAppenders.cpp
index 5d92334..1918a0d 100644
--- a/libminifi/src/core/logging/LogAppenders.cpp
+++ b/libminifi/src/core/logging/LogAppenders.cpp
@@ -24,12 +24,15 @@ namespace minifi {
namespace core {
namespace logging {
-const char *OutputStreamAppender::nifi_log_output_stream_error_stderr="nifi.log.outputstream.appender.error.stderr";
-
-const char *RollingAppender::nifi_log_rolling_apender_file = "nifi.log.rolling.appender.file";
-const char *RollingAppender::nifi_log_rolling_appender_max_files = "nifi.log.rolling.appender.max.files";
-const char *RollingAppender::nifi_log_rolling_appender_max_file_size = "nifi.log.rolling.appender.max.file_size";
+const char *OutputStreamAppender::nifi_log_output_stream_error_stderr =
+ "nifi.log.outputstream.appender.error.stderr";
+const char *RollingAppender::nifi_log_rolling_apender_file =
+ "nifi.log.rolling.appender.file";
+const char *RollingAppender::nifi_log_rolling_appender_max_files =
+ "nifi.log.rolling.appender.max.files";
+const char *RollingAppender::nifi_log_rolling_appender_max_file_size =
+ "nifi.log.rolling.appender.max.file_size";
} /* namespace logging */
} /* namespace core */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/logging/Logger.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/logging/Logger.cpp b/libminifi/src/core/logging/Logger.cpp
index d8cadf3..5c08fa8 100644
--- a/libminifi/src/core/logging/Logger.cpp
+++ b/libminifi/src/core/logging/Logger.cpp
@@ -21,6 +21,7 @@
#include <vector>
#include <queue>
+#include <memory>
#include <map>
namespace org {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/repository/FlowFileRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/FlowFileRepository.cpp b/libminifi/src/core/repository/FlowFileRepository.cpp
index 8f13f39..5f62f83 100644
--- a/libminifi/src/core/repository/FlowFileRepository.cpp
+++ b/libminifi/src/core/repository/FlowFileRepository.cpp
@@ -1,4 +1,24 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
#include "core/repository/FlowFileRepository.h"
+#include <memory>
+#include <string>
+#include <vector>
#include "FlowFileRecord.h"
namespace org {
@@ -20,10 +40,12 @@ void FlowFileRepository::run() {
leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
- std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this());
+ std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<
+ FlowFileRecord>(shared_from_this());
std::string key = it->key().ToString();
- if (eventRead->DeSerialize((uint8_t *) it->value().data(),
- (int) it->value().size())) {
+ if (eventRead->DeSerialize(
+ reinterpret_cast<const uint8_t *>(it->value().data()),
+ it->value().size())) {
if ((curTime - eventRead->getEventTime()) > max_partition_millis_)
purgeList.push_back(key);
} else {
@@ -47,54 +69,44 @@ void FlowFileRepository::run() {
return;
}
-void FlowFileRepository::loadComponent()
- {
-
+void FlowFileRepository::loadComponent() {
std::vector<std::string> purgeList;
- leveldb::Iterator* it = db_->NewIterator(
- leveldb::ReadOptions());
+ leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
- for (it->SeekToFirst(); it->Valid(); it->Next())
- {
- std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this());
+ for (it->SeekToFirst(); it->Valid(); it->Next()) {
+ std::shared_ptr<FlowFileRecord> eventRead =
+ std::make_shared<FlowFileRecord>(shared_from_this());
std::string key = it->key().ToString();
- if (eventRead->DeSerialize((uint8_t *) it->value().data(),
- (int) it->value().size()))
- {
+ if (eventRead->DeSerialize(
+ reinterpret_cast<const uint8_t *>(it->value().data()),
+ it->value().size())) {
auto search = connectionMap.find(eventRead->getConnectionUuid());
- if (search != connectionMap.end())
- {
+ if (search != connectionMap.end()) {
// we find the connection for the persistent flowfile, create the flowfile and enqueue that
- std::shared_ptr<core::FlowFile> flow_file_ref = std::static_pointer_cast<core::FlowFile>(eventRead);
- std::shared_ptr<FlowFileRecord> record = std::make_shared<FlowFileRecord>(shared_from_this(),flow_file_ref);
+ std::shared_ptr<core::FlowFile> flow_file_ref =
+ std::static_pointer_cast<core::FlowFile>(eventRead);
+ std::shared_ptr<FlowFileRecord> record =
+ std::make_shared<FlowFileRecord>(shared_from_this(), flow_file_ref);
// set store to repo to true so that we do need to persistent again in enqueue
record->setStoredToRepository(true);
search->second->put(record);
- }
- else
- {
- if (eventRead->getContentFullPath().length() > 0)
- {
+ } else {
+ if (eventRead->getContentFullPath().length() > 0) {
std::remove(eventRead->getContentFullPath().c_str());
}
purgeList.push_back(key);
}
- }
- else
- {
+ } else {
purgeList.push_back(key);
}
}
delete it;
std::vector<std::string>::iterator itPurge;
- for (itPurge = purgeList.begin(); itPurge != purgeList.end();
- itPurge++)
- {
+ for (itPurge = purgeList.begin(); itPurge != purgeList.end(); itPurge++) {
std::string eventId = *itPurge;
- logger_->log_info("Repository Repo %s Purge %s",
- name_.c_str(),
- eventId.c_str());
+ logger_->log_info("Repository Repo %s Purge %s", name_.c_str(),
+ eventId.c_str());
Delete(eventId);
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/yaml/YamlConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index 8e59363..4e736f8 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -17,7 +17,10 @@
*/
#include "core/yaml/YamlConfiguration.h"
-
+#include <memory>
+#include <string>
+#include <vector>
+#include <set>
namespace org {
namespace apache {
namespace nifi {
@@ -29,18 +32,17 @@ core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(
uuid_t uuid;
std::string flowName = rootFlowNode["name"].as<std::string>();
- std::string id ;
-
+ std::string id;
+
try {
rootFlowNode["id"].as<std::string>();
uuid_parse(id.c_str(), uuid);
- }catch(...)
- {
+ } catch (...) {
logger_->log_warn("Generating random ID for root node");
uuid_generate(uuid);
char uuid_str[37];
- uuid_unparse(uuid,uuid_str);
+ uuid_unparse(uuid, uuid_str);
id = uuid_str;
}
@@ -69,7 +71,6 @@ void YamlConfiguration::parseProcessorNodeYaml(
}
if (processorsNode) {
-
if (processorsNode.IsSequence()) {
// Evaluate sequence of processors
int numProcessors = processorsNode.size();
@@ -196,7 +197,6 @@ void YamlConfiguration::parseProcessorNodeYaml(
processor->setSchedulingStrategy(core::CRON_DRIVEN);
logger_->log_debug("setting scheduling strategy as %s",
procCfg.schedulingStrategy.c_str());
-
}
int64_t maxConcurrentTasks;
@@ -324,7 +324,6 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(
this->parsePortYaml(&currPort, group, RECEIVE);
} // for node
}
-
}
}
}
@@ -341,11 +340,9 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode,
}
if (connectionsNode) {
-
if (connectionsNode->IsSequence()) {
for (YAML::const_iterator iter = connectionsNode->begin();
iter != connectionsNode->end(); ++iter) {
-
YAML::Node connectionNode = iter->as<YAML::Node>();
std::string name = connectionNode["name"].as<std::string>();
@@ -461,7 +458,6 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode,
logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]",
maxConcurrentTasks);
processor->setMaxConcurrentTasks(maxConcurrentTasks);
-
}
void YamlConfiguration::parsePropertiesNodeYaml(
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/io/BaseStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/BaseStream.cpp b/libminifi/src/io/BaseStream.cpp
index 1400a1d..8070c38 100644
--- a/libminifi/src/io/BaseStream.cpp
+++ b/libminifi/src/io/BaseStream.cpp
@@ -16,9 +16,9 @@
* limitations under the License.
*/
#include "io/BaseStream.h"
+#include <string>
#include "io/Serializable.h"
-
namespace org {
namespace apache {
namespace nifi {
@@ -32,7 +32,8 @@ namespace io {
* @return resulting write size
**/
int BaseStream::write(uint32_t base_value, bool is_little_endian) {
- return Serializable::write(base_value, (DataStream*) this, is_little_endian);
+ return Serializable::write(base_value, reinterpret_cast<DataStream*>(this),
+ is_little_endian);
}
/**
@@ -43,7 +44,8 @@ int BaseStream::write(uint32_t base_value, bool is_little_endian) {
* @return resulting write size
**/
int BaseStream::write(uint16_t base_value, bool is_little_endian) {
- return Serializable::write(base_value, (DataStream*) this, is_little_endian);
+ return Serializable::write(base_value, reinterpret_cast<DataStream*>(this),
+ is_little_endian);
}
/**
@@ -54,7 +56,7 @@ int BaseStream::write(uint16_t base_value, bool is_little_endian) {
* @return resulting write size
**/
int BaseStream::write(uint8_t *value, int len) {
- return Serializable::write(value, len, (DataStream*) this);
+ return Serializable::write(value, len, reinterpret_cast<DataStream*>(this));
}
/**
@@ -65,7 +67,8 @@ int BaseStream::write(uint8_t *value, int len) {
* @return resulting write size
**/
int BaseStream::write(uint64_t base_value, bool is_little_endian) {
- return Serializable::write(base_value, (DataStream*) this, is_little_endian);
+ return Serializable::write(base_value, reinterpret_cast<DataStream*>(this),
+ is_little_endian);
}
/**
@@ -74,8 +77,8 @@ int BaseStream::write(uint64_t base_value, bool is_little_endian) {
* @return resulting write size
**/
int BaseStream::write(bool value) {
- uint8_t v = value;
- return Serializable::write(v);
+ uint8_t v = value;
+ return Serializable::write(v);
}
/**
@@ -84,7 +87,7 @@ int BaseStream::write(bool value) {
* @return resulting write size
**/
int BaseStream::writeUTF(std::string str, bool widen) {
- return Serializable::writeUTF(str, (DataStream*) this, widen);
+ return Serializable::writeUTF(str, reinterpret_cast<DataStream*>(this), widen);
}
/**
@@ -94,7 +97,7 @@ int BaseStream::writeUTF(std::string str, bool widen) {
* @return resulting read size
**/
int BaseStream::read(uint8_t &value) {
- return Serializable::read(value, (DataStream*) this);
+ return Serializable::read(value, reinterpret_cast<DataStream*>(this));
}
/**
@@ -104,7 +107,7 @@ int BaseStream::read(uint8_t &value) {
* @return resulting read size
**/
int BaseStream::read(uint16_t &base_value, bool is_little_endian) {
- return Serializable::read(base_value, (DataStream*) this);
+ return Serializable::read(base_value, reinterpret_cast<DataStream*>(this));
}
/**
@@ -114,7 +117,7 @@ int BaseStream::read(uint16_t &base_value, bool is_little_endian) {
* @return resulting read size
**/
int BaseStream::read(char &value) {
- return Serializable::read(value, (DataStream*) this);
+ return Serializable::read(value, reinterpret_cast<DataStream*>(this));
}
/**
@@ -125,7 +128,7 @@ int BaseStream::read(char &value) {
* @return resulting read size
**/
int BaseStream::read(uint8_t *value, int len) {
- return Serializable::read(value, len, (DataStream*) this);
+ return Serializable::read(value, len, reinterpret_cast<DataStream*>(this));
}
/**
@@ -135,7 +138,8 @@ int BaseStream::read(uint8_t *value, int len) {
* @return resulting read size
**/
int BaseStream::read(uint32_t &value, bool is_little_endian) {
- return Serializable::read(value, (DataStream*) this, is_little_endian);
+ return Serializable::read(value, reinterpret_cast<DataStream*>(this),
+ is_little_endian);
}
/**
@@ -145,7 +149,8 @@ int BaseStream::read(uint32_t &value, bool is_little_endian) {
* @return resulting read size
**/
int BaseStream::read(uint64_t &value, bool is_little_endian) {
- return Serializable::read(value, (DataStream*) this, is_little_endian);
+ return Serializable::read(value, reinterpret_cast<DataStream*>(this),
+ is_little_endian);
}
/**
@@ -155,10 +160,9 @@ int BaseStream::read(uint64_t &value, bool is_little_endian) {
* @return resulting read size
**/
int BaseStream::readUTF(std::string &str, bool widen) {
- return Serializable::readUTF(str, (DataStream*) this, widen);
+ return Serializable::readUTF(str, reinterpret_cast<DataStream*>(this), widen);
}
-
} /* namespace io */
} /* namespace minifi */
} /* namespace nifi */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/io/CRCStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/CRCStream.cpp b/libminifi/src/io/CRCStream.cpp
index 47b45b5..e06a8f5 100644
--- a/libminifi/src/io/CRCStream.cpp
+++ b/libminifi/src/io/CRCStream.cpp
@@ -20,5 +20,3 @@
#include <memory>
#include "io/CRCStream.h"
-
-
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/io/ClientSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp
index ad6b04d..e62d4f1 100644
--- a/libminifi/src/io/ClientSocket.cpp
+++ b/libminifi/src/io/ClientSocket.cpp
@@ -15,20 +15,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+#include "io/ClientSocket.h"
#include <netinet/tcp.h>
#include <sys/types.h>
+#include <netdb.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstdio>
+#include <utility>
+#include <vector>
#include <cerrno>
-#include <netdb.h>
#include <iostream>
#include <string>
-
#include "io/validation.h"
-#include "io/ClientSocket.h"
namespace org {
namespace apache {
@@ -36,7 +37,7 @@ namespace nifi {
namespace minifi {
namespace io {
-std::string Socket::HOSTNAME = Socket::getMyHostName(0);
+char *Socket::HOSTNAME = strdup(Socket::getMyHostName(0).c_str());  // own a copy: c_str() of the returned temporary would dangle after this statement
Socket::Socket(const std::string &hostname, const uint16_t port,
const uint16_t listeners = -1)
@@ -50,12 +51,10 @@ Socket::Socket(const std::string &hostname, const uint16_t port,
logger_ = logging::Logger::getLogger();
FD_ZERO(&total_list_);
FD_ZERO(&read_fds_);
-
}
Socket::Socket(const std::string &hostname, const uint16_t port)
: Socket(hostname, port, 0) {
-
}
Socket::Socket(const Socket &&other)
@@ -69,7 +68,6 @@ Socket::Socket(const Socket &&other)
read_fds_(other.read_fds_),
canonical_hostname_(std::move(other.canonical_hostname_)) {
logger_ = logging::Logger::getLogger();
-
}
Socket::~Socket() {
@@ -81,7 +79,6 @@ void Socket::closeStream() {
freeaddrinfo(addr_info_);
addr_info_ = 0;
}
-
if (socket_file_descriptor_ >= 0) {
close(socket_file_descriptor_);
socket_file_descriptor_ = -1;
@@ -98,12 +95,10 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
setSocketOptions(socket_file_descriptor_);
if (listeners_ > 0) {
-
struct sockaddr_in *sa_loc = (struct sockaddr_in*) p->ai_addr;
sa_loc->sin_family = AF_INET;
sa_loc->sin_port = htons(port_);
sa_loc->sin_addr.s_addr = htonl(INADDR_ANY);
-
if (bind(socket_file_descriptor_, p->ai_addr, p->ai_addrlen) == -1) {
logger_->log_error("Could not bind to socket", strerror(errno));
return -1;
@@ -113,7 +108,6 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
if (listeners_ <= 0) {
struct sockaddr_in *sa_loc = (struct sockaddr_in*) p->ai_addr;
sa_loc->sin_family = AF_INET;
- //sa_loc->sin_port = htons(port);
sa_loc->sin_port = htons(port_);
// use any address if you are connecting to the local machine for testing
// otherwise we must use the requested hostname
@@ -129,7 +123,6 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
logger_->log_warn("Could not connect to socket, error:%s",
strerror(errno));
return -1;
-
}
}
}
@@ -140,7 +133,6 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
logger_->log_warn("attempted connection, saw %s", strerror(errno));
return -1;
}
-
}
// add the listener to the total set
FD_SET(socket_file_descriptor_, &total_list_);
@@ -148,8 +140,7 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
return 0;
}
-short Socket::initialize() {
-
+int16_t Socket::initialize() {
struct sockaddr_in servAddr;
addrinfo hints = { sizeof(addrinfo) };
@@ -159,7 +150,6 @@ short Socket::initialize() {
hints.ai_flags = AI_CANONNAME;
if (listeners_ > 0)
hints.ai_flags |= AI_PASSIVE;
-
hints.ai_protocol = 0; /* any protocol */
int errcode = getaddrinfo(requested_hostname_.c_str(), 0, &hints,
@@ -188,8 +178,7 @@ short Socket::initialize() {
int hh_errno;
gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno);
#endif
-
- memcpy((char *) &addr, h->h_addr_list[0], h->h_length);
+ memcpy(reinterpret_cast<char*>(&addr), h->h_addr_list[0], h->h_length);
auto p = addr_info_;
for (; p != NULL; p = p->ai_next) {
@@ -197,8 +186,7 @@ short Socket::initialize() {
if (!IsNullOrEmpty(p) && !IsNullOrEmpty(p->ai_canonname))
canonical_hostname_ = p->ai_canonname;
}
-
- //we've successfully connected
+ // we've successfully connected
if (port_ > 0 && createConnection(p, addr) >= 0) {
return 0;
break;
@@ -206,10 +194,9 @@ short Socket::initialize() {
}
return -1;
-
}
-short Socket::select_descriptor(const uint16_t msec) {
+int16_t Socket::select_descriptor(const uint16_t msec) {
struct timeval tv;
int retval;
@@ -233,7 +220,6 @@ short Socket::select_descriptor(const uint16_t msec) {
for (int i = 0; i <= socket_max_; i++) {
if (FD_ISSET(i, &read_fds_)) {
-
if (i == socket_file_descriptor_) {
if (listeners_ > 0) {
struct sockaddr_storage remoteaddr; // client address
@@ -255,24 +241,23 @@ short Socket::select_descriptor(const uint16_t msec) {
return i;
}
}
-
}
return -1;
}
-short Socket::setSocketOptions(const int sock) {
+int16_t Socket::setSocketOptions(const int sock) {
int opt = 1;
bool nagle_off = true;
#ifndef __MACH__
if (nagle_off) {
- if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *) &opt, sizeof(opt))
+ if (setsockopt(sock, SOL_TCP, TCP_NODELAY, static_cast<void*>(&opt), sizeof(opt))
< 0) {
logger_->log_error("setsockopt() TCP_NODELAY failed");
close(sock);
return -1;
}
- if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &opt,
+ if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&opt),
sizeof(opt)) < 0) {
logger_->log_error("setsockopt() SO_REUSEADDR failed");
close(sock);
@@ -281,8 +266,8 @@ short Socket::setSocketOptions(const int sock) {
}
int sndsize = 256 * 1024;
- if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *) &sndsize,
- (int) sizeof(sndsize)) < 0) {
+ if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char *>( &sndsize),
+ sizeof(sndsize)) < 0) {
logger_->log_error("setsockopt() SO_SNDBUF failed");
close(sock);
return -1;
@@ -291,8 +276,8 @@ short Socket::setSocketOptions(const int sock) {
#else
if (listeners_ > 0) {
// lose the pesky "address already in use" error message
- if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &opt, sizeof(opt))
- < 0) {
+ if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
+ reinterpret_cast<char *>(&opt), sizeof(opt)) < 0) {
logger_->log_error("setsockopt() SO_REUSEADDR failed");
close(sock);
return -1;
@@ -307,22 +292,19 @@ std::string Socket::getHostname() const {
}
int Socket::writeData(std::vector<uint8_t> &buf, int buflen) {
-
if (buf.capacity() < buflen)
return -1;
- return writeData((uint8_t*) &buf[0], buflen);
+ return writeData(reinterpret_cast<uint8_t *>(&buf[0]), buflen);
}
// data stream overrides
int Socket::writeData(uint8_t *value, int size) {
-
int ret = 0, bytes = 0;
while (bytes < size) {
-
ret = send(socket_file_descriptor_, value + bytes, size - bytes, 0);
- //check for errors
+ // check for errors
if (ret <= 0) {
close(socket_file_descriptor_);
logger_->log_error("Could not send to %d, error: %s",
@@ -330,27 +312,23 @@ int Socket::writeData(uint8_t *value, int size) {
return ret;
}
bytes += ret;
-
}
if (ret)
logger_->log_trace("Send data size %d over socket %d", size,
socket_file_descriptor_);
-
return bytes;
-
}
template<typename T>
inline std::vector<uint8_t> Socket::readBuffer(const T& t) {
std::vector<uint8_t> buf;
buf.resize(sizeof t);
- readData((uint8_t*) &buf[0], sizeof(t));
+ readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
return buf;
}
int Socket::write(uint64_t base_value, bool is_little_endian) {
-
return Serializable::write(base_value, this, is_little_endian);
}
@@ -363,7 +341,6 @@ int Socket::write(uint16_t base_value, bool is_little_endian) {
}
int Socket::read(uint64_t &value, bool is_little_endian) {
-
auto buf = readBuffer(value);
if (is_little_endian) {
@@ -381,68 +358,57 @@ int Socket::read(uint64_t &value, bool is_little_endian) {
}
int Socket::read(uint32_t &value, bool is_little_endian) {
-
auto buf = readBuffer(value);
if (is_little_endian) {
value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3];
} else {
value = buf[0] | buf[1] << 8 | buf[2] << 16 | buf[3] << 24;
-
}
-
return sizeof(value);
}
int Socket::read(uint16_t &value, bool is_little_endian) {
-
auto buf = readBuffer(value);
if (is_little_endian) {
value = (buf[0] << 8) | buf[1];
} else {
value = buf[0] | buf[1] << 8;
-
}
return sizeof(value);
}
int Socket::readData(std::vector<uint8_t> &buf, int buflen) {
-
if (buf.capacity() < buflen) {
buf.resize(buflen);
}
- return readData((uint8_t*) &buf[0], buflen);
+ return readData(reinterpret_cast<uint8_t*>(&buf[0]), buflen);
}
int Socket::readData(uint8_t *buf, int buflen) {
-
- int total_read = 0;
+ int32_t total_read = 0;
while (buflen) {
- short fd = select_descriptor(1000);
+ int16_t fd = select_descriptor(1000);
if (fd < 0) {
-
logger_->log_info("fd close %i", buflen);
close(socket_file_descriptor_);
return -1;
}
-
int bytes_read = recv(fd, buf, buflen, 0);
if (bytes_read <= 0) {
- if (bytes_read == 0)
+ if (bytes_read == 0) {
logger_->log_info("Other side hung up on %d", fd);
- else {
+ } else {
logger_->log_error("Could not recv on %d, error: %s", fd,
strerror(errno));
}
return -1;
}
-
buflen -= bytes_read;
buf += bytes_read;
total_read += bytes_read;
}
-
return total_read;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/io/DataStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/DataStream.cpp b/libminifi/src/io/DataStream.cpp
index 7a10bd9..9e0dfce 100644
--- a/libminifi/src/io/DataStream.cpp
+++ b/libminifi/src/io/DataStream.cpp
@@ -15,17 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-
+#include "io/DataStream.h"
+#include <arpa/inet.h>
#include <vector>
#include <iostream>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <string>
-#include <arpa/inet.h>
#include <algorithm>
-#include "io/DataStream.h"
namespace org {
namespace apache {
@@ -33,106 +31,92 @@ namespace nifi {
namespace minifi {
namespace io {
-
int DataStream::writeData(uint8_t *value, int size) {
-
- std::copy(value,value+size,std::back_inserter(buffer));
-
- return size;
+ std::copy(value, value + size, std::back_inserter(buffer));
+ return size;
}
int DataStream::read(uint64_t &value, bool is_little_endian) {
- if ((8 + readBuffer) > buffer.size()) {
- // if read exceed
- return -1;
- }
- uint8_t *buf = &buffer[readBuffer];
-
- if (is_little_endian) {
- value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48)
- | ((uint64_t) (buf[2] & 255) << 40)
- | ((uint64_t) (buf[3] & 255) << 32)
- | ((uint64_t) (buf[4] & 255) << 24)
- | ((uint64_t) (buf[5] & 255) << 16)
- | ((uint64_t) (buf[6] & 255) << 8)
- | ((uint64_t) (buf[7] & 255) << 0);
- } else {
- value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8)
- | ((uint64_t) (buf[2] & 255) << 16)
- | ((uint64_t) (buf[3] & 255) << 24)
- | ((uint64_t) (buf[4] & 255) << 32)
- | ((uint64_t) (buf[5] & 255) << 40)
- | ((uint64_t) (buf[6] & 255) << 48)
- | ((uint64_t) (buf[7] & 255) << 56);
- }
- readBuffer += 8;
- return 8;
+ if ((8 + readBuffer) > buffer.size()) {
+ // if read exceed
+ return -1;
+ }
+ uint8_t *buf = &buffer[readBuffer];
+
+ if (is_little_endian) {
+ value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48)
+ | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32)
+ | ((uint64_t) (buf[4] & 255) << 24) | ((uint64_t) (buf[5] & 255) << 16)
+ | ((uint64_t) (buf[6] & 255) << 8) | ((uint64_t) (buf[7] & 255) << 0);
+ } else {
+ value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8)
+ | ((uint64_t) (buf[2] & 255) << 16) | ((uint64_t) (buf[3] & 255) << 24)
+ | ((uint64_t) (buf[4] & 255) << 32) | ((uint64_t) (buf[5] & 255) << 40)
+ | ((uint64_t) (buf[6] & 255) << 48) | ((uint64_t) (buf[7] & 255) << 56);
+ }
+ readBuffer += 8;
+ return 8;
}
int DataStream::read(uint32_t &value, bool is_little_endian) {
- if ((4 + readBuffer) > buffer.size()) {
- // if read exceed
- return -1;
- }
- uint8_t *buf = &buffer[readBuffer];
-
- if (is_little_endian) {
- value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3];
- } else {
- value = buf[0] | buf[1] << 8 | buf[2] << 16 | buf[3] << 24;
-
- }
- readBuffer += 4;
- return 4;
+ if ((4 + readBuffer) > buffer.size()) {
+ // if read exceed
+ return -1;
+ }
+ uint8_t *buf = &buffer[readBuffer];
+
+ if (is_little_endian) {
+ value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3];
+ } else {
+ value = buf[0] | buf[1] << 8 | buf[2] << 16 | buf[3] << 24;
+ }
+ readBuffer += 4;
+ return 4;
}
int DataStream::read(uint16_t &value, bool is_little_endian) {
-
- if ((2 + readBuffer) > buffer.size()) {
- // if read exceed
- return -1;
- }
- uint8_t *buf = &buffer[readBuffer];
-
- if (is_little_endian) {
- value = (buf[0] << 8) | buf[1];
- } else {
- value = buf[0] | buf[1] << 8;
-
- }
- readBuffer += 2;
- return 2;
+ if ((2 + readBuffer) > buffer.size()) {
+ // if read exceed
+ return -1;
+ }
+ uint8_t *buf = &buffer[readBuffer];
+
+ if (is_little_endian) {
+ value = (buf[0] << 8) | buf[1];
+ } else {
+ value = buf[0] | buf[1] << 8;
+ }
+ readBuffer += 2;
+ return 2;
}
-int DataStream::readData(std::vector<uint8_t> &buf,int buflen) {
- if ((buflen + readBuffer) > buffer.size()) {
- // if read exceed
- return -1;
- }
+int DataStream::readData(std::vector<uint8_t> &buf, int buflen) {
+ if ((buflen + readBuffer) > buffer.size()) {
+ // if read exceed
+ return -1;
+ }
- if (buf.capacity() < buflen)
- buf.resize(buflen);
+ if (buf.capacity() < buflen)
+ buf.resize(buflen);
- buf.insert(buf.begin(),&buffer[readBuffer],&buffer[readBuffer+buflen]);
+ buf.insert(buf.begin(), &buffer[readBuffer], &buffer[readBuffer + buflen]);
- readBuffer += buflen;
- return buflen;
+ readBuffer += buflen;
+ return buflen;
}
+int DataStream::readData(uint8_t *buf, int buflen) {
+ if ((buflen + readBuffer) > buffer.size()) {
+ // if read exceed
+ return -1;
+ }
-int DataStream::readData(uint8_t *buf,int buflen) {
- if ((buflen + readBuffer) > buffer.size()) {
- // if read exceed
- return -1;
- }
-
- std::copy(&buffer[readBuffer],&buffer[readBuffer+buflen],buf);
+ std::copy(&buffer[readBuffer], &buffer[readBuffer + buflen], buf);
- readBuffer += buflen;
- return buflen;
+ readBuffer += buflen;
+ return buflen;
}
-
} /* namespace io */
} /* namespace minifi */
} /* namespace nifi */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/io/EndianCheck.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/EndianCheck.cpp b/libminifi/src/io/EndianCheck.cpp
index 1b5020d..fd754c4 100644
--- a/libminifi/src/io/EndianCheck.cpp
+++ b/libminifi/src/io/EndianCheck.cpp
@@ -25,7 +25,6 @@ namespace minifi {
namespace io {
bool EndiannessCheck::IS_LITTLE = EndiannessCheck::is_little_endian();
-
} /* namespace io */
} /* namespace minifi */
} /* namespace nifi */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/io/Serializable.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/Serializable.cpp b/libminifi/src/io/Serializable.cpp
index 7b7f2bd..c3f74c7 100644
--- a/libminifi/src/io/Serializable.cpp
+++ b/libminifi/src/io/Serializable.cpp
@@ -15,13 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+#include "io/Serializable.h"
+#include <arpa/inet.h>
+#include <cstdio>
#include <iostream>
#include <vector>
#include <string>
-#include <cstdio>
-#include <arpa/inet.h>
+#include <algorithm>
#include "io/DataStream.h"
-#include "io/Serializable.h"
namespace org {
namespace apache {
namespace nifi {
@@ -29,8 +30,7 @@ namespace minifi {
namespace io {
#define htonll_r(x) ((((uint64_t)htonl(x)) << 32) + htonl((x) >> 32))
-
-#define IS_ASCII(c) __builtin_expect(!!((c >= 1) && (c <= 127)),1)
+#define IS_ASCII(c) __builtin_expect(!!((c >= 1) && (c <= 127)), 1)
template<typename T>
int Serializable::writeData(const T &t, DataStream *stream) {
@@ -63,7 +63,7 @@ int Serializable::write(uint8_t value, DataStream *stream) {
return stream->writeData(&value, 1);
}
int Serializable::write(char value, DataStream *stream) {
- return stream->writeData((uint8_t *) &value, 1);
+ return stream->writeData(reinterpret_cast<uint8_t *>(&value), 1);
}
int Serializable::write(uint8_t *value, int len, DataStream *stream) {
@@ -89,7 +89,7 @@ int Serializable::read(char &value, DataStream *stream) {
int ret = stream->readData(&buf, 1);
if (ret == 1)
- value = (char) buf;
+ value = buf;
return ret;
}
@@ -105,17 +105,14 @@ int Serializable::read(uint16_t &value, DataStream *stream,
int Serializable::read(uint32_t &value, DataStream *stream,
bool is_little_endian) {
return stream->read(value, is_little_endian);
-
}
int Serializable::read(uint64_t &value, DataStream *stream,
bool is_little_endian) {
return stream->read(value, is_little_endian);
-
}
int Serializable::write(uint32_t base_value, DataStream *stream,
bool is_little_endian) {
-
const uint32_t value = is_little_endian ? htonl(base_value) : base_value;
return writeData(value, stream);
@@ -123,7 +120,6 @@ int Serializable::write(uint32_t base_value, DataStream *stream,
int Serializable::write(uint64_t base_value, DataStream *stream,
bool is_little_endian) {
-
const uint64_t value =
is_little_endian == 1 ? htonll_r(base_value) : base_value;
return writeData(value, stream);
@@ -131,7 +127,6 @@ int Serializable::write(uint64_t base_value, DataStream *stream,
int Serializable::write(uint16_t base_value, DataStream *stream,
bool is_little_endian) {
-
const uint16_t value = is_little_endian == 1 ? htons(base_value) : base_value;
return writeData(value, stream);
@@ -150,7 +145,6 @@ int Serializable::readUTF(std::string &str, DataStream *stream, bool widen) {
} else {
uint32_t len;
ret = read(len, stream);
-
if (ret <= 0)
return ret;
utflen = len;
@@ -166,7 +160,6 @@ int Serializable::readUTF(std::string &str, DataStream *stream, bool widen) {
// The number of chars produced may be less than utflen
str = std::string((const char*) &buf[0], utflen);
-
return utflen;
}
@@ -181,7 +174,6 @@ int Serializable::writeUTF(std::string str, DataStream *stream, bool widen) {
return -1;
if (utflen == 0) {
-
if (!widen) {
uint16_t shortLen = utflen;
write(shortLen, stream);
@@ -207,12 +199,10 @@ int Serializable::writeUTF(std::string str, DataStream *stream, bool widen) {
int ret;
if (!widen) {
-
uint16_t short_length = utflen;
write(short_length, stream);
ret = stream->writeData(utf_to_write.data(), utflen);
} else {
- //utflen += 4;
write(utflen, stream);
ret = stream->writeData(utf_to_write.data(), utflen);
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/io/StreamFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/StreamFactory.cpp b/libminifi/src/io/StreamFactory.cpp
index e3aa290..1cf419e 100644
--- a/libminifi/src/io/StreamFactory.cpp
+++ b/libminifi/src/io/StreamFactory.cpp
@@ -15,11 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include "../../include/io/StreamFactory.h"
-
+#include "io/StreamFactory.h"
#include <atomic>
-#include <mutex>
-
+#include <mutex>
+
namespace org {
namespace apache {
namespace nifi {
@@ -29,7 +28,6 @@ namespace io {
std::atomic<StreamFactory*> StreamFactory::context_instance_;
std::mutex StreamFactory::context_mutex_;
-
} /* namespace io */
} /* namespace minifi */
} /* namespace nifi */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/io/tls/TLSSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/tls/TLSSocket.cpp b/libminifi/src/io/tls/TLSSocket.cpp
index b2df394..1499840 100644
--- a/libminifi/src/io/tls/TLSSocket.cpp
+++ b/libminifi/src/io/tls/TLSSocket.cpp
@@ -15,10 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include "properties/Configure.h"
#include "io/tls/TLSSocket.h"
+#include <openssl/ssl.h>
+#include <openssl/err.h>
+#include <utility>
+#include <string>
+#include <vector>
+#include "properties/Configure.h"
#include "utils/StringUtils.h"
-
#include "core/Property.h"
namespace org {
@@ -27,116 +31,111 @@ namespace nifi {
namespace minifi {
namespace io {
-#include <openssl/ssl.h>
-#include <openssl/err.h>
-
std::atomic<TLSContext*> TLSContext::context_instance;
std::mutex TLSContext::context_mutex;
-TLSContext::TLSContext() :
- error_value(0), ctx(0), logger_(logging::Logger::getLogger()), configuration(
- Configure::getConfigure()) {
-
+TLSContext::TLSContext()
+ : error_value(0),
+ ctx(0),
+ logger_(logging::Logger::getLogger()),
+ configuration(Configure::getConfigure()) {
}
-
/**
* The memory barrier is defined by the singleton
*/
-short TLSContext::initialize() {
- if (ctx != 0) {
- return error_value;
- }
- std::string clientAuthStr;
- bool needClientCert = true;
- if (!(configuration->get(Configure::nifi_security_need_ClientAuth,
- clientAuthStr)
- && org::apache::nifi::minifi::utils::StringUtils::StringToBool(clientAuthStr, needClientCert))) {
- needClientCert = true;
- }
-
- SSL_library_init();
- const SSL_METHOD *method;
-
- OpenSSL_add_all_algorithms();
- SSL_load_error_strings();
- method = TLSv1_2_client_method();
- ctx = SSL_CTX_new(method);
- if (ctx == NULL) {
- logger_->log_error("Could not create SSL context, error: %s.",
- std::strerror(errno));
- error_value = TLS_ERROR_CONTEXT;
- return error_value;
- }
- if (needClientCert) {
- std::string certificate;
- std::string privatekey;
- std::string passphrase;
- std::string caCertificate;
-
- if (!(configuration->get(Configure::nifi_security_client_certificate,
- certificate)
- && configuration->get(
- Configure::nifi_security_client_private_key, privatekey))) {
- logger_->log_error(
- "Certificate and Private Key PEM file not configured, error: %s.",
- std::strerror(errno));
- error_value = TLS_ERROR_PEM_MISSING;
- return error_value;
- }
- // load certificates and private key in PEM format
- if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(),
- SSL_FILETYPE_PEM) <= 0) {
- logger_->log_error("Could not create load certificate, error : %s",
- std::strerror(errno));
- error_value = TLS_ERROR_CERT_MISSING;
- return error_value;
-
- }
- if (configuration->get(Configure::nifi_security_client_pass_phrase,
- passphrase)) {
- // if the private key has passphase
- SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb);
- }
-
-
- int retp = SSL_CTX_use_PrivateKey_file(ctx, privatekey.c_str(),
- SSL_FILETYPE_PEM);
- if (retp != 1) {
- logger_->log_error("Could not create load private key,%i on %s error : %s",
- retp,privatekey.c_str(),std::strerror(errno));
- error_value = TLS_ERROR_KEY_ERROR;
- return error_value;
- }
- // verify private key
- if (!SSL_CTX_check_private_key(ctx)) {
- logger_->log_error(
- "Private key does not match the public certificate, error : %s",
- std::strerror(errno));
- error_value = TLS_ERROR_KEY_ERROR;
- return error_value;
- }
- // load CA certificates
- if (configuration->get(Configure::nifi_security_client_ca_certificate,
- caCertificate)) {
- retp = SSL_CTX_load_verify_locations(ctx, caCertificate.c_str(), 0);
- if (retp==0) {
- logger_->log_error(
- "Can not load CA certificate, Exiting, error : %s",
- std::strerror(errno));
- error_value = TLS_ERROR_CERT_ERROR;
- return error_value;
- }
- }
-
- logger_->log_info("Load/Verify Client Certificate OK.");
- }
- return 0;
+int16_t TLSContext::initialize() {
+ if (ctx != 0) {
+ return error_value;
+ }
+ std::string clientAuthStr;
+ bool needClientCert = true;
+ if (!(configuration->get(Configure::nifi_security_need_ClientAuth,
+ clientAuthStr)
+ && org::apache::nifi::minifi::utils::StringUtils::StringToBool(
+ clientAuthStr, needClientCert))) {
+ needClientCert = true;
+ }
+
+ SSL_library_init();
+ const SSL_METHOD *method;
+
+ OpenSSL_add_all_algorithms();
+ SSL_load_error_strings();
+ method = TLSv1_2_client_method();
+ ctx = SSL_CTX_new(method);
+ if (ctx == NULL) {
+ logger_->log_error("Could not create SSL context, error: %s.",
+ std::strerror(errno));
+ error_value = TLS_ERROR_CONTEXT;
+ return error_value;
+ }
+ if (needClientCert) {
+ std::string certificate;
+ std::string privatekey;
+ std::string passphrase;
+ std::string caCertificate;
+
+ if (!(configuration->get(Configure::nifi_security_client_certificate,
+ certificate)
+ && configuration->get(Configure::nifi_security_client_private_key,
+ privatekey))) {
+ logger_->log_error(
+ "Certificate and Private Key PEM file not configured, error: %s.",
+ std::strerror(errno));
+ error_value = TLS_ERROR_PEM_MISSING;
+ return error_value;
+ }
+ // load certificates and private key in PEM format
+ if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(), SSL_FILETYPE_PEM)
+ <= 0) {
+ logger_->log_error("Could not create load certificate, error : %s",
+ std::strerror(errno));
+ error_value = TLS_ERROR_CERT_MISSING;
+ return error_value;
+ }
+ if (configuration->get(Configure::nifi_security_client_pass_phrase,
+ passphrase)) {
+ // if the private key has passphase
+ SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb);
+ }
+
+ int retp = SSL_CTX_use_PrivateKey_file(ctx, privatekey.c_str(),
+ SSL_FILETYPE_PEM);
+ if (retp != 1) {
+ logger_->log_error(
+ "Could not create load private key,%i on %s error : %s", retp,
+ privatekey.c_str(), std::strerror(errno));
+ error_value = TLS_ERROR_KEY_ERROR;
+ return error_value;
+ }
+ // verify private key
+ if (!SSL_CTX_check_private_key(ctx)) {
+ logger_->log_error(
+ "Private key does not match the public certificate, error : %s",
+ std::strerror(errno));
+ error_value = TLS_ERROR_KEY_ERROR;
+ return error_value;
+ }
+ // load CA certificates
+ if (configuration->get(Configure::nifi_security_client_ca_certificate,
+ caCertificate)) {
+ retp = SSL_CTX_load_verify_locations(ctx, caCertificate.c_str(), 0);
+ if (retp == 0) {
+ logger_->log_error("Can not load CA certificate, Exiting, error : %s",
+ std::strerror(errno));
+ error_value = TLS_ERROR_CERT_ERROR;
+ return error_value;
+ }
+ }
+
+ logger_->log_info("Load/Verify Client Certificate OK.");
+ }
+ return 0;
}
-TLSSocket::~TLSSocket()
-{
- if (ssl != 0)
- SSL_free(ssl);
+TLSSocket::~TLSSocket() {
+ if (ssl != 0)
+ SSL_free(ssl);
}
/**
* Constructor that accepts host name, port and listeners. With this
@@ -146,102 +145,100 @@ TLSSocket::~TLSSocket()
* @param listeners number of listeners in the queue
*/
TLSSocket::TLSSocket(const std::string &hostname, const uint16_t port,
- const uint16_t listeners) :
- Socket(hostname, port, listeners), ssl(0) {
+ const uint16_t listeners)
+ : Socket(hostname, port, listeners),
+ ssl(0) {
}
-TLSSocket::TLSSocket(const std::string &hostname, const uint16_t port) :
- Socket(hostname, port, 0), ssl(0) {
+TLSSocket::TLSSocket(const std::string &hostname, const uint16_t port)
+ : Socket(hostname, port, 0),
+ ssl(0) {
}
-TLSSocket::TLSSocket(const TLSSocket &&d) :
- Socket(std::move(d)), ssl(0) {
+TLSSocket::TLSSocket(const TLSSocket &&d)
+ : Socket(std::move(d)),
+ ssl(0) {
}
-short TLSSocket::initialize() {
- TLSContext *context = TLSContext::getInstance();
- short ret = context->initialize();
- Socket::initialize();
- if (!ret) {
- // we have s2s secure config
- ssl = SSL_new(context->getContext());
- SSL_set_fd(ssl, socket_file_descriptor_);
- if (SSL_connect(ssl) == -1) {
- logger_->log_error("SSL socket connect failed to %s %d",
- requested_hostname_.c_str(), port_);
- SSL_free(ssl);
- ssl = NULL;
- close(socket_file_descriptor_);
- return -1;
- } else {
- logger_->log_info("SSL socket connect success to %s %d",
- requested_hostname_.c_str(), port_);
- return 0;
- }
- }
- return ret;
+int16_t TLSSocket::initialize() {
+ TLSContext *context = TLSContext::getInstance();
+ int16_t ret = context->initialize();
+ Socket::initialize();
+ if (!ret) {
+ // we have s2s secure config
+ ssl = SSL_new(context->getContext());
+ SSL_set_fd(ssl, socket_file_descriptor_);
+ if (SSL_connect(ssl) == -1) {
+ logger_->log_error("SSL socket connect failed to %s %d",
+ requested_hostname_.c_str(), port_);
+ SSL_free(ssl);
+ ssl = NULL;
+ close(socket_file_descriptor_);
+ return -1;
+ } else {
+ logger_->log_info("SSL socket connect success to %s %d",
+ requested_hostname_.c_str(), port_);
+ return 0;
+ }
+ }
+ return ret;
}
-short TLSSocket::select_descriptor(const uint16_t msec) {
- if (ssl && SSL_pending(ssl))
- return 1;
- return Socket::select_descriptor(msec);
+int16_t TLSSocket::select_descriptor(const uint16_t msec) {
+ if (ssl && SSL_pending(ssl))
+ return 1;
+ return Socket::select_descriptor(msec);
}
-int TLSSocket::writeData(std::vector< uint8_t>& buf, int buflen)
-{
- return Socket::writeData(buf,buflen);
+int TLSSocket::writeData(std::vector<uint8_t>& buf, int buflen) {
+ return Socket::writeData(buf, buflen);
}
int TLSSocket::writeData(uint8_t *value, int size) {
- if (IsNullOrEmpty(ssl))
- return -1;
- // for SSL, wait for the TLS IO is completed
- int bytes = 0;
- int sent = 0;
- while (bytes < size) {
-
- sent = SSL_write(ssl, value + bytes, size - bytes);
- //check for errors
- if (sent < 0) {
- logger_->log_error("Site2Site Peer socket %d send failed %s",
- socket_file_descriptor_, strerror(errno));
- return sent;
- }
- bytes += sent;
- }
- return size;
+ if (IsNullOrEmpty(ssl))
+ return -1;
+ // for SSL, wait for the TLS IO is completed
+ int bytes = 0;
+ int sent = 0;
+ while (bytes < size) {
+ sent = SSL_write(ssl, value + bytes, size - bytes);
+ // check for errors
+ if (sent < 0) {
+ logger_->log_error("Site2Site Peer socket %d send failed %s",
+ socket_file_descriptor_, strerror(errno));
+ return sent;
+ }
+ bytes += sent;
+ }
+ return size;
}
int TLSSocket::readData(uint8_t *buf, int buflen) {
-
- if (IsNullOrEmpty(ssl))
- return -1;
- int total_read = 0;
- int status = 0;
- while (buflen) {
- short fd = select_descriptor(1000);
- if (fd <= 0) {
-
- close(socket_file_descriptor_);
- return -1;
- }
-
- int sslStatus;
- do {
- status = SSL_read(ssl, buf, buflen);
- sslStatus = SSL_get_error(ssl, status);
- } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ);
-
- buflen -= status;
- buf += status;
- total_read += status;
- }
-
- return total_read;
+ if (IsNullOrEmpty(ssl))
+ return -1;
+ int total_read = 0;
+ int status = 0;
+ while (buflen) {
+ int16_t fd = select_descriptor(1000);
+ if (fd <= 0) {
+ close(socket_file_descriptor_);
+ return -1;
+ }
+
+ int sslStatus;
+ do {
+ status = SSL_read(ssl, buf, buflen);
+ sslStatus = SSL_get_error(ssl, status);
+ } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ);
+
+ buflen -= status;
+ buf += status;
+ total_read += status;
+ }
+
+ return total_read;
}
-
} /* namespace io */
} /* namespace minifi */
} /* namespace nifi */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/processors/AppendHostInfo.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/AppendHostInfo.cpp b/libminifi/src/processors/AppendHostInfo.cpp
index 24ccc9a..b3c76db 100644
--- a/libminifi/src/processors/AppendHostInfo.cpp
+++ b/libminifi/src/processors/AppendHostInfo.cpp
@@ -17,27 +17,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include <set>
+#include "processors/AppendHostInfo.h"
+#define __USE_POSIX
+#include <limits.h>
#include <sys/time.h>
#include <string.h>
-#include "processors/AppendHostInfo.h"
-#include "core/ProcessContext.h"
-#include "core/Property.h"
-#include "core/ProcessSession.h"
-
#include <netdb.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <net/if.h>
#include <arpa/inet.h>
-
-#include "../../include/core/FlowFile.h"
+#include <memory>
+#include <string>
+#include <set>
+#include "core/ProcessContext.h"
+#include "core/Property.h"
+#include "core/ProcessSession.h"
+#include "core/FlowFile.h"
#include "io/ClientSocket.h"
-
-#define __USE_POSIX
-#include <limits.h>
-
namespace org {
namespace apache {
namespace nifi {
@@ -48,7 +46,6 @@ namespace processors {
#define HOST_NAME_MAX 255
#endif
-const std::string AppendHostInfo::ProcessorName("AppendHostInfo");
core::Property AppendHostInfo::InterfaceName(
"Network Interface Name",
"Network interface from which to read an IP v4 address", "eth0");
@@ -77,31 +74,29 @@ void AppendHostInfo::initialize() {
setSupportedRelationships(relationships);
}
-void AppendHostInfo::onTrigger(
- core::ProcessContext *context,
- core::ProcessSession *session) {
- std::shared_ptr<core::FlowFile> flow =
- session->get();
+void AppendHostInfo::onTrigger(core::ProcessContext *context,
+ core::ProcessSession *session) {
+ std::shared_ptr<core::FlowFile> flow = session->get();
if (!flow)
return;
- //Get Hostname
+ // Get Hostname
std::string hostAttribute = "";
context->getProperty(HostAttribute.getName(), hostAttribute);
flow->addAttribute(hostAttribute.c_str(),
org::apache::nifi::minifi::io::Socket::getMyHostName());
- //Get IP address for the specified interface
+ // Get IP address for the specified interface
std::string iface;
context->getProperty(InterfaceName.getName(), iface);
- //Confirm the specified interface name exists on this device
+ // Confirm the specified interface name exists on this device
if (if_nametoindex(iface.c_str()) != 0) {
struct ifreq ifr;
int fd = socket(AF_INET, SOCK_DGRAM, 0);
- //Type of address to retrieve - IPv4 IP address
+ // Type of address to retrieve - IPv4 IP address
ifr.ifr_addr.sa_family = AF_INET;
- //Copy the interface name in the ifreq structure
+ // Copy the interface name in the ifreq structure
strncpy(ifr.ifr_name, iface.c_str(), IFNAMSIZ - 1);
ioctl(fd, SIOCGIFADDR, &ifr);
close(fd);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/processors/ExecuteProcess.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ExecuteProcess.cpp b/libminifi/src/processors/ExecuteProcess.cpp
index 3cbbc1b..701c645 100644
--- a/libminifi/src/processors/ExecuteProcess.cpp
+++ b/libminifi/src/processors/ExecuteProcess.cpp
@@ -18,9 +18,12 @@
* limitations under the License.
*/
#include "processors/ExecuteProcess.h"
+#include <cstring>
+#include <memory>
+#include <string>
+#include <set>
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
-#include <cstring>
#include "utils/StringUtils.h"
#include "utils/TimeUtil.h"
@@ -30,14 +33,15 @@ namespace nifi {
namespace minifi {
namespace processors {
-const std::string ExecuteProcess::ProcessorName("ExecuteProcess");
core::Property ExecuteProcess::Command(
"Command",
- "Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.",
+ "Specifies the command to be executed; if just the name of an executable"
+ " is provided, it must be in the user's environment PATH.",
"");
core::Property ExecuteProcess::CommandArguments(
"Command Arguments",
- "The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.",
+ "The arguments to supply to the executable delimited by white space. White "
+ "space can be escaped by enclosing it in double-quotes.",
"");
core::Property ExecuteProcess::WorkingDir(
"Working Directory",
@@ -45,7 +49,8 @@ core::Property ExecuteProcess::WorkingDir(
"");
core::Property ExecuteProcess::BatchDuration(
"Batch Duration",
- "If the process is expected to be long-running and produce textual output, a batch duration can be specified.",
+ "If the process is expected to be long-running and produce textual output, a "
+ "batch duration can be specified.",
"0");
core::Property ExecuteProcess::RedirectErrorStream(
"Redirect Error Stream",
@@ -69,9 +74,8 @@ void ExecuteProcess::initialize() {
setSupportedRelationships(relationships);
}
-void ExecuteProcess::onTrigger(
- core::ProcessContext *context,
- core::ProcessSession *session) {
+void ExecuteProcess::onTrigger(core::ProcessContext *context,
+ core::ProcessSession *session) {
std::string value;
if (context->getProperty(Command.getName(), value)) {
this->_command = value;
@@ -84,12 +88,10 @@ void ExecuteProcess::onTrigger(
}
if (context->getProperty(BatchDuration.getName(), value)) {
core::TimeUnit unit;
- if (core::Property::StringToTime(value,
- _batchDuration,
- unit)
- && core::Property::ConvertTimeUnitToMS(
- _batchDuration, unit, _batchDuration)) {
-
+ if (core::Property::StringToTime(value, _batchDuration, unit)
+ && core::Property::ConvertTimeUnitToMS(_batchDuration, unit,
+ _batchDuration)) {
+ logger_->log_info("Setting _batchDuration");
}
}
if (context->getProperty(RedirectErrorStream.getName(), value)) {
@@ -112,9 +114,7 @@ void ExecuteProcess::onTrigger(
}
logger_->log_info("Execute Command %s", _fullCommand.c_str());
// split the command into array
- char cstr[_fullCommand.length() + 1];
- std::strcpy(cstr, _fullCommand.c_str());
- char *p = std::strtok(cstr, " ");
+ char *p = std::strtok(const_cast<char*>(_fullCommand.c_str()), " ");
int argc = 0;
char *argv[64];
while (p != 0 && argc < 64) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/processors/GenerateFlowFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/GenerateFlowFile.cpp b/libminifi/src/processors/GenerateFlowFile.cpp
index ebdaaa3..34c0ae2 100644
--- a/libminifi/src/processors/GenerateFlowFile.cpp
+++ b/libminifi/src/processors/GenerateFlowFile.cpp
@@ -17,18 +17,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+#include "processors/GenerateFlowFile.h"
+#include <sys/time.h>
+#include <time.h>
#include <vector>
#include <queue>
#include <map>
+#include <memory>
+#include <string>
#include <set>
-#include <sys/time.h>
-#include <time.h>
#include <chrono>
#include <thread>
#include <random>
#include "utils/StringUtils.h"
-
-#include "processors/GenerateFlowFile.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
@@ -39,104 +40,101 @@ namespace minifi {
namespace processors {
const char *GenerateFlowFile::DATA_FORMAT_BINARY = "Binary";
const char *GenerateFlowFile::DATA_FORMAT_TEXT = "Text";
-const std::string GenerateFlowFile::ProcessorName("GenerateFlowFile");
-core::Property GenerateFlowFile::FileSize("File Size", "The size of the file that will be used", "1 kB");
-core::Property GenerateFlowFile::BatchSize("Batch Size", "The number of FlowFiles to be transferred in each invocation", "1");
-core::Property GenerateFlowFile::DataFormat("Data Format", "Specifies whether the data should be Text or Binary", GenerateFlowFile::DATA_FORMAT_BINARY);
-core::Property GenerateFlowFile::UniqueFlowFiles("Unique FlowFiles",
- "If true, each FlowFile that is generated will be unique. If false, a random value will be generated and all FlowFiles", "true");
-core::Relationship GenerateFlowFile::Success("success", "success operational on the flow record");
+core::Property GenerateFlowFile::FileSize(
+ "File Size", "The size of the file that will be used", "1 kB");
+core::Property GenerateFlowFile::BatchSize(
+ "Batch Size",
+ "The number of FlowFiles to be transferred in each invocation", "1");
+core::Property GenerateFlowFile::DataFormat(
+ "Data Format", "Specifies whether the data should be Text or Binary",
+ GenerateFlowFile::DATA_FORMAT_BINARY);
+core::Property GenerateFlowFile::UniqueFlowFiles(
+ "Unique FlowFiles",
+ "If true, each FlowFile that is generated will be unique. If false, a random value will be generated and all FlowFiles",
+ "true");
+core::Relationship GenerateFlowFile::Success(
+ "success", "success operational on the flow record");
-void GenerateFlowFile::initialize()
-{
- // Set the supported properties
- std::set<core::Property> properties;
- properties.insert(FileSize);
- properties.insert(BatchSize);
- properties.insert(DataFormat);
- properties.insert(UniqueFlowFiles);
- setSupportedProperties(properties);
- // Set the supported relationships
- std::set<core::Relationship> relationships;
- relationships.insert(Success);
- setSupportedRelationships(relationships);
+void GenerateFlowFile::initialize() {
+ // Set the supported properties
+ std::set<core::Property> properties;
+ properties.insert(FileSize);
+ properties.insert(BatchSize);
+ properties.insert(DataFormat);
+ properties.insert(UniqueFlowFiles);
+ setSupportedProperties(properties);
+ // Set the supported relationships
+ std::set<core::Relationship> relationships;
+ relationships.insert(Success);
+ setSupportedRelationships(relationships);
}
-void GenerateFlowFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session)
-{
- int64_t batchSize = 1;
- bool uniqueFlowFile = true;
- int64_t fileSize = 1024;
+void GenerateFlowFile::onTrigger(core::ProcessContext *context,
+ core::ProcessSession *session) {
+ int64_t batchSize = 1;
+ bool uniqueFlowFile = true;
+ int64_t fileSize = 1024;
- std::string value;
- if (context->getProperty(FileSize.getName(), value))
- {
- core::Property::StringToInt(value, fileSize);
- }
- if (context->getProperty(BatchSize.getName(), value))
- {
- core::Property::StringToInt(value, batchSize);
- }
- if (context->getProperty(UniqueFlowFiles.getName(), value))
- {
- org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, uniqueFlowFile);
- }
+ std::string value;
+ if (context->getProperty(FileSize.getName(), value)) {
+ core::Property::StringToInt(value, fileSize);
+ }
+ if (context->getProperty(BatchSize.getName(), value)) {
+ core::Property::StringToInt(value, batchSize);
+ }
+ if (context->getProperty(UniqueFlowFiles.getName(), value)) {
+ org::apache::nifi::minifi::utils::StringUtils::StringToBool(value,
+ uniqueFlowFile);
+ }
- if (!uniqueFlowFile)
- {
- char *data;
- data = new char[fileSize];
- if (!data)
- return;
- uint64_t dataSize = fileSize;
- GenerateFlowFile::WriteCallback callback(data, dataSize);
- char *current = data;
- for (int i = 0; i < fileSize; i+= sizeof(int))
- {
- int randValue = random();
- *((int *) current) = randValue;
- current += sizeof(int);
- }
- for (int i = 0; i < batchSize; i++)
- {
- // For each batch
- std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
- if (!flowFile)
- return;
- if (fileSize > 0)
- session->write(flowFile, &callback);
- session->transfer(flowFile, Success);
- }
- delete[] data;
- }
- else
- {
- if (!_data)
- {
- // We have not create the unique data yet
- _data = new char[fileSize];
- _dataSize = fileSize;
- char *current = _data;
- for (int i = 0; i < fileSize; i+= sizeof(int))
- {
- int randValue = random();
- *((int *) current) = randValue;
- // *((int *) current) = (0xFFFFFFFF & i);
- current += sizeof(int);
- }
- }
- GenerateFlowFile::WriteCallback callback(_data, _dataSize);
- for (int i = 0; i < batchSize; i++)
- {
- // For each batch
- std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
- if (!flowFile)
- return;
- if (fileSize > 0)
- session->write(flowFile, &callback);
- session->transfer(flowFile, Success);
- }
- }
+ if (!uniqueFlowFile) {
+ char *data;
+ data = new char[fileSize];
+ if (!data)
+ return;
+ uint64_t dataSize = fileSize;
+ GenerateFlowFile::WriteCallback callback(data, dataSize);
+ char *current = data;
+ for (int i = 0; i < fileSize; i += sizeof(int)) {
+ int randValue = random();
+ *(reinterpret_cast<int*>(current)) = randValue;
+ current += sizeof(int);
+ }
+ for (int i = 0; i < batchSize; i++) {
+ // For each batch
+ std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
+ FlowFileRecord>(session->create());
+ if (!flowFile)
+ return;
+ if (fileSize > 0)
+ session->write(flowFile, &callback);
+ session->transfer(flowFile, Success);
+ }
+ delete[] data;
+ } else {
+ if (!_data) {
+ // We have not created the unique data yet
+ _data = new char[fileSize];
+ _dataSize = fileSize;
+ char *current = _data;
+ for (int i = 0; i < fileSize; i += sizeof(int)) {
+ int randValue = random();
+ *(reinterpret_cast<int*>(current)) = randValue;
+ current += sizeof(int);
+ }
+ }
+ GenerateFlowFile::WriteCallback callback(_data, _dataSize);
+ for (int i = 0; i < batchSize; i++) {
+ // For each batch
+ std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
+ FlowFileRecord>(session->create());
+ if (!flowFile)
+ return;
+ if (fileSize > 0)
+ session->write(flowFile, &callback);
+ session->transfer(flowFile, Success);
+ }
+ }
}
} /* namespace processors */
} /* namespace minifi */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/processors/GetFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/GetFile.cpp b/libminifi/src/processors/GetFile.cpp
index 652caf7..2740793 100644
--- a/libminifi/src/processors/GetFile.cpp
+++ b/libminifi/src/processors/GetFile.cpp
@@ -15,30 +15,32 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
+#include "processors/GetFile.h"
#include <sys/time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <time.h>
-#include <sstream>
#include <stdio.h>
-#include <string>
-#include <iostream>
#include <dirent.h>
#include <limits.h>
#include <unistd.h>
-#if (__GNUC__ >= 4)
+#if (__GNUC__ >= 4)
#if (__GNUC_MINOR__ < 9)
#include <regex.h>
+#else
+#include <regex>
#endif
#endif
+#include <vector>
+#include <queue>
+#include <map>
+#include <memory>
+#include <set>
+#include <sstream>
+#include <string>
+#include <iostream>
#include "utils/StringUtils.h"
-#include <regex>
#include "utils/TimeUtil.h"
-#include "processors/GetFile.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
@@ -47,7 +49,6 @@ namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
-const std::string GetFile::ProcessorName("GetFile");
core::Property GetFile::BatchSize(
"Batch Size", "The maximum number of files to pull in each iteration",
"10");
@@ -62,11 +63,13 @@ core::Property GetFile::KeepSourceFile(
"false");
core::Property GetFile::MaxAge(
"Maximum File Age",
- "The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored",
+ "The minimum age that a file must be in order to be pulled;"
+ " any file younger than this amount of time (according to last modification date) will be ignored",
"0 sec");
core::Property GetFile::MinAge(
"Minimum File Age",
- "The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored",
+ "The maximum age that a file must be in order to be pulled; any file "
+ "older than this amount of time (according to last modification date) will be ignored",
"0 sec");
core::Property GetFile::MaxSize(
"Maximum File Size",
@@ -113,7 +116,6 @@ void GetFile::onSchedule(core::ProcessContext *context,
core::ProcessSessionFactory *sessionFactory) {
std::string value;
- logger_->log_info("onTrigger GetFile");
if (context->getProperty(Directory.getName(), value)) {
request_.directory = value;
}
@@ -129,13 +131,12 @@ void GetFile::onSchedule(core::ProcessContext *context,
value, request_.keepSourceFile);
}
- logger_->log_info("onTrigger GetFile");
if (context->getProperty(MaxAge.getName(), value)) {
core::TimeUnit unit;
if (core::Property::StringToTime(value, request_.maxAge, unit)
&& core::Property::ConvertTimeUnitToMS(request_.maxAge, unit,
request_.maxAge)) {
-
+ logger_->log_debug("successfully applied _maxAge");
}
}
if (context->getProperty(MinAge.getName(), value)) {
@@ -143,7 +144,7 @@ void GetFile::onSchedule(core::ProcessContext *context,
if (core::Property::StringToTime(value, request_.minAge, unit)
&& core::Property::ConvertTimeUnitToMS(request_.minAge, unit,
request_.minAge)) {
-
+ logger_->log_debug("successfully applied _minAge");
}
}
if (context->getProperty(MaxSize.getName(), value)) {
@@ -157,7 +158,7 @@ void GetFile::onSchedule(core::ProcessContext *context,
if (core::Property::StringToTime(value, request_.pollInterval, unit)
&& core::Property::ConvertTimeUnitToMS(request_.pollInterval, unit,
request_.pollInterval)) {
-
+ logger_->log_debug("successfully applied _pollInterval");
}
}
if (context->getProperty(Recurse.getName(), value)) {
@@ -172,11 +173,9 @@ void GetFile::onSchedule(core::ProcessContext *context,
void GetFile::onTrigger(core::ProcessContext *context,
core::ProcessSession *session) {
-
// Perform directory list
logger_->log_info("Is listing empty %i", isListingEmpty());
if (isListingEmpty()) {
-
if (request_.pollInterval == 0
|| (getTimeMillis() - last_listing_time_) > request_.pollInterval) {
performListing(request_.directory, request_);
@@ -190,7 +189,6 @@ void GetFile::onTrigger(core::ProcessContext *context,
std::queue<std::string> list;
pollListing(list, request_);
while (!list.empty()) {
-
std::string fileName = list.front();
list.pop();
logger_->log_info("GetFile process %s", fileName.c_str());
@@ -214,7 +212,6 @@ void GetFile::onTrigger(core::ProcessContext *context,
throw;
}
}
-
}
bool GetFile::isListingEmpty() {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/processors/ListenHTTP.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ListenHTTP.cpp b/libminifi/src/processors/ListenHTTP.cpp
index 36c743e..5c4a6bb 100644
--- a/libminifi/src/processors/ListenHTTP.cpp
+++ b/libminifi/src/processors/ListenHTTP.cpp
@@ -1,5 +1,6 @@
/**
* @file ListenHTTP.cpp
+
* ListenHTTP class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
@@ -17,17 +18,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include <sstream>
+#include "processors/ListenHTTP.h"
+#include <uuid/uuid.h>
+#include <CivetServer.h>
#include <stdio.h>
+#include <sstream>
+#include <utility>
+#include <memory>
#include <string>
#include <iostream>
#include <fstream>
-#include <uuid/uuid.h>
-
-#include <CivetServer.h>
-
-#include "processors/ListenHTTP.h"
-
+#include <set>
+#include <vector>
#include "utils/TimeUtil.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
@@ -39,15 +41,15 @@ namespace nifi {
namespace minifi {
namespace processors {
-const std::string ListenHTTP::ProcessorName("ListenHTTP");
-
-core::Property ListenHTTP::BasePath(
- "Base Path", "Base path for incoming connections", "contentListener");
+core::Property ListenHTTP::BasePath("Base Path",
+ "Base path for incoming connections",
+ "contentListener");
core::Property ListenHTTP::Port(
"Listening Port", "The Port to listen on for incoming connections", "");
core::Property ListenHTTP::AuthorizedDNPattern(
"Authorized DN Pattern",
- "A Regular Expression to apply against the Distinguished Name of incoming connections. If the Pattern does not match the DN, the connection will be refused.",
+ "A Regular Expression to apply against the Distinguished Name of incoming"
+ " connections. If the Pattern does not match the DN, the connection will be refused.",
".*");
core::Property ListenHTTP::SSLCertificate(
"SSL Certificate",
@@ -65,17 +67,18 @@ core::Property ListenHTTP::SSLMinimumVersion(
"SSL2");
core::Property ListenHTTP::HeadersAsAttributesRegex(
"HTTP Headers to receive as Attributes (Regex)",
- "Specifies the Regular Expression that determines the names of HTTP Headers that should be passed along as FlowFile attributes",
+ "Specifies the Regular Expression that determines the names of HTTP Headers that"
+ " should be passed along as FlowFile attributes",
"");
-core::Relationship ListenHTTP::Success(
- "success", "All files are routed to success");
+core::Relationship ListenHTTP::Success("success",
+ "All files are routed to success");
void ListenHTTP::initialize() {
_logger->log_info("Initializing ListenHTTP");
// Set the supported properties
- std::set < core::Property > properties;
+ std::set<core::Property> properties;
properties.insert(BasePath);
properties.insert(Port);
properties.insert(AuthorizedDNPattern);
@@ -84,17 +87,15 @@ void ListenHTTP::initialize() {
properties.insert(SSLVerifyPeer);
properties.insert(SSLMinimumVersion);
properties.insert(HeadersAsAttributesRegex);
- setSupportedProperties (properties);
+ setSupportedProperties(properties);
// Set the supported relationships
- std::set < core::Relationship > relationships;
+ std::set<core::Relationship> relationships;
relationships.insert(Success);
- setSupportedRelationships (relationships);
+ setSupportedRelationships(relationships);
}
-void ListenHTTP::onSchedule(
- core::ProcessContext *context,
- core::ProcessSessionFactory *sessionFactory) {
-
+void ListenHTTP::onSchedule(core::ProcessContext *context,
+ core::ProcessSessionFactory *sessionFactory) {
std::string basePath;
if (!context->getProperty(BasePath.getName(), basePath)) {
@@ -178,7 +179,7 @@ void ListenHTTP::onSchedule(
listeningPort.c_str(), basePath.c_str(), numThreads);
// Initialize web server
- std::vector < std::string > options;
+ std::vector<std::string> options;
options.push_back("enable_keep_alive");
options.push_back("yes");
options.push_back("keep_alive_timeout_ms");
@@ -238,12 +239,10 @@ void ListenHTTP::onSchedule(
ListenHTTP::~ListenHTTP() {
}
-void ListenHTTP::onTrigger(
- core::ProcessContext *context,
- core::ProcessSession *session) {
-
- std::shared_ptr < FlowFileRecord > flowFile = std::static_pointer_cast
- < FlowFileRecord > (session->get());
+void ListenHTTP::onTrigger(core::ProcessContext *context,
+ core::ProcessSession *session) {
+ std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
+ FlowFileRecord>(session->get());
// Do nothing if there are no incoming files
if (!flowFile) {
@@ -251,10 +250,10 @@ void ListenHTTP::onTrigger(
}
}
-ListenHTTP::Handler::Handler(
- core::ProcessContext *context,
- core::ProcessSessionFactory *sessionFactory,
- std::string &&authDNPattern, std::string &&headersAsAttributesPattern)
+ListenHTTP::Handler::Handler(core::ProcessContext *context,
+ core::ProcessSessionFactory *sessionFactory,
+ std::string &&authDNPattern,
+ std::string &&headersAsAttributesPattern)
: _authDNRegex(std::move(authDNPattern)),
_headersAsAttributesRegex(std::move(headersAsAttributesPattern)) {
_processContext = context;
@@ -292,8 +291,7 @@ bool ListenHTTP::Handler::handlePost(CivetServer *server,
auto session = _processSessionFactory->createSession();
ListenHTTP::WriteCallback callback(conn, req_info);
- auto flowFile = std::static_pointer_cast < FlowFileRecord
- > (session->create());
+ auto flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
if (!flowFile) {
sendErrorResponse(conn);
@@ -347,9 +345,9 @@ ListenHTTP::WriteCallback::WriteCallback(
}
void ListenHTTP::WriteCallback::process(std::ofstream *stream) {
- long long rlen;
- long long nlen = 0;
- long long tlen = _reqInfo->content_length;
+ int64_t rlen;
+ int64_t nlen = 0;
+ int64_t tlen = _reqInfo->content_length;
char buf[16384];
while (nlen < tlen) {