You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2019/09/16 21:59:46 UTC

[nifi-minifi-cpp] branch master updated: MINIFICPP-989 - Motion detect for a set of captured frames

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 72f92fa  MINIFICPP-989 - Motion detect for a set of captured frames
72f92fa is described below

commit 72f92fa8b08dcf09bf6bfdec6c5c30feb1e4fe9f
Author: Nghia Le <mi...@gmail.com>
AuthorDate: Wed Aug 14 14:42:01 2019 +0200

    MINIFICPP-989 - Motion detect for a set of captured frames
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #627
---
 extensions/opencv/FrameIO.h                      |  77 +++++++++
 extensions/opencv/MotionDetector.cpp             | 203 +++++++++++++++++++++++
 extensions/opencv/MotionDetector.h               |  88 ++++++++++
 extensions/opencv/tests/CaptureRTSPFrameTest.cpp |   5 +-
 4 files changed, 371 insertions(+), 2 deletions(-)

diff --git a/extensions/opencv/FrameIO.h b/extensions/opencv/FrameIO.h
new file mode 100644
index 0000000..8009afa
--- /dev/null
+++ b/extensions/opencv/FrameIO.h
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_FRAMEIO_H
+#define NIFI_MINIFI_CPP_FRAMEIO_H
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace opencv {
+
+/**
+ * Output stream callback that encodes an OpenCV image and writes the encoded
+ * bytes into the stream it is handed.
+ */
+class FrameWriteCallback : public OutputStreamCallback {
+  public:
+    /**
+     * @param image_mat image to encode; taken by value and moved into the callback.
+     * @param image_encoding encoding selector forwarded to cv::imencode (e.g. ".jpg").
+     */
+    explicit FrameWriteCallback(cv::Mat image_mat, std::string image_encoding)
+        : image_mat_(std::move(image_mat)), image_encoding_(std::move(image_encoding)) {
+    }
+    ~FrameWriteCallback() override = default;
+
+    /**
+     * Encodes the stored image and writes the resulting buffer to the stream.
+     *
+     * @param stream destination stream.
+     * @return the value returned by stream->write().
+     * @throws std::runtime_error if cv::imencode reports failure, so that an
+     *         empty payload is never written as if it were a valid image.
+     */
+    int64_t process(std::shared_ptr<io::BaseStream> stream) override {
+      if (!cv::imencode(image_encoding_, image_mat_, image_buf_)) {
+        throw std::runtime_error("FrameWriteCallback failed to encode image as " + image_encoding_);
+      }
+      return stream->write(image_buf_.data(), image_buf_.size());
+    }
+
+  private:
+    std::vector<uchar> image_buf_;  // scratch buffer holding the encoded image
+    cv::Mat image_mat_;
+    std::string image_encoding_;
+};
+
+/**
+ * Input stream callback that reads the whole stream into memory and decodes
+ * it into the cv::Mat supplied by the caller.
+ */
+class FrameReadCallback : public InputStreamCallback {
+  public:
+    /**
+     * @param image_mat destination matrix; held by reference, so it must
+     *        outlive this callback.
+     */
+    explicit FrameReadCallback(cv::Mat &image_mat)
+        : image_mat_(image_mat) {
+    }
+    ~FrameReadCallback() override = default;
+
+    /**
+     * Reads stream->getSize() bytes and decodes them with cv::imdecode.
+     *
+     * @param stream source stream.
+     * @return number of bytes read (0 for an empty stream, in which case the
+     *         destination matrix is left empty).
+     * @throws std::runtime_error if fewer bytes than the stream size were read.
+     */
+    int64_t process(std::shared_ptr<io::BaseStream> stream) override {
+      // Query the size once so the buffer, the read and the check all agree.
+      const auto stream_size = stream->getSize();
+      image_buf_.resize(stream_size);
+      if (image_buf_.empty()) {
+        // Nothing to decode; hand back an empty matrix instead of passing an
+        // empty buffer to the decoder.
+        image_mat_ = cv::Mat();
+        return 0;
+      }
+
+      const int64_t ret = stream->read(image_buf_.data(), static_cast<int>(stream_size));
+      // Compare in the unsigned domain only after ruling out a negative result.
+      if (ret < 0 || static_cast<uint64_t>(ret) != static_cast<uint64_t>(stream_size)) {
+        throw std::runtime_error("FrameReadCallback failed to fully read flow file input stream");
+      }
+      image_mat_ = cv::imdecode(image_buf_, cv::IMREAD_UNCHANGED);
+      return ret;
+    }
+
+  private:
+    std::vector<uchar> image_buf_;  // raw encoded bytes read from the stream
+    cv::Mat &image_mat_;
+};
+
+} /* namespace opencv */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif //NIFI_MINIFI_CPP_FRAMEIO_H
diff --git a/extensions/opencv/MotionDetector.cpp b/extensions/opencv/MotionDetector.cpp
new file mode 100644
index 0000000..5234a35
--- /dev/null
+++ b/extensions/opencv/MotionDetector.cpp
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "MotionDetector.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// Encoding passed to the frame write callback and appended to the output
+// "filename" attribute; restricted to ".jpg" or ".png".
+core::Property MotionDetector::ImageEncoding(
+    core::PropertyBuilder::createProperty("Image Encoding")
+        ->withDescription("The encoding that should be applied to the output")
+        ->isRequired(true)
+        ->withAllowableValues<std::string>({".jpg", ".png"})
+        ->withDefaultValue(".jpg")->build());
+// Contours whose area is below this value are ignored by detectAndDraw().
+core::Property MotionDetector::MinInterestArea(
+    core::PropertyBuilder::createProperty("Minimum Area")
+        ->withDescription("We only consider the movement regions with area greater than this.")
+        ->isRequired(true)
+        ->withDefaultValue<uint32_t>(100)->build());
+// Threshold handed to cv::threshold on the frame/background difference image.
+core::Property MotionDetector::Threshold(
+    core::PropertyBuilder::createProperty("Threshold for segmentation")
+        ->withDescription("Pixel greater than this will be white, otherwise black.")
+        ->isRequired(true)
+        ->withDefaultValue<uint32_t>(42)->build());
+// Optional path of an image to use as the initial background. It is optional
+// by design: onSchedule() skips it when unset or empty, and onTrigger() then
+// adopts the first incoming frame as the background.
+core::Property MotionDetector::BackgroundFrame(
+    core::PropertyBuilder::createProperty("Path to background frame")
+        ->withDescription("If not provided then the processor will take the first input frame as background")
+        ->isRequired(false)
+        ->build());
+// Iteration count handed to cv::dilate on the thresholded difference image.
+core::Property MotionDetector::DilateIter(
+    core::PropertyBuilder::createProperty("Dilate iteration")
+        ->withDescription("For image processing, if an object is detected as 2 separate objects, increase this value")
+        ->isRequired(true)
+        ->withDefaultValue<uint32_t>(10)->build());
+
+// Relationships declared by this processor; both are registered in initialize().
+core::Relationship MotionDetector::Success("success", "Successful to detect motion");
+core::Relationship MotionDetector::Failure("failure", "Failure to detect motion");
+
+/**
+ * Registers the properties and relationships this processor supports.
+ */
+void MotionDetector::initialize() {
+  std::set<core::Property> supported{
+      ImageEncoding,
+      MinInterestArea,
+      Threshold,
+      BackgroundFrame,
+      DilateIter};
+  setSupportedProperties(std::move(supported));
+
+  setSupportedRelationships({Success, Failure});
+}
+
+/**
+ * Reads the processor configuration and, when a background image path is
+ * configured, loads and pre-processes that image as the initial background.
+ *
+ * @param context        source of the configured property values.
+ * @param sessionFactory unused.
+ */
+void MotionDetector::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
+                                  const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+  std::string value;
+
+  if (context->getProperty(ImageEncoding.getName(), value)) {
+    image_encoding_ = value;
+  }
+
+  if (context->getProperty(MinInterestArea.getName(), value)) {
+    core::Property::StringToInt(value, min_area_);
+  }
+
+  if (context->getProperty(Threshold.getName(), value)) {
+    core::Property::StringToInt(value, threshold_);
+  }
+
+  if (context->getProperty(DilateIter.getName(), value)) {
+    core::Property::StringToInt(value, dil_iter_);
+  }
+
+  if (context->getProperty(BackgroundFrame.getName(), value) && !value.empty()) {
+    bg_img_ = cv::imread(value, cv::IMREAD_GRAYSCALE);
+    if (bg_img_.empty()) {
+      // The image could not be loaded. Computing the scale would divide by a
+      // zero width and cv::resize would be handed an empty matrix, so leave
+      // the background unset; onTrigger() then falls back to the first frame.
+      logger_->log_error("Failed to load background frame from %s", value.c_str());
+      background_.release();
+    } else {
+      // Same pipeline as applied to incoming frames: scale to IMG_WIDTH, blur,
+      // and keep a float copy for the running background average.
+      const double scale = IMG_WIDTH / bg_img_.size().width;
+      cv::resize(bg_img_, bg_img_, cv::Size(0, 0), scale, scale);
+      cv::GaussianBlur(bg_img_, bg_img_, cv::Size(21, 21), 0, 0);
+      bg_img_.convertTo(background_, CV_32F);
+    }
+  }
+
+  logger_->log_trace("MotionDetector processor scheduled");
+}
+
+/**
+ * Compares a frame against the stored background, draws a green bounding box
+ * on the frame around every changed region whose contour area is at least
+ * min_area_, and folds the frame into the background when nothing qualified.
+ *
+ * @param frame frame to analyse; rectangles are drawn onto it in place.
+ * @return true if at least one region met the minimum area, false otherwise.
+ */
+bool MotionDetector::detectAndDraw(cv::Mat &frame) {
+  logger_->log_trace("Detect and Draw");
+
+  // Grayscale, blurred version of the incoming frame.
+  cv::Mat smoothed;
+  cv::cvtColor(frame, smoothed, cv::COLOR_BGR2GRAY);
+  cv::GaussianBlur(smoothed, smoothed, cv::Size(21, 21), 0, 0);
+
+  // Absolute difference between the current frame and the background image.
+  logger_->log_trace("Get difference [%d x %d] [%d x %d]", bg_img_.rows, bg_img_.cols, smoothed.rows, smoothed.cols);
+  cv::Mat delta;
+  cv::absdiff(smoothed, bg_img_, delta);
+
+  logger_->log_trace("Apply threshold");
+  cv::Mat mask;
+  cv::threshold(delta, mask, threshold_, 255, cv::THRESH_BINARY);
+
+  // Dilate the binary mask, then extract the outer contours.
+  logger_->log_trace("Dilation");
+  cv::dilate(mask, mask, cv::Mat(), cv::Point(-1, -1), dil_iter_);
+  std::vector<cv::Mat> regions;
+  cv::findContours(mask, regions, cv::RETR_EXTERNAL, cv::CHAIN_APPROX_SIMPLE);
+
+  logger_->log_debug("Draw contours");
+  bool motion_found = false;
+  for (const auto &region : regions) {
+    if (cv::contourArea(region) >= min_area_) {
+      motion_found = true;
+      const cv::Rect box = cv::boundingRect(region);
+      cv::rectangle(frame, box.tl(), box.br(), cv::Scalar(0, 255, 0), 2, 8, 0);
+    }
+  }
+
+  logger_->log_trace("Updating background");
+  if (!motion_found) {
+    logger_->log_debug("Not moved");
+    // Adaptive background: blend the quiet frame into the running average so
+    // that illumination changes have less effect on later comparisons.
+    cv::accumulateWeighted(smoothed, background_, 0.5);
+    cv::convertScaleAbs(background_, bg_img_);
+  }
+
+  logger_->log_trace("Finish Detect and Draw");
+  return motion_found;
+}
+
+/**
+ * Processes one flow file containing an encoded image: decodes it, scales it
+ * to IMG_WIDTH, runs motion detection against the stored background, and
+ * writes the annotated frame back to the flow file.
+ *
+ * Routing:
+ *  - no flow file available: returns without doing anything;
+ *  - empty content or undecodable image: Failure;
+ *  - no background yet: the frame becomes the background and is removed;
+ *  - otherwise: annotated frame is written and routed to Success.
+ *
+ * @param context used to yield when another onTrigger is still running.
+ * @param session session providing and receiving the flow file.
+ */
+void MotionDetector::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
+                                 const std::shared_ptr<core::ProcessSession> &session) {
+  // The background state is shared between invocations; only one may run.
+  std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
+  if (!lock.owns_lock()) {
+    logger_->log_info("Cannot process due to an unfinished onTrigger");
+    context->yield();
+    return;
+  }
+
+  auto flow_file = session->get();
+  if (!flow_file) {
+    // Nothing queued; dereferencing the result would crash.
+    return;
+  }
+  if (flow_file->getSize() == 0) {
+    logger_->log_info("Empty flow file");
+    // Route it explicitly instead of returning with it still held by the session.
+    session->transfer(flow_file, Failure);
+    return;
+  }
+  cv::Mat frame;
+
+  opencv::FrameReadCallback cb(frame);
+  session->read(flow_file, &cb);
+
+  if (frame.empty()) {
+    logger_->log_error("Empty frame.");
+    session->transfer(flow_file, Failure);
+    // Stop here: continuing would divide by a zero width, resize an empty
+    // matrix and transfer the same flow file a second time.
+    return;
+  }
+
+  const double scale = IMG_WIDTH / frame.size().width;
+  cv::resize(frame, frame, cv::Size(0, 0), scale, scale);
+
+  if (background_.empty()) {
+    logger_->log_info("Background is missing, update and yield.");
+    cv::cvtColor(frame, bg_img_, cv::COLOR_BGR2GRAY);
+    cv::GaussianBlur(bg_img_, bg_img_, cv::Size(21, 21), 0, 0);
+    bg_img_.convertTo(background_, CV_32F);
+    // The frame has been consumed as the reference background.
+    // NOTE(review): it is removed rather than forwarded; route it to a
+    // relationship instead if downstream needs the first frame.
+    session->remove(flow_file);
+    return;
+  }
+  logger_->log_trace("Start motion detecting");
+
+  // Name the output after the local time at which it was processed.
+  auto t = std::time(nullptr);
+  auto tm = *std::localtime(&t);
+  std::ostringstream oss;
+  oss << std::put_time(&tm, "%Y-%m-%d %H-%M-%S");
+  auto filename = oss.str();
+  filename.append(image_encoding_);
+
+  detectAndDraw(frame);
+
+  opencv::FrameWriteCallback write_cb(frame, image_encoding_);
+
+  session->putAttribute(flow_file, "filename", filename);
+
+  session->write(flow_file, &write_cb);
+  session->transfer(flow_file, Success);
+  logger_->log_trace("Finish motion detecting");
+}
+
+// Intentionally empty: this processor performs no work in notifyStop().
+void MotionDetector::notifyStop() {
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/opencv/MotionDetector.h b/extensions/opencv/MotionDetector.h
new file mode 100644
index 0000000..4acafb8
--- /dev/null
+++ b/extensions/opencv/MotionDetector.h
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_MOTIONDETECTOR_H
+#define NIFI_MINIFI_CPP_MOTIONDETECTOR_H
+
+#include <atomic>
+#include <ctime>
+#include <iomanip>
+#include <memory>
+#include <mutex>
+#include <string>
+
+#include <core/Resource.h>
+#include <core/Processor.h>
+#include <opencv2/opencv.hpp>
+#include <opencv2/objdetect.hpp>
+#include <opencv2/imgproc.hpp>
+#include "FrameIO.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+/**
+ * Processor that compares incoming image frames against a background image
+ * and draws bounding boxes around regions that differ from it.
+ */
+class MotionDetector : public core::Processor {
+
+ public:
+
+  /**
+   * @param name processor name forwarded to core::Processor.
+   * @param uuid processor identifier forwarded to core::Processor.
+   */
+  explicit MotionDetector(const std::string &name, utils::Identifier uuid = utils::Identifier())
+      : Processor(name, uuid),
+        logger_(logging::LoggerFactory<MotionDetector>::getLogger()) {
+  }
+
+  // Supported properties (defined in MotionDetector.cpp).
+  static core::Property ImageEncoding;
+  static core::Property MinInterestArea;
+  static core::Property Threshold;
+  static core::Property DilateIter;
+  static core::Property BackgroundFrame;
+
+  // Supported relationships (defined in MotionDetector.cpp).
+  static core::Relationship Success;
+  static core::Relationship Failure;
+
+  void initialize() override;
+  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
+
+  void notifyStop() override;
+
+ private:
+
+  // Detects changed regions in `frame` and draws boxes on it; returns whether any qualified.
+  bool detectAndDraw(cv::Mat &frame);
+
+  std::shared_ptr<logging::Logger> logger_;
+  std::mutex mutex_;
+  cv::Mat background_;  // CV_32F running average of the background
+  cv::Mat bg_img_;      // background image compared against each frame
+  std::string image_encoding_;
+  // Initialised to the property defaults so they are never read
+  // uninitialised if a property lookup fails in onSchedule().
+  int min_area_ = 100;
+  int threshold_ = 42;
+  int dil_iter_ = 10;
+
+  // hardcoded width to 500
+  const double IMG_WIDTH = 500.0;
+};
+
+// Presumably registers MotionDetector with the framework together with its
+// description string; confirm against core/Resource.h.
+REGISTER_RESOURCE(MotionDetector, "Detect motion from captured images."); // NOLINT
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif //NIFI_MINIFI_CPP_MOTIONDETECTOR_H
diff --git a/extensions/opencv/tests/CaptureRTSPFrameTest.cpp b/extensions/opencv/tests/CaptureRTSPFrameTest.cpp
index fac070b..9993da3 100644
--- a/extensions/opencv/tests/CaptureRTSPFrameTest.cpp
+++ b/extensions/opencv/tests/CaptureRTSPFrameTest.cpp
@@ -16,13 +16,15 @@
  * limitations under the License.
  */
 
+#include "CaptureRTSPFrame.h"
+
+#include <uuid/uuid.h>
 #include <map>
 #include <memory>
 #include <fstream>
 #include <utility>
 #include <string>
 #include <set>
-#include <uuid/uuid.h>
 #include <iostream>
 
 #include "FlowFile.h"
@@ -30,7 +32,6 @@
 #include "../../../libminifi/test/TestBase.h"
 #include "FlowController.h"
 #include "core/Processor.h"
-#include "CaptureRTSPFrame.h"
 #include "core/ProcessorNode.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"