You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/11/29 12:03:16 UTC
nifi-minifi-cpp git commit: MINIFICPP-681 - Add content hash processor
Repository: nifi-minifi-cpp
Updated Branches:
refs/heads/master 1cb750866 -> b53f497f3
MINIFICPP-681 - Add content hash processor
This closes #445.
Signed-off-by: Marc Parisi <ph...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/b53f497f
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/b53f497f
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/b53f497f
Branch: refs/heads/master
Commit: b53f497f329dce6e02a519ff7a6d3f1ab20bf853
Parents: 1cb7508
Author: Arpad Boda <ab...@hortonworks.com>
Authored: Mon Nov 19 13:49:39 2018 +0100
Committer: Marc Parisi <ph...@apache.org>
Committed: Thu Nov 29 07:03:00 2018 -0500
----------------------------------------------------------------------
PROCESSORS.md | 26 ++++
libminifi/include/processors/HashContent.h | 196 ++++++++++++++++++++++++
libminifi/src/processors/HashContent.cpp | 100 ++++++++++++
libminifi/test/unit/HashContentTest.cpp | 129 ++++++++++++++++
4 files changed, 451 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b53f497f/PROCESSORS.md
----------------------------------------------------------------------
diff --git a/PROCESSORS.md b/PROCESSORS.md
index 49bfd30..f07a74a 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -33,6 +33,7 @@
- [GenerateFlowFile](#generateflowfile)
- [GetFile](#getfile)
- [GetUSBCamera](#getusbcamera)
+- [HashContent](#hashcontent)
- [InvokeHTTP](#invokehttp)
- [ListenHTTP](#listenhttp)
- [ListenSyslog](#listensyslog)
@@ -130,6 +131,31 @@ default values, and whether a property supports the NiFi Expression Language.
| success | All FlowFiles are routed to this relationship. |
+## HashContent
+
+### Description
+
+HashContent calculates a checksum of the FlowFile's content and stores it, as an uppercase hexadecimal string, in a FlowFile attribute.
+Configuration options select the hashing algorithm and set the name of that attribute.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other
+properties (not in bold) are considered optional. The table also indicates any
+default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
+| Hash Attribute | Checksum | | Name of the FlowFile attribute in which the processor stores the checksum |
+| Hash Algorithm | MD5 | MD5, SHA1, SHA256 | Name of the algorithm used to calculate the checksum. The name is case-insensitive and hyphens are ignored, so `sha-256` is accepted as well |
+
+### Relationships
+
+| Name | Description |
+| - | - |
+| success | All FlowFiles are routed to this relationship. |
+
+
## ConvertHeartBeat
This Processor converts MQTT heartbeats into a JSON repreesntation.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b53f497f/libminifi/include/processors/HashContent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/HashContent.h b/libminifi/include/processors/HashContent.h
new file mode 100644
index 0000000..acafd0f
--- /dev/null
+++ b/libminifi/include/processors/HashContent.h
@@ -0,0 +1,196 @@
+/**
+ * @file HashContent.h
+ * HashContent class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef NIFI_MINIFI_CPP_HashContent_H
+#define NIFI_MINIFI_CPP_HashContent_H
+
+#ifdef OPENSSL_SUPPORT
+
+#include <stdint.h>
+
+#include <functional>
+#include <iomanip>
+#include <map>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <utility>
+
+#include <openssl/md5.h>
+#include <openssl/sha.h>
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "io/BaseStream.h"
+
+//! Result of a hash function: the digest as a hexadecimal string and the number of bytes that were hashed
+using HashReturnType = std::pair<std::string, int64_t>;
+
+namespace {
+//! Size in bytes of the buffer the hash functions fill with each readData call
+#define HASH_BUFFER_SIZE 16384
+
+  /**
+   * Renders a raw digest as an uppercase hexadecimal string, two characters per byte.
+   * @param digest pointer to the digest bytes
+   * @param size number of bytes to render
+   * @return the hexadecimal representation; empty when size is 0
+   */
+  std::string digestToString(const unsigned char * const digest, size_t size) {
+    std::stringstream ss;
+    // uppercase, hex and the fill character are sticky; only the width is reset by every insertion
+    ss << std::uppercase << std::hex << std::setfill('0');
+    for (size_t i = 0; i < size; ++i) {
+      // widen to int so the byte is printed as a number, not as a character
+      ss << std::setw(2) << static_cast<int>(digest[i]);
+    }
+    return ss.str();
+  }
+
+  /**
+   * Streams the whole content of a stream through an OpenSSL digest and renders the result.
+   *
+   * Shared implementation behind MD5Hash, SHA1Hash and SHA256Hash, which differ only in the
+   * context type, the digest length and the init/update/final functions they pass in.
+   *
+   * @tparam Context OpenSSL context type (e.g. MD5_CTX)
+   * @tparam DigestLength size in bytes of the digest the algorithm produces
+   * @param stream source of the bytes to hash
+   * @param init function that initializes a Context
+   * @param update function that feeds a chunk of bytes into a Context
+   * @param finalize function that writes the digest of a Context into a buffer
+   * @return pair of (uppercase hex digest, number of bytes hashed). The digest is left empty when
+   *         no bytes were read. When a read reports a negative count the digest is left empty as
+   *         well and that negative count is returned as the second member.
+   */
+  template<typename Context, size_t DigestLength, typename InitFn, typename UpdateFn, typename FinalFn>
+  HashReturnType hashStream(const std::shared_ptr<org::apache::nifi::minifi::io::BaseStream>& stream,
+                            InitFn init, UpdateFn update, FinalFn finalize) {
+    HashReturnType result;
+    result.second = 0;
+    uint8_t buffer[HASH_BUFFER_SIZE];
+    Context context;
+    init(&context);
+
+    for (;;) {
+      // Keep the count signed: stored in a size_t, an error code would turn into a huge length
+      // and the update call would read far beyond the buffer.
+      // NOTE(review): assumes readData reports failure with a negative value - confirm against io::BaseStream.
+      const int64_t bytes_read = stream->readData(buffer, HASH_BUFFER_SIZE);
+      if (bytes_read < 0) {
+        // Do not publish a digest of partially read content
+        return HashReturnType(std::string(), bytes_read);
+      }
+      if (bytes_read == 0) {
+        break;
+      }
+      update(&context, buffer, static_cast<size_t>(bytes_read));
+      result.second += bytes_read;
+    }
+
+    // Empty content deliberately yields an empty digest string
+    if (result.second > 0) {
+      unsigned char digest[DigestLength];
+      finalize(digest, &context);
+      result.first = digestToString(digest, DigestLength);
+    }
+    return result;
+  }
+
+  /** Computes the MD5 digest of the stream's content; see hashStream for the layout of the result. */
+  HashReturnType MD5Hash(const std::shared_ptr<org::apache::nifi::minifi::io::BaseStream>& stream) {
+    return hashStream<MD5_CTX, MD5_DIGEST_LENGTH>(stream, MD5_Init, MD5_Update, MD5_Final);
+  }
+
+  /** Computes the SHA-1 digest of the stream's content; see hashStream for the layout of the result. */
+  HashReturnType SHA1Hash(const std::shared_ptr<org::apache::nifi::minifi::io::BaseStream>& stream) {
+    return hashStream<SHA_CTX, SHA_DIGEST_LENGTH>(stream, SHA1_Init, SHA1_Update, SHA1_Final);
+  }
+
+  /** Computes the SHA-256 digest of the stream's content; see hashStream for the layout of the result. */
+  HashReturnType SHA256Hash(const std::shared_ptr<org::apache::nifi::minifi::io::BaseStream>& stream) {
+    return hashStream<SHA256_CTX, SHA256_DIGEST_LENGTH>(stream, SHA256_Init, SHA256_Update, SHA256_Final);
+  }
+}
+
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+//! Maps an algorithm name (upper case, without '-') to the function that hashes a stream with that algorithm.
+//! std::function is declared in <functional>, which this header has to include itself.
+static const std::map<std::string, const std::function<HashReturnType(const std::shared_ptr<io::BaseStream>&)>> HashAlgos = {
+  {"MD5", MD5Hash},
+  {"SHA1", SHA1Hash},
+  {"SHA256", SHA256Hash}
+};
+
+/**
+ * Processor that hashes the content of a FlowFile and stores the digest in an attribute.
+ * The algorithm and the name of the attribute are read from the HashAlgorithm and
+ * HashAttribute properties when the processor is scheduled.
+ */
+class HashContent : public core::Processor {
+ public:
+  //! Processor Name
+  static constexpr char const* ProcessorName = "HashContent";
+
+  //! Supported Properties
+  static core::Property HashAttribute;
+  static core::Property HashAlgorithm;
+
+  //! Supported Relationships
+  static core::Relationship Success;
+
+  /*!
+   * Create a new processor
+   * \param name name of the processor instance
+   * \param uuid identifier of the processor instance
+   */
+  explicit HashContent(std::string name, utils::Identifier uuid = utils::Identifier())
+      : Processor(name, uuid),
+        logger_(logging::LoggerFactory<HashContent>::getLogger()) {
+  }
+
+  //! Initialize, registers the supported properties and relationships
+  void initialize() override;
+
+  //! OnSchedule, reads the attribute name and the algorithm name from the context
+  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override;
+
+  //! OnTrigger, hashes the content of one FlowFile and routes it to Success
+  void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override;
+
+  //! Callback that hashes a content stream and stores the digest on its FlowFile
+  class ReadCallback : public InputStreamCallback {
+   public:
+    ReadCallback(std::shared_ptr<core::FlowFile> flowFile, const HashContent& parent);
+    ~ReadCallback() = default;
+
+    //! \return the second member of the pair returned by the selected hash function
+    int64_t process(std::shared_ptr<io::BaseStream> stream);
+
+   private:
+    //! FlowFile that receives the attribute
+    std::shared_ptr<core::FlowFile> flowFile_;
+    //! Processor that provides the algorithm name and the attribute name
+    const HashContent& parent_;
+  };
+
+ private:
+  //! Logger
+  std::shared_ptr<logging::Logger> logger_;
+  //! Name of the selected algorithm, set in onSchedule
+  std::string algoName_;
+  //! Name of the attribute that receives the digest, set in onSchedule
+  std::string attrKey_;
+};
+
+REGISTER_RESOURCE(HashContent);
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif //OPENSSL_SUPPORT
+
+#endif //NIFI_MINIFI_CPP_HashContent_H
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b53f497f/libminifi/src/processors/HashContent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/HashContent.cpp b/libminifi/src/processors/HashContent.cpp
new file mode 100644
index 0000000..e76a368
--- /dev/null
+++ b/libminifi/src/processors/HashContent.cpp
@@ -0,0 +1,100 @@
+/**
+ * @file HashContent.cpp
+ * HashContent class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifdef OPENSSL_SUPPORT
+
+#include <algorithm>
+#include <iostream>
+#include <memory>
+#include <string>
+#include "processors/HashContent.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/FlowFile.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+//! Property naming the FlowFile attribute that receives the checksum; "Checksum" is also the fallback used by onSchedule
+core::Property HashContent::HashAttribute("Hash Attribute", "Attribute to store checksum to", "Checksum");
+//! Property selecting the hash algorithm; "MD5" is also the fallback used by onSchedule
+core::Property HashContent::HashAlgorithm("Hash Algorithm", "Name of the algorithm used to generate checksum", "MD5");
+//! Relationship every processed FlowFile is transferred to by onTrigger
+core::Relationship HashContent::Success("success", "success operational on the flow record");
+
+/**
+ * Registers the properties and the relationship this processor supports.
+ */
+void HashContent::initialize() {
+  //! Set the supported properties
+  std::set<core::Property> supported_properties{HashAttribute, HashAlgorithm};
+  setSupportedProperties(supported_properties);
+
+  //! Set the supported relationships
+  std::set<core::Relationship> supported_relationships{Success};
+  setSupportedRelationships(supported_relationships);
+}
+
+/**
+ * Reads the attribute name and the algorithm name from the context and stores them in
+ * attrKey_ and algoName_. "Checksum" and "MD5" are used when a property cannot be read.
+ * The algorithm name is normalized to upper case without '-' characters.
+ *
+ * @param context source of the property values
+ * @param sessionFactory not used
+ */
+void HashContent::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
+  std::string value;
+
+  attrKey_ = (context->getProperty(HashAttribute.getName(), value)) ? value : "Checksum";
+  algoName_ = (context->getProperty(HashAlgorithm.getName(), value)) ? value : "MD5";
+
+  // toupper is only defined for values representable as unsigned char (or EOF); passing a plain
+  // char that happens to be negative is undefined behaviour, so take each character as unsigned char
+  std::transform(algoName_.begin(), algoName_.end(), algoName_.begin(),
+                 [](unsigned char c) { return static_cast<char>(::toupper(c)); });
+
+  // Erase '-' to make sha-256 and sha-1 work, too
+  algoName_.erase(std::remove(algoName_.begin(), algoName_.end(), '-'), algoName_.end());
+}
+
+/**
+ * Takes one FlowFile from the session, runs the hashing callback over its content and
+ * transfers it to Success. Does nothing when the session has no FlowFile to offer.
+ */
+void HashContent::onTrigger(core::ProcessContext *, core::ProcessSession *session) {
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
+
+  if (flow_file) {
+    ReadCallback hash_callback(flow_file, *this);
+    session->read(flow_file, &hash_callback);
+    session->transfer(flow_file, Success);
+  }
+}
+
+/**
+ * Hashes the stream with the algorithm selected on the parent processor and stores the
+ * digest on the FlowFile under the attribute name configured on the parent.
+ *
+ * @param stream content to hash
+ * @return the second member of the pair returned by the hash function
+ */
+int64_t HashContent::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+  // at() throws when the algorithm name is not a key of HashAlgos, but that's fine
+  const auto& hash_function = HashAlgos.at(parent_.algoName_);
+  const HashReturnType hash_result = hash_function(stream);
+
+  flowFile_->setAttribute(parent_.attrKey_, hash_result.first);
+  return hash_result.second;
+}
+
+/**
+ * @param flowFile FlowFile that receives the digest attribute
+ * @param parent processor whose algorithm name and attribute name are used; held by reference
+ */
+HashContent::ReadCallback::ReadCallback(std::shared_ptr<core::FlowFile> flowFile, const HashContent& parent)
+    : flowFile_(std::move(flowFile)), parent_(parent) {
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif // OPENSSL_SUPPORT
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b53f497f/libminifi/test/unit/HashContentTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/HashContentTest.cpp b/libminifi/test/unit/HashContentTest.cpp
new file mode 100644
index 0000000..e7fba41
--- /dev/null
+++ b/libminifi/test/unit/HashContentTest.cpp
@@ -0,0 +1,129 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifdef OPENSSL_SUPPORT
+
+#include <uuid/uuid.h>
+#include <fstream>
+#include <map>
+#include <memory>
+#include <utility>
+#include <string>
+#include <set>
+#include <iostream>
+
+#include "../TestBase.h"
+#include "core/Core.h"
+
+#include "core/FlowFile.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessorNode.h"
+
+#include "processors/GetFile.h"
+#include "processors/HashContent.h"
+#include "processors/LogAttribute.h"
+
+// Text written to the input file; the usage test appends a line break to it with std::endl
+const char* TEST_TEXT = "Test text";
+// Name of the input file created inside the temporary directory
+const char* TEST_FILE = "test_file.txt";
+
+// Attribute names configured on the three HashContent processors of the usage test
+const char* MD5_ATTR = "MD5Attr";
+const char* SHA1_ATTR = "SHA1Attr";
+const char* SHA256_ATTR = "SHA256Attr";
+
+// Digests the usage test expects to find in the log for the file it writes
+const char* MD5_CHECKSUM = "4FE8A693C64F93F65C5FAF42DC49AB23";
+const char* SHA1_CHECKSUM = "03840DEB949D6CF0C0A624FA7EBA87FBDBCB7783";
+const char* SHA256_CHECKSUM = "66D5B2CC06203137F8A0E9714638DC1085C57A3F1FA26C8823AE5CF89AB26488";
+
+
+// A HashContent processor can be constructed, keeps the name it was given and reports a UUID.
+TEST_CASE("Test Creation of HashContent", "[HashContentCreate]") {
+  using org::apache::nifi::minifi::processors::HashContent;
+
+  TestController testController;
+  std::shared_ptr<core::Processor> hash_processor = std::make_shared<HashContent>("processorname");
+
+  REQUIRE(hash_processor->getName() == "processorname");
+
+  utils::Identifier processor_uuid;
+  REQUIRE(hash_processor->getUUID(processor_uuid));
+}
+
+// Runs GetFile -> HashContent (MD5) -> HashContent (SHA1) -> HashContent (SHA256) -> LogAttribute
+// over one small file and checks that LogAttribute logged all three checksum attributes.
+// The algorithm names are deliberately spelled "MD5", "sha1" and "sha-256".
+TEST_CASE("Test usage of HashContent", "[HashContentUsage]") {
+  namespace minifi_processors = org::apache::nifi::minifi::processors;
+
+  TestController testController;
+  LogTestController::getInstance().setTrace<minifi_processors::LogAttribute>();
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+
+  char dir[] = "/tmp/gt.XXXXXX";
+
+  REQUIRE(testController.createTempDirectory(dir) != nullptr);
+
+  std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", "getfileCreate2");
+  plan->setProperty(getfile, minifi_processors::GetFile::Directory.getName(), dir);
+  plan->setProperty(getfile, minifi_processors::GetFile::KeepSourceFile.getName(), "true");
+
+  std::shared_ptr<core::Processor> md5processor = plan->addProcessor("HashContent", "HashContentMD5",
+      core::Relationship("success", "description"), true);
+  plan->setProperty(md5processor, minifi_processors::HashContent::HashAttribute.getName(), MD5_ATTR);
+  plan->setProperty(md5processor, minifi_processors::HashContent::HashAlgorithm.getName(), "MD5");
+
+  std::shared_ptr<core::Processor> shaprocessor = plan->addProcessor("HashContent", "HashContentSHA1",
+      core::Relationship("success", "description"), true);
+  plan->setProperty(shaprocessor, minifi_processors::HashContent::HashAttribute.getName(), SHA1_ATTR);
+  plan->setProperty(shaprocessor, minifi_processors::HashContent::HashAlgorithm.getName(), "sha1");
+
+  std::shared_ptr<core::Processor> sha2processor = plan->addProcessor("HashContent", "HashContentSHA256",
+      core::Relationship("success", "description"), true);
+  plan->setProperty(sha2processor, minifi_processors::HashContent::HashAttribute.getName(), SHA256_ATTR);
+  plan->setProperty(sha2processor, minifi_processors::HashContent::HashAlgorithm.getName(), "sha-256");
+
+  std::shared_ptr<core::Processor> laprocessor = plan->addProcessor("LogAttribute", "outputLogAttribute",
+      core::Relationship("success", "description"), true);
+
+  const std::string test_file_path = std::string(dir) + "/" + TEST_FILE;
+
+  std::ofstream test_file(test_file_path);
+  // Fail right here if the input cannot be created, not later on a missing log line
+  REQUIRE(test_file.is_open());
+  test_file << TEST_TEXT << std::endl;
+  test_file.close();
+
+  // One run per processor added to the plan above
+  for (int i = 0; i < 5; ++i) {
+    plan->runNextProcessor();
+  }
+
+  // True when the log contains the given attribute together with the given checksum
+  const auto logged = [](const char* attribute, const char* checksum) {
+    const std::string log_check = std::string("key:") + attribute + " value:" + checksum;
+    return LogTestController::getInstance().contains(log_check);
+  };
+
+  REQUIRE(logged(MD5_ATTR, MD5_CHECKSUM));
+  REQUIRE(logged(SHA1_ATTR, SHA1_CHECKSUM));
+  REQUIRE(logged(SHA256_ATTR, SHA256_CHECKSUM));
+}
+
+#endif // OPENSSL_SUPPORT