You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2018/08/07 13:57:52 UTC

nifi-minifi-cpp git commit: MINIFICPP-590: Fix zero length files at startup; MINIFICPP-589: Institute regex filtering for interfaces; MINIFICPP-589: Limit Pcap to 4.9.4+

Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 82557dec8 -> 2f43e2bd3


MINIFICPP-590: Fix zero length files at startup
MINIFICPP-589: Institute regex filtering for interfaces
MINIFICPP-589: Limit Pcap to 4.9.4+

This closes #388.

Signed-off-by: Aldrin Piri <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/2f43e2bd
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/2f43e2bd
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/2f43e2bd

Branch: refs/heads/master
Commit: 2f43e2bd3e6a655fff32b53e793da3d831fc2291
Parents: 82557de
Author: Marc Parisi <ph...@apache.org>
Authored: Sat Aug 4 14:26:54 2018 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Tue Aug 7 09:57:19 2018 -0400

----------------------------------------------------------------------
 extensions/pcap/CapturePacket.cpp       | 41 +++++++++++++++++++++++++---
 extensions/pcap/CapturePacket.h         |  9 ++++--
 libminifi/include/core/ProcessContext.h |  2 +-
 libminifi/include/core/Property.h       |  1 -
 libminifi/test/pcap-tests/PcapTest.cpp  |  3 ++
 linux.sh                                | 10 +++++++
 6 files changed, 58 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2f43e2bd/extensions/pcap/CapturePacket.cpp
----------------------------------------------------------------------
diff --git a/extensions/pcap/CapturePacket.cpp b/extensions/pcap/CapturePacket.cpp
index ba88280..f5e7127 100644
--- a/extensions/pcap/CapturePacket.cpp
+++ b/extensions/pcap/CapturePacket.cpp
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-#include <regex.h>
+#include <regex>
 #include <uuid/uuid.h>
 #include <memory>
 #include <algorithm>
@@ -54,6 +54,7 @@ namespace processors {
 std::shared_ptr<utils::IdGenerator> CapturePacket::id_generator_ = utils::IdGenerator::getIdGenerator();
 core::Property CapturePacket::BaseDir("Base Directory", "Scratch directory for PCAP files", "/tmp/");
 core::Property CapturePacket::BatchSize("Batch Size", "The number of packets to combine within a given PCAP", "50");
+core::Property CapturePacket::NetworkController("Network Controller", "Regular expression of the network controller(s) to which we will attach", ".*");
 core::Property CapturePacket::CaptureBluetooth("Capture Bluetooth", "True indicates that we support bluetooth interfaces", "false");
 
 const char *CapturePacket::ProcessorName = "CapturePacket";
@@ -84,9 +85,11 @@ void CapturePacket::packet_callback(pcpp::RawPacket* packet, pcpp::PcapLiveDevic
 
         capture->writer_->close();
 
+        auto new_capture = create_new_capture(capture->getBasePath(), capture->getMaxSize());
+
         capture_mechanism->sink.enqueue(capture);
 
-        capture_mechanism->source.enqueue(create_new_capture(capture->getBasePath(), capture->getMaxSize()));
+        capture_mechanism->source.enqueue(new_capture);
       } else {
         capture_mechanism->source.enqueue(capture);
       }
@@ -112,6 +115,7 @@ void CapturePacket::initialize() {
   // Set the supported properties
   std::set<core::Property> properties;
   properties.insert(BatchSize);
+  properties.insert(NetworkController);
   properties.insert(BaseDir);
   properties.insert(CaptureBluetooth);
   setSupportedProperties(properties);
@@ -126,6 +130,7 @@ void CapturePacket::onSchedule(const std::shared_ptr<core::ProcessContext> &cont
   if (context->getProperty(BatchSize.getName(), value)) {
     core::Property::StringToInt(value, pcap_batch_size_);
   }
+
   value = "";
   if (context->getProperty(BaseDir.getName(), value)) {
     base_dir_ = value;
@@ -135,6 +140,13 @@ void CapturePacket::onSchedule(const std::shared_ptr<core::ProcessContext> &cont
   if (context->getProperty(CaptureBluetooth.getName(), value)) {
     utils::StringUtils::StringToBool(value, capture_bluetooth_);
   }
+
+  core::Property attached_controllers("Network Controllers", "List of network controllers to attach to -- each may be a regex", ".*");
+
+  getProperty(attached_controllers.getName(), attached_controllers);
+
+  std::vector<std::string> allowed_interfaces = attached_controllers.getValues();
+
   if (IsNullOrEmpty(base_dir_)) {
     base_dir_ = "/tmp/";
   }
@@ -151,6 +163,26 @@ void CapturePacket::onSchedule(const std::shared_ptr<core::ProcessContext> &cont
   for (auto iter : devList) {
     const std::string name = iter->getName();
 
+    if (!allowed_interfaces.empty()) {
+      bool found_match = false;
+      std::string matching_regex = "";
+      for (const auto &filter : allowed_interfaces) {
+        std::regex r(filter);
+        std::smatch m;
+        if (std::regex_match(name, m, r)) {
+          matching_regex = filter;
+          found_match = true;
+          break;
+        }
+      }
+      if (!found_match) {
+        logger_->log_debug("Skipping %s because it does not match any regex", name);
+        continue;
+      } else {
+        logger_->log_trace("Accepting %s because it matches %s", name, matching_regex);
+      }
+    }
+
     if (!iter->open()) {
       logger_->log_error("Could not open device %s", name);
       continue;
@@ -189,9 +221,10 @@ CapturePacket::~CapturePacket() {
 void CapturePacket::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
   CapturePacketMechanism *capture;
   if (mover->sink.try_dequeue(capture)) {
-    logger_->log_debug("Received packet capture in file %s", capture->getFile());
+    logger_->log_debug("Received packet capture in file %s %d", capture->getFile(), capture->getSize());
     auto ff = session->create();
-    session->import(capture->getFile(), ff, false, 0);
+    session->import(capture->getFile(), ff, true, 0);
+    logger_->log_debug("Received packet capture in file %s %d for %s", capture->getFile(), capture->getSize(), ff->getResourceClaim()->getContentFullPath());
     session->transfer(ff, Success);
     delete capture;
   } else {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2f43e2bd/extensions/pcap/CapturePacket.h
----------------------------------------------------------------------
diff --git a/extensions/pcap/CapturePacket.h b/extensions/pcap/CapturePacket.h
index 08b71b4..3084eed 100644
--- a/extensions/pcap/CapturePacket.h
+++ b/extensions/pcap/CapturePacket.h
@@ -56,7 +56,7 @@ class CapturePacketMechanism {
   }
 
   bool inline incrementAndCheck() {
-    return atomic_count_++ >= *max_size_;
+    return ++atomic_count_ >= *max_size_;
   }
 
   int64_t *getMaxSize() {
@@ -72,6 +72,10 @@ class CapturePacketMechanism {
   const std::string &getFile() {
     return file_;
   }
+
+  long getSize() const{
+    return atomic_count_;
+  }
  protected:
   CapturePacketMechanism &operator=(const CapturePacketMechanism &other) = delete;
   std::string path_;
@@ -98,7 +102,6 @@ class CapturePacket : public core::Processor {
         capture_bluetooth_(false),
         pcap_batch_size_(50),
         logger_(logging::LoggerFactory<CapturePacket>::getLogger()) {
-    num_ = 0;
     mover = std::unique_ptr<PacketMovers>(new PacketMovers());
   }
   // Destructor
@@ -106,6 +109,7 @@ class CapturePacket : public core::Processor {
   // Processor Name
   static const char *ProcessorName;
   static core::Property BatchSize;
+  static core::Property NetworkController;
   static core::Property BaseDir;
   static core::Property CaptureBluetooth;
   // Supported Relationships
@@ -151,6 +155,7 @@ class CapturePacket : public core::Processor {
   }
   bool capture_bluetooth_;
   std::string base_dir_;
+  std::vector<std::string> attached_controllers_;
   std::string base_path_;
   int64_t pcap_batch_size_;
   std::unique_ptr<PacketMovers> mover;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2f43e2bd/libminifi/include/core/ProcessContext.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h
index e7f70b7..e1c42e4 100644
--- a/libminifi/include/core/ProcessContext.h
+++ b/libminifi/include/core/ProcessContext.h
@@ -80,7 +80,7 @@ class ProcessContext : public controller::ControllerServiceLookup {
   // Sets the property value using the property's string name
   bool setProperty(const std::string &name, std::string value) {
     return processor_node_->setProperty(name, value);
-  } // Sets the dynamic property value using the property's string name
+  }  // Sets the dynamic property value using the property's string name
   bool setDynamicProperty(const std::string &name, std::string value) {
     return processor_node_->setDynamicProperty(name, value);
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2f43e2bd/libminifi/include/core/Property.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Property.h b/libminifi/include/core/Property.h
index 50f5896..a59a53d 100644
--- a/libminifi/include/core/Property.h
+++ b/libminifi/include/core/Property.h
@@ -90,7 +90,6 @@ class Property {
         description_(""),
         is_required_(false),
         is_collection_(false) {
-
   }
 
   virtual ~Property() = default;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2f43e2bd/libminifi/test/pcap-tests/PcapTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/pcap-tests/PcapTest.cpp b/libminifi/test/pcap-tests/PcapTest.cpp
index 7a68295..3c6fd27 100644
--- a/libminifi/test/pcap-tests/PcapTest.cpp
+++ b/libminifi/test/pcap-tests/PcapTest.cpp
@@ -56,6 +56,7 @@ class PcapTestHarness : public IntegrationBase {
     LogTestController::getInstance().setDebug<minifi::SchedulingAgent>();
     LogTestController::getInstance().setDebug<minifi::core::ProcessGroup>();
     LogTestController::getInstance().setDebug<minifi::core::Processor>();
+    LogTestController::getInstance().setDebug<minifi::core::ConfigurableComponent>();
     LogTestController::getInstance().setDebug<minifi::ThreadedSchedulingAgent>();
   }
 
@@ -67,6 +68,7 @@ class PcapTestHarness : public IntegrationBase {
     assert(LogTestController::getInstance().contains("Starting capture") == true);
     assert(LogTestController::getInstance().contains("Stopping capture") == true);
     assert(LogTestController::getInstance().contains("Stopped device capture. clearing queues") == true);
+    assert(LogTestController::getInstance().contains("Accepting ") == true && LogTestController::getInstance().contains("because it matches .*") );
   }
 
   void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
@@ -77,6 +79,7 @@ class PcapTestHarness : public IntegrationBase {
     assert(inv != nullptr);
 
     configuration->set(minifi::processors::CapturePacket::BaseDir.getName(), dir);
+    configuration->set(minifi::processors::CapturePacket::NetworkController.getName(), ".*");
     configuration->set("nifi.c2.enable", "false");
   }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2f43e2bd/linux.sh
----------------------------------------------------------------------
diff --git a/linux.sh b/linux.sh
index e279c84..54730ba 100644
--- a/linux.sh
+++ b/linux.sh
@@ -42,6 +42,16 @@ verify_gcc_enable(){
     else
       echo "false"
     fi
+   elif [ "$feature" = "PCAP_ENABLED" ]; then
+    if (( COMPILER_MAJOR >= 4 )); then
+      if (( COMPILER_MAJOR > 4 || COMPILER_MINOR >= 9 )); then
+        echo "true"
+      else
+        echo "false"
+      fi
+    else
+      echo "false"
+    fi
   else
     echo "true"
   fi