You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/11/29 12:03:16 UTC

nifi-minifi-cpp git commit: MINIFICPP-681 - Add content hash processor

Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 1cb750866 -> b53f497f3


MINIFICPP-681 - Add content hash processor

This closes #445.

Signed-off-by: Marc Parisi <ph...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/b53f497f
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/b53f497f
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/b53f497f

Branch: refs/heads/master
Commit: b53f497f329dce6e02a519ff7a6d3f1ab20bf853
Parents: 1cb7508
Author: Arpad Boda <ab...@hortonworks.com>
Authored: Mon Nov 19 13:49:39 2018 +0100
Committer: Marc Parisi <ph...@apache.org>
Committed: Thu Nov 29 07:03:00 2018 -0500

----------------------------------------------------------------------
 PROCESSORS.md                              |  26 ++++
 libminifi/include/processors/HashContent.h | 196 ++++++++++++++++++++++++
 libminifi/src/processors/HashContent.cpp   | 100 ++++++++++++
 libminifi/test/unit/HashContentTest.cpp    | 129 ++++++++++++++++
 4 files changed, 451 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b53f497f/PROCESSORS.md
----------------------------------------------------------------------
diff --git a/PROCESSORS.md b/PROCESSORS.md
index 49bfd30..f07a74a 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -33,6 +33,7 @@
 - [GenerateFlowFile](#generateflowfile)
 - [GetFile](#getfile)
 - [GetUSBCamera](#getusbcamera)
+- [HashContent](#hashcontent)
 - [InvokeHTTP](#invokehttp)
 - [ListenHTTP](#listenhttp)
 - [ListenSyslog](#listensyslog)
@@ -130,6 +131,31 @@ default values, and whether a property supports the NiFi Expression Language.
 | success | All FlowFiles are routed to this relationship. |
 
 
+## HashContent
+
+### Description
+
+HashContent calculates a checksum of the flowfile's content and adds it to the flowfile as an attribute.
+Configuration options select the hashing algorithm and the name of that attribute.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other
+properties (not in bold) are considered optional. The table also indicates any
+default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
+| Hash Attribute | Checksum | | Name of the attribute in which the processor stores the checksum |
+| Hash Algorithm | MD5 | MD5, SHA1, SHA256 | Name of the algorithm used to calculate the checksum. Matching is case-insensitive and dashes are ignored, so e.g. `sha-256` is also accepted |
+
+### Relationships
+
+| Name | Description |
+| - | - |
+| success | All FlowFiles are routed to this relationship. |
+
+
 ## ConvertHeartBeat
 
 This Processor converts MQTT heartbeats into a JSON repreesntation.  

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b53f497f/libminifi/include/processors/HashContent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/HashContent.h b/libminifi/include/processors/HashContent.h
new file mode 100644
index 0000000..acafd0f
--- /dev/null
+++ b/libminifi/include/processors/HashContent.h
@@ -0,0 +1,196 @@
+/**
+ * @file HashContent.h
+ * HashContent class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef NIFI_MINIFI_CPP_HashContent_H
+#define NIFI_MINIFI_CPP_HashContent_H
+
+#ifdef OPENSSL_SUPPORT
+
+#include <stdint.h>
+#include <functional>
+#include <iomanip>
+#include <map>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <utility>
+
+#include <openssl/md5.h>
+#include <openssl/sha.h>
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "io/BaseStream.h"
+
+/**
+ * Result of hashing a stream: first is the digest as an uppercase hex string,
+ * second is the number of content bytes that were fed into the digest.
+ */
+using HashReturnType = std::pair<std::string, int64_t>;
+
+namespace {
+#define HASH_BUFFER_SIZE 16384
+
+  /**
+   * Converts a binary digest into an uppercase hexadecimal string,
+   * two characters per byte, without separators.
+   * @param digest pointer to the first digest byte
+   * @param size number of bytes in digest
+   * @return the hex representation (empty when size is 0)
+   */
+  std::string digestToString(const unsigned char * const digest, size_t size) {
+    static const char kHexDigits[] = "0123456789ABCDEF";
+    std::string result;
+    result.reserve(size * 2);
+    for (size_t i = 0; i < size; ++i) {
+      result += kHexDigits[digest[i] >> 4];
+      result += kHexDigits[digest[i] & 0x0F];
+    }
+    return result;
+  }
+
+  /**
+   * Feeds the remaining content of a stream into an OpenSSL-style digest context.
+   *
+   * @tparam Context      digest context type (MD5_CTX, SHA_CTX, SHA256_CTX)
+   * @tparam DigestLength size of the produced digest in bytes
+   * @param stream   content to hash; read in HASH_BUFFER_SIZE chunks until readData returns 0
+   * @param init     context initializer (e.g. MD5_Init)
+   * @param update   context update function (e.g. MD5_Update)
+   * @param finalize finalizer writing DigestLength bytes (e.g. MD5_Final)
+   * @return the hex digest and the number of bytes hashed. The digest string is left
+   *         empty when no bytes were read or when readData returned a negative value.
+   */
+  template<typename Context, size_t DigestLength>
+  HashReturnType hashStream(const std::shared_ptr<org::apache::nifi::minifi::io::BaseStream>& stream,
+                            int (*init)(Context*),
+                            int (*update)(Context*, const void*, size_t),
+                            int (*finalize)(unsigned char*, Context*)) {
+    HashReturnType ret_val;
+    ret_val.second = 0;
+    uint8_t buffer[HASH_BUFFER_SIZE];
+    Context context;
+    init(&context);
+
+    while (true) {
+      // Keep the result signed: a negative return (presumably an error code -- confirm
+      // against io::BaseStream::readData) must never be converted into a huge size_t
+      // length, which would make update() read far past the end of buffer.
+      const int ret = stream->readData(buffer, HASH_BUFFER_SIZE);
+      if (ret < 0) {
+        // Do not report a digest of partially read content.
+        return ret_val;
+      }
+      if (ret == 0) {
+        break;
+      }
+      update(&context, buffer, static_cast<size_t>(ret));
+      ret_val.second += ret;
+    }
+
+    // NOTE: empty content yields an empty digest string rather than the digest of
+    // zero bytes; this keeps the processor's existing behaviour.
+    if (ret_val.second > 0) {
+      unsigned char digest[DigestLength];
+      finalize(digest, &context);
+      ret_val.first = digestToString(digest, DigestLength);
+    }
+    return ret_val;
+  }
+
+  //! MD5 digest of the stream content; see hashStream for the result contract.
+  HashReturnType MD5Hash(const std::shared_ptr<org::apache::nifi::minifi::io::BaseStream>& stream) {
+    return hashStream<MD5_CTX, MD5_DIGEST_LENGTH>(stream, MD5_Init, MD5_Update, MD5_Final);
+  }
+
+  //! SHA-1 digest of the stream content; see hashStream for the result contract.
+  HashReturnType SHA1Hash(const std::shared_ptr<org::apache::nifi::minifi::io::BaseStream>& stream) {
+    return hashStream<SHA_CTX, SHA_DIGEST_LENGTH>(stream, SHA1_Init, SHA1_Update, SHA1_Final);
+  }
+
+  //! SHA-256 digest of the stream content; see hashStream for the result contract.
+  HashReturnType SHA256Hash(const std::shared_ptr<org::apache::nifi::minifi::io::BaseStream>& stream) {
+    return hashStream<SHA256_CTX, SHA256_DIGEST_LENGTH>(stream, SHA256_Init, SHA256_Update, SHA256_Final);
+  }
+}
+
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+//! Supported hashing routines, keyed by the normalized algorithm name
+//! (upper case, '-' removed) that HashContent::onSchedule produces.
+//! Requires <functional> for std::function.
+static const std::map<std::string, const std::function<HashReturnType(const std::shared_ptr<io::BaseStream>&)>> HashAlgos = {
+  {"MD5", MD5Hash},
+  {"SHA1", SHA1Hash},
+  {"SHA256", SHA256Hash}
+};
+
+//! HashContent Class
+/**
+ * Computes a checksum of each flow file's content with a configurable algorithm
+ * (one of the HashAlgos keys), stores it in a configurable attribute and
+ * transfers the flow file to the "success" relationship.
+ */
+class HashContent : public core::Processor {
+ public:
+  /**
+   * Create a new processor.
+   * @param name processor name
+   * @param uuid processor identifier (a default-constructed utils::Identifier when omitted)
+   */
+  explicit HashContent(std::string name,  utils::Identifier uuid = utils::Identifier())
+  : Processor(name, uuid),
+    logger_(logging::LoggerFactory<HashContent>::getLogger())
+  {
+  }
+  //! Processor Name
+  static constexpr char const* ProcessorName = "HashContent";
+  //! Supported Properties
+  static core::Property HashAttribute;
+  static core::Property HashAlgorithm;
+  //! Supported Relationships
+  static core::Relationship Success;
+
+  //! Reads and normalizes the processor properties.
+  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override;
+
+  //! Hashes the content of one incoming flow file and transfers it to Success.
+  void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override;
+  //! Registers the supported properties and relationships.
+  void initialize(void) override;
+
+  /**
+   * Input stream callback that hashes a flow file's content with the parent's
+   * configured algorithm and stores the digest on the flow file under the
+   * parent's configured attribute name.
+   */
+  class ReadCallback : public InputStreamCallback {
+   public:
+    ReadCallback(std::shared_ptr<core::FlowFile> flowFile, const HashContent& parent);
+    ~ReadCallback() = default;
+    //! Hashes the stream and returns the number of content bytes hashed.
+    int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+
+   private:
+    std::shared_ptr<core::FlowFile> flowFile_;
+    //! Non-owning reference; the parent must outlive this callback.
+    const HashContent& parent_;
+  };
+
+ private:
+  //! Logger
+  std::shared_ptr<logging::Logger> logger_;
+  //! Normalized algorithm name (upper case, '-' removed); key into HashAlgos.
+  std::string algoName_;
+  //! Name of the attribute that receives the checksum.
+  std::string attrKey_;
+};
+
+// Registers HashContent via the project's REGISTER_RESOURCE macro, presumably so it can be
+// instantiated by name (the unit test does plan->addProcessor("HashContent", ...)).
+// NOTE(review): this expands in a header that both HashContent.cpp and HashContentTest.cpp
+// include; confirm the macro is safe to expand in more than one translation unit.
+REGISTER_RESOURCE(HashContent);
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif //OPENSSL_SUPPORT
+
+#endif //NIFI_MINIFI_CPP_HashContent_H

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b53f497f/libminifi/src/processors/HashContent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/HashContent.cpp b/libminifi/src/processors/HashContent.cpp
new file mode 100644
index 0000000..e76a368
--- /dev/null
+++ b/libminifi/src/processors/HashContent.cpp
@@ -0,0 +1,100 @@
+/**
+ * @file HashContent.cpp
+ * HashContent class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifdef OPENSSL_SUPPORT
+
+#include <algorithm>
+#include <iostream>
+#include <memory>
+#include <string>
+#include "processors/HashContent.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/FlowFile.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// Name of the flow file attribute that receives the checksum; defaults to "Checksum".
+core::Property HashContent::HashAttribute(
+    "Hash Attribute",
+    "Attribute to store checksum to",
+    "Checksum");
+// Hashing algorithm; normalized in onSchedule and looked up in HashAlgos. Defaults to "MD5".
+core::Property HashContent::HashAlgorithm(
+    "Hash Algorithm",
+    "Name of the algorithm used to generate checksum",
+    "MD5");
+// Every processed flow file is transferred to this relationship.
+core::Relationship HashContent::Success(
+    "success",
+    "success operational on the flow record");
+
+void HashContent::initialize() {
+  // Both configurable properties are declared up front...
+  std::set<core::Property> supported_properties{HashAttribute, HashAlgorithm};
+  setSupportedProperties(supported_properties);
+
+  // ...and the single outgoing relationship.
+  std::set<core::Relationship> supported_relationships{Success};
+  setSupportedRelationships(supported_relationships);
+}
+
+void HashContent::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
+  // Read each property into its own local; a missing or empty value falls back to
+  // the default ("Checksum" / "MD5"), since an empty attribute name or algorithm is unusable.
+  std::string attr_value;
+  attrKey_ = (context->getProperty(HashAttribute.getName(), attr_value) && !attr_value.empty()) ? attr_value : "Checksum";
+
+  std::string algo_value;
+  algoName_ = (context->getProperty(HashAlgorithm.getName(), algo_value) && !algo_value.empty()) ? algo_value : "MD5";
+
+  // Upper-case the name so "md5" or "Sha1" match the HashAlgos keys. toupper is only
+  // defined for values representable as unsigned char (or EOF); a plain char holding a
+  // non-ASCII byte may be negative, hence the unsigned char lambda parameter.
+  std::transform(algoName_.begin(), algoName_.end(), algoName_.begin(),
+                 [](unsigned char c) { return static_cast<char>(::toupper(c)); });
+
+  // Erase '-' to make sha-256 and sha-1 work, too
+  algoName_.erase(std::remove(algoName_.begin(), algoName_.end(), '-'), algoName_.end());
+
+  // Report a misconfiguration once, at schedule time. Processing a flow file with an
+  // unsupported algorithm still throws from ReadCallback::process (HashAlgos.at).
+  if (HashAlgos.find(algoName_) == HashAlgos.end()) {
+    logger_->log_error("Unsupported hash algorithm: %s", algoName_.c_str());
+  }
+}
+
+void HashContent::onTrigger(core::ProcessContext *, core::ProcessSession *session) {
+  // Take the next queued flow file; when there is none this trigger has nothing to do.
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
+  if (flow_file == nullptr) {
+    return;
+  }
+
+  // The callback hashes the content and stores the digest as an attribute on flow_file.
+  ReadCallback hasher(flow_file, *this);
+  session->read(flow_file, &hasher);
+  session->transfer(flow_file, Success);
+}
+
+int64_t HashContent::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+  // std::map::at throws std::out_of_range if the configured algorithm is not supported.
+  // Bind by const reference so the std::function is not copied for every flow file.
+  const auto& algo = HashAlgos.at(parent_.algoName_);
+
+  const HashReturnType ret_val = algo(stream);
+
+  flowFile_->setAttribute(parent_.attrKey_, ret_val.first);
+
+  // Number of content bytes that were hashed.
+  return ret_val.second;
+}
+
+/**
+ * @param flowFile flow file whose attribute will receive the digest (taken by value and moved in)
+ * @param parent processor supplying the algorithm and attribute name; must outlive the callback
+ */
+HashContent::ReadCallback::ReadCallback(std::shared_ptr<core::FlowFile> flowFile, const HashContent& parent)
+  : flowFile_(std::move(flowFile)),  // sink parameter: move rather than take another reference count
+    parent_(parent)
+  {}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif  // OPENSSL_SUPPORT

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b53f497f/libminifi/test/unit/HashContentTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/HashContentTest.cpp b/libminifi/test/unit/HashContentTest.cpp
new file mode 100644
index 0000000..e7fba41
--- /dev/null
+++ b/libminifi/test/unit/HashContentTest.cpp
@@ -0,0 +1,129 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifdef OPENSSL_SUPPORT
+
+#include <uuid/uuid.h>
+#include <fstream>
+#include <map>
+#include <memory>
+#include <utility>
+#include <string>
+#include <set>
+#include <iostream>
+
+#include "../TestBase.h"
+#include "core/Core.h"
+
+#include "core/FlowFile.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessorNode.h"
+
+#include "processors/GetFile.h"
+#include "processors/HashContent.h"
+#include "processors/LogAttribute.h"
+
+// Content written to the input file (followed by a newline) and the file's name.
+const char* const TEST_TEXT = "Test text";
+const char* const TEST_FILE = "test_file.txt";
+
+// Attribute names configured on the three HashContent processors.
+const char* const MD5_ATTR = "MD5Attr";
+const char* const SHA1_ATTR = "SHA1Attr";
+const char* const SHA256_ATTR = "SHA256Attr";
+
+// Expected uppercase-hex digests of the test file content.
+const char* const MD5_CHECKSUM = "4FE8A693C64F93F65C5FAF42DC49AB23";
+const char* const SHA1_CHECKSUM = "03840DEB949D6CF0C0A624FA7EBA87FBDBCB7783";
+const char* const SHA256_CHECKSUM = "66D5B2CC06203137F8A0E9714638DC1085C57A3F1FA26C8823AE5CF89AB26488";
+
+
+TEST_CASE("Test Creation of HashContent", "[HashContentCreate]") {
+  TestController testController;
+
+  // Build the processor directly (outside any plan) and check its identity.
+  const std::string expected_name = "processorname";
+  const std::shared_ptr<core::Processor> processor =
+      std::make_shared<org::apache::nifi::minifi::processors::HashContent>(expected_name);
+  REQUIRE(processor->getName() == expected_name);
+
+  // getUUID must succeed for a freshly constructed processor.
+  utils::Identifier assigned_uuid;
+  REQUIRE(processor->getUUID(assigned_uuid));
+}
+
+TEST_CASE("Test usage of HashContent", "[hashcontentTest]") {
+  TestController testController;
+  LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+
+  char dir[] = "/tmp/gt.XXXXXX";
+
+  REQUIRE(testController.createTempDirectory(dir) != nullptr);
+
+  // Flow: GetFile -> HashContent (MD5) -> HashContent (SHA1) -> HashContent (SHA256) -> LogAttribute.
+  // GetFile keeps the source file in place.
+  std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", "getfileCreate2");
+  plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir);
+  plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::KeepSourceFile.getName(), "true");
+
+  // The algorithm names are deliberately spelled differently ("MD5", "sha1", "sha-256")
+  // to exercise HashContent's case-insensitive, dash-ignoring algorithm matching.
+  const struct {
+    const char* processor_name;
+    const char* attribute;
+    const char* algorithm;
+    const char* checksum;
+  } hash_cases[] = {
+    {"HashContentMD5", MD5_ATTR, "MD5", MD5_CHECKSUM},
+    {"HashContentSHA1", SHA1_ATTR, "sha1", SHA1_CHECKSUM},
+    {"HashContentSHA256", SHA256_ATTR, "sha-256", SHA256_CHECKSUM},
+  };
+  const size_t hash_case_count = sizeof(hash_cases) / sizeof(hash_cases[0]);
+
+  for (const auto& hash_case : hash_cases) {
+    std::shared_ptr<core::Processor> hash_processor = plan->addProcessor("HashContent", hash_case.processor_name,
+                                                                         core::Relationship("success", "description"), true);
+    plan->setProperty(hash_processor, org::apache::nifi::minifi::processors::HashContent::HashAttribute.getName(), hash_case.attribute);
+    plan->setProperty(hash_processor, org::apache::nifi::minifi::processors::HashContent::HashAlgorithm.getName(), hash_case.algorithm);
+  }
+
+  plan->addProcessor("LogAttribute", "outputLogAttribute", core::Relationship("success", "description"), true);
+
+  std::stringstream path_stream;
+  path_stream << dir << "/" << TEST_FILE;
+  const std::string test_file_path = path_stream.str();
+
+  // Fail loudly if the input cannot be created instead of silently hashing nothing.
+  std::ofstream test_file(test_file_path);
+  REQUIRE(test_file.is_open());
+  // '\n' writes the same bytes std::endl did; the expected checksums are for the
+  // file content including this trailing newline.
+  test_file << TEST_TEXT << '\n';
+  test_file.close();
+
+  // One step per processor: GetFile, each HashContent, and LogAttribute.
+  for (size_t i = 0; i < hash_case_count + 2; ++i) {
+    plan->runNextProcessor();
+  }
+
+  for (const auto& hash_case : hash_cases) {
+    std::stringstream expected_log;
+    expected_log << "key:" << hash_case.attribute << " value:" << hash_case.checksum;
+    REQUIRE(LogTestController::getInstance().contains(expected_log.str()));
+  }
+}
+
+#endif  // OPENSSL_SUPPORT