You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2019/07/31 22:10:33 UTC

[nifi-minifi-cpp] branch master updated: MINIFICPP-854: Capture RTSP Frame

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new a140a72  MINIFICPP-854: Capture RTSP Frame
a140a72 is described below

commit a140a7291fed28f46af3c61edea6e8feb4d044a5
Author: Nghia Le <mi...@gmail.com>
AuthorDate: Tue Jul 16 11:03:18 2019 +0200

    MINIFICPP-854: Capture RTSP Frame
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #612
---
 CMakeLists.txt                                     |  2 +-
 extensions/opencv/CaptureRTSPFrame.cpp             | 33 ++++++---
 extensions/opencv/CaptureRTSPFrame.h               |  2 +
 extensions/opencv/OpenCVLoader.cpp                 | 31 ++++++++
 extensions/opencv/OpenCVLoader.h                   | 85 ++++++++++++++++++++++
 .../opencv/tests}/CMakeLists.txt                   | 23 ++++--
 .../opencv/tests}/CaptureRTSPFrameTest.cpp         | 69 +++++++++++-------
 7 files changed, 199 insertions(+), 46 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index d37ebf4..4965605 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -530,7 +530,7 @@ endif()
 ## OpenCV Extesions
 option(ENABLE_OPENCV "Disables the OpenCV extensions." OFF)
 if (ENABLE_OPENCV)
-	createExtension(OPENCV-EXTENSIONS "OPENCV EXTENSIONS" "This enabled OpenCV support" "extensions/opencv" "${TEST_DIR}/opencv-tests")
+	createExtension(OPENCV-EXTENSIONS "OPENCV EXTENSIONS" "This enabled OpenCV support" "extensions/opencv" "extensions/opencv/tests")
 endif()
 
 ## Bustache/template extensions
diff --git a/extensions/opencv/CaptureRTSPFrame.cpp b/extensions/opencv/CaptureRTSPFrame.cpp
index 445e3fb..50fac73 100644
--- a/extensions/opencv/CaptureRTSPFrame.cpp
+++ b/extensions/opencv/CaptureRTSPFrame.cpp
@@ -116,17 +116,35 @@ void CaptureRTSPFrame::onSchedule(core::ProcessContext *context, core::ProcessSe
     rtspURI.append(rtsp_uri_);
   }
 
-  cv::VideoCapture capture(rtspURI.c_str());
-  video_capture_ = capture;
-  video_backend_driver_ = video_capture_.getBackendName();
+  rtsp_url_ = rtspURI;
+
 }
 
 void CaptureRTSPFrame::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
                                  const std::shared_ptr<core::ProcessSession> &session) {
-  auto flow_file = session->create();
 
-  cv::Mat frame;
+  std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
+  if (!lock.owns_lock()) {
+    logger_->log_info("Cannot process due to an unfinished onTrigger");
+    context->yield();
+    return;
+  }
 
+  try {
+    video_capture_.open(rtsp_url_);
+    video_backend_driver_ = video_capture_.getBackendName();
+  } catch (const cv::Exception &e) {
+    logger_->log_error("Unable to open RTSP stream: %s", e.what());
+    context->yield();
+    return;
+  } catch (...) {
+    logger_->log_error("Unable to open RTSP stream: unhandled exception");
+    context->yield();
+    return;
+  }
+
+  auto flow_file = session->create();
+  cv::Mat frame;
   // retrieve a frame of your source
   if (video_capture_.read(frame)) {
     if (!frame.empty()) {
@@ -145,6 +163,7 @@ void CaptureRTSPFrame::onTrigger(const std::shared_ptr<core::ProcessContext> &co
 
       session->write(flow_file, &write_cb);
       session->transfer(flow_file, Success);
+      logger_->log_info("A frame is captured");
     } else {
       logger_->log_error("Empty Mat frame received from capture");
       session->transfer(flow_file, Failure);
@@ -154,13 +173,9 @@ void CaptureRTSPFrame::onTrigger(const std::shared_ptr<core::ProcessContext> &co
     session->transfer(flow_file, Failure);
   }
 
-  frame.release();
-
 }
 
 void CaptureRTSPFrame::notifyStop() {
-  // Release the Capture reference and free up resources.
-  video_capture_.release();
 }
 
 } /* namespace processors */
diff --git a/extensions/opencv/CaptureRTSPFrame.h b/extensions/opencv/CaptureRTSPFrame.h
index e3ac740..537d672 100644
--- a/extensions/opencv/CaptureRTSPFrame.h
+++ b/extensions/opencv/CaptureRTSPFrame.h
@@ -84,11 +84,13 @@ class CaptureRTSPFrame : public core::Processor {
 
  private:
   std::shared_ptr<logging::Logger> logger_;
+  std::mutex mutex_;
   std::string rtsp_username_;
   std::string rtsp_password_;
   std::string rtsp_host_;
   std::string rtsp_port_;
   std::string rtsp_uri_;
+  std::string rtsp_url_;
   cv::VideoCapture video_capture_;
   std::string image_encoding_;
   std::string video_backend_driver_;
diff --git a/extensions/opencv/OpenCVLoader.cpp b/extensions/opencv/OpenCVLoader.cpp
new file mode 100644
index 0000000..eb7cfba
--- /dev/null
+++ b/extensions/opencv/OpenCVLoader.cpp
@@ -0,0 +1,31 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "OpenCVLoader.h"
+
+#include "core/FlowConfiguration.h"
+
+bool OpenCVObjectFactory::added = core::FlowConfiguration::add_static_func("createOpenCVFactory");
+extern "C" {
+
+
+
+void *createOpenCVFactory(void) {
+  return new OpenCVObjectFactory();
+}
+
+}
diff --git a/extensions/opencv/OpenCVLoader.h b/extensions/opencv/OpenCVLoader.h
new file mode 100644
index 0000000..e5a589a
--- /dev/null
+++ b/extensions/opencv/OpenCVLoader.h
@@ -0,0 +1,85 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_OPENCVLOADER_H_
+#define EXTENSIONS_OPENCVLOADER_H_
+
+#include "CaptureRTSPFrame.h"
+#include "core/ClassLoader.h"
+
+class OpenCVObjectFactoryInitializer : public core::ObjectFactoryInitializer {
+ public:
+  virtual bool initialize() {
+    // By default in OpenCV, ffmpeg capture is hardcoded to use TCP and this is a workaround
+    // also if UDP timeout, ffmpeg will retry with TCP
+    // Note:
+    // 1. OpenCV community are trying to find a better approach than setenv.
+    // 2. The command will not overwrite value if "OPENCV_FFMPEG_CAPTURE_OPTIONS" already exists.
+    return setenv("OPENCV_FFMPEG_CAPTURE_OPTIONS", "rtsp_transport;udp", 0) == 0;
+  }
+
+  virtual void deinitialize() {
+  }
+};
+
+class OpenCVObjectFactory : public core::ObjectFactory {
+ public:
+  OpenCVObjectFactory() {
+
+  }
+
+  /**
+   * Gets the name of the object.
+   * @return class name of processor
+   */
+  virtual std::string getName() override{
+    return "OpenCVObjectFactory";
+  }
+
+  virtual std::string getClassName() override{
+    return "OpenCVObjectFactory";
+  }
+  /**
+   * Gets the class name for the object
+   * @return class name for the processor.
+   */
+  virtual std::vector<std::string> getClassNames() override{
+    std::vector<std::string> class_names;
+    class_names.push_back("CaptureRTSPFrame");
+    return class_names;
+  }
+
+  virtual std::unique_ptr<ObjectFactory> assign(const std::string &class_name) override{
+    if (utils::StringUtils::equalsIgnoreCase(class_name, "CaptureRTSPFrame")) {
+      return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::processors::CaptureRTSPFrame>());
+    } else {
+      return nullptr;
+    }
+  }
+
+  virtual std::unique_ptr<core::ObjectFactoryInitializer> getInitializer() {
+    return std::unique_ptr<core::ObjectFactoryInitializer>(new OpenCVObjectFactoryInitializer());
+  }
+
+  static bool added;
+
+};
+
+extern "C" {
+	DLL_EXPORT void *createOpenCVFactory(void);
+}
+#endif /* EXTENSIONS_OPENCVLOADER_H_ */
diff --git a/libminifi/test/opencv-tests/CMakeLists.txt b/extensions/opencv/tests/CMakeLists.txt
similarity index 74%
rename from libminifi/test/opencv-tests/CMakeLists.txt
rename to extensions/opencv/tests/CMakeLists.txt
index 3cb4bf7..34693db 100644
--- a/libminifi/test/opencv-tests/CMakeLists.txt
+++ b/extensions/opencv/tests/CMakeLists.txt
@@ -21,19 +21,26 @@ file(GLOB OPENCV_TESTS  "*.cpp")
 
 SET(OPENCV_TEST_COUNT 0)
 
+if (WIN32)
+	set(LINK_FLAGS "/WHOLEARCHIVE")
+	set(LINK_END_FLAGS "")
+elseif (APPLE)
+	set(LINK_FLAGS "-Wl,-all_load")
+	set(LINK_END_FLAGS "")
+else ()
+	set(LINK_FLAGS "-Wl,--whole-archive")
+	set(LINK_END_FLAGS "-Wl,--no-whole-archive")
+endif ()
+
 FOREACH(testfile ${OPENCV_TESTS})
 	get_filename_component(testfilename "${testfile}" NAME_WE)
 	add_executable("${testfilename}" "${testfile}")
-	createTests("${testfilename}")
 	target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/opencv/")
-
-	if (APPLE)
-		target_link_libraries (${testfilename} -Wl,-all_load minifi-opencv)
-	else ()
-		target_link_libraries (${testfilename} -Wl,--whole-archive minifi-opencv -Wl,--no-whole-archive)
-	endif ()
+	target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/standard-processors")
+	target_link_libraries (${testfilename} ${LINK_FLAGS} minifi-opencv minifi-standard-processors ${LINK_END_FLAGS})
+	createTests("${testfilename}")
 	MATH(EXPR OPENCV_TEST_COUNT "${OPENCV_TEST_COUNT}+1")
 	add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR})
 	target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
 ENDFOREACH()
-message("-- Finished building ${OPENCV_TEST_COUNT} OpenCV related test file(s)...")
\ No newline at end of file
+message("-- Finished building ${OPENCV_TEST_COUNT} OpenCV related test file(s)...")
diff --git a/libminifi/test/opencv-tests/CaptureRTSPFrameTest.cpp b/extensions/opencv/tests/CaptureRTSPFrameTest.cpp
similarity index 50%
rename from libminifi/test/opencv-tests/CaptureRTSPFrameTest.cpp
rename to extensions/opencv/tests/CaptureRTSPFrameTest.cpp
index 9d6b6d7..fac070b 100644
--- a/libminifi/test/opencv-tests/CaptureRTSPFrameTest.cpp
+++ b/extensions/opencv/tests/CaptureRTSPFrameTest.cpp
@@ -16,27 +16,28 @@
  * limitations under the License.
  */
 
-#include <uuid/uuid.h>
-#include <fstream>
 #include <map>
 #include <memory>
+#include <fstream>
 #include <utility>
 #include <string>
 #include <set>
-#include "FlowController.h"
-#include "../TestBase.h"
-#include "core/Core.h"
+#include <uuid/uuid.h>
+#include <iostream>
+
 #include "FlowFile.h"
-#include "../unit/ProvenanceTestHelper.h"
+#include "core/Core.h"
+#include "../../../libminifi/test/TestBase.h"
+#include "FlowController.h"
 #include "core/Processor.h"
+#include "CaptureRTSPFrame.h"
+#include "core/ProcessorNode.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
-#include "core/ProcessorNode.h"
-#include <iostream>
-#include "CaptureRTSPFrame.h"
+#include "processors/LogAttribute.h"
+#include "../../../libminifi/test/unit/ProvenanceTestHelper.h"
 
-
-TEST_CASE("CaptureRTSPFrame", "[opencvtest1]") {
+TEST_CASE("CaptureRTSPFrame::ValidCapture", "[opencvtest1]") {
     TestController testController;
 
     LogTestController::getInstance().setTrace<minifi::processors::CaptureRTSPFrame>();
@@ -44,29 +45,41 @@ TEST_CASE("CaptureRTSPFrame", "[opencvtest1]") {
 
     std::shared_ptr<TestPlan> plan = testController.createPlan();
     std::shared_ptr<core::Processor> captureRTSP = plan->addProcessor("CaptureRTSPFrame", "CaptureRTSPFrame");
-
-    plan->setProperty(captureRTSP, minifi::processors::CaptureRTSPFrame::RTSPUsername.getName(), "admin");
-    plan->setProperty(captureRTSP, minifi::processors::CaptureRTSPFrame::RTSPPassword.getName(), "nope");
-    plan->setProperty(captureRTSP, minifi::processors::CaptureRTSPFrame::RTSPHostname.getName(), "192.168.1.200");
-    plan->setProperty(captureRTSP, minifi::processors::CaptureRTSPFrame::RTSPURI.getName(), "");
+    // the RTSP url below comes from a public RTSP stream (hopefully still alive by the time you read this)
+    // alternatively, we can set our own server using vlc.
+    // vlc -vvv --loop <input video> --sout '#rtp{port=1234,sdp=rtsp://127.0.0.1:port/test}' --sout-keep
+    // then the uri will be rtsp://127.0.0.1:port/test
+    plan->setProperty(captureRTSP, minifi::processors::CaptureRTSPFrame::RTSPHostname.getName(), "170.93.143.139");
+    plan->setProperty(captureRTSP, minifi::processors::CaptureRTSPFrame::RTSPURI.getName(), "rtplive/470011e600ef003a004ee33696235daa");
     plan->setProperty(captureRTSP, minifi::processors::CaptureRTSPFrame::RTSPPort.getName(), "");
     plan->setProperty(captureRTSP, minifi::processors::CaptureRTSPFrame::ImageEncoding.getName(), ".jpg");
 
-    testController.runSession(plan, false);
-    auto records = plan->getProvenanceRecords();
+    testController.runSession(plan, true);
     std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
-    REQUIRE(record == nullptr);
-    REQUIRE(records.size() == 0);
+    REQUIRE(record);
+    REQUIRE(LogTestController::getInstance().contains("A frame is captured"));
+}
+
+TEST_CASE("CaptureRTSPFrame::InvalidURI", "[opencvtest2]") {
+  TestController testController;
+
+  LogTestController::getInstance().setTrace<minifi::processors::CaptureRTSPFrame>();
+  LogTestController::getInstance().setDebug<core::ProcessSession>();
 
-    plan->reset();
-    testController.runSession(plan, false);
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  std::shared_ptr<core::Processor> captureRTSP = plan->addProcessor("CaptureRTSPFrame", "CaptureRTSPFrame");
 
-    records = plan->getProvenanceRecords();
-    record = plan->getCurrentFlowFile();
-    testController.runSession(plan, false);
+  plan->setProperty(captureRTSP, minifi::processors::CaptureRTSPFrame::RTSPHostname.getName(), "170.93.143.139");
+  plan->setProperty(captureRTSP, minifi::processors::CaptureRTSPFrame::RTSPURI.getName(), "abcd");
+  plan->setProperty(captureRTSP, minifi::processors::CaptureRTSPFrame::RTSPPort.getName(), "");
+  plan->setProperty(captureRTSP, minifi::processors::CaptureRTSPFrame::ImageEncoding.getName(), ".jpg");
 
-    records = plan->getProvenanceRecords();
-    record = plan->getCurrentFlowFile();
+  plan->addProcessor(
+          "LogAttribute",
+          "Log",
+          core::Relationship("failure", "description"),
+          true);
 
-    LogTestController::getInstance().reset();
+  testController.runSession(plan, true);
+  REQUIRE(LogTestController::getInstance().contains("Unable to open RTSP stream"));
 }