You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2019/07/31 22:10:33 UTC
[nifi-minifi-cpp] branch master updated: MINIFICPP-854: Capture
RTSP Frame
This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new a140a72 MINIFICPP-854: Capture RTSP Frame
a140a72 is described below
commit a140a7291fed28f46af3c61edea6e8feb4d044a5
Author: Nghia Le <mi...@gmail.com>
AuthorDate: Tue Jul 16 11:03:18 2019 +0200
MINIFICPP-854: Capture RTSP Frame
Signed-off-by: Arpad Boda <ab...@apache.org>
This closes #612
---
CMakeLists.txt | 2 +-
extensions/opencv/CaptureRTSPFrame.cpp | 33 ++++++---
extensions/opencv/CaptureRTSPFrame.h | 2 +
extensions/opencv/OpenCVLoader.cpp | 31 ++++++++
extensions/opencv/OpenCVLoader.h | 85 ++++++++++++++++++++++
.../opencv/tests}/CMakeLists.txt | 23 ++++--
.../opencv/tests}/CaptureRTSPFrameTest.cpp | 69 +++++++++++-------
7 files changed, 199 insertions(+), 46 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index d37ebf4..4965605 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -530,7 +530,7 @@ endif()
## OpenCV Extesions
option(ENABLE_OPENCV "Disables the OpenCV extensions." OFF)
if (ENABLE_OPENCV)
- createExtension(OPENCV-EXTENSIONS "OPENCV EXTENSIONS" "This enabled OpenCV support" "extensions/opencv" "${TEST_DIR}/opencv-tests")
+ createExtension(OPENCV-EXTENSIONS "OPENCV EXTENSIONS" "This enabled OpenCV support" "extensions/opencv" "extensions/opencv/tests")
endif()
## Bustache/template extensions
diff --git a/extensions/opencv/CaptureRTSPFrame.cpp b/extensions/opencv/CaptureRTSPFrame.cpp
index 445e3fb..50fac73 100644
--- a/extensions/opencv/CaptureRTSPFrame.cpp
+++ b/extensions/opencv/CaptureRTSPFrame.cpp
@@ -116,17 +116,35 @@ void CaptureRTSPFrame::onSchedule(core::ProcessContext *context, core::ProcessSe
rtspURI.append(rtsp_uri_);
}
- cv::VideoCapture capture(rtspURI.c_str());
- video_capture_ = capture;
- video_backend_driver_ = video_capture_.getBackendName();
+ rtsp_url_ = rtspURI;
+
}
void CaptureRTSPFrame::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
const std::shared_ptr<core::ProcessSession> &session) {
- auto flow_file = session->create();
- cv::Mat frame;
+ std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
+ if (!lock.owns_lock()) {
+ logger_->log_info("Cannot process due to an unfinished onTrigger");
+ context->yield();
+ return;
+ }
+ try {
+ video_capture_.open(rtsp_url_);
+ video_backend_driver_ = video_capture_.getBackendName();
+ } catch (const cv::Exception &e) {
+ logger_->log_error("Unable to open RTSP stream: %s", e.what());
+ context->yield();
+ return;
+ } catch (...) {
+ logger_->log_error("Unable to open RTSP stream: unhandled exception");
+ context->yield();
+ return;
+ }
+
+ auto flow_file = session->create();
+ cv::Mat frame;
// retrieve a frame of your source
if (video_capture_.read(frame)) {
if (!frame.empty()) {
@@ -145,6 +163,7 @@ void CaptureRTSPFrame::onTrigger(const std::shared_ptr<core::ProcessContext> &co
session->write(flow_file, &write_cb);
session->transfer(flow_file, Success);
+ logger_->log_info("A frame is captured");
} else {
logger_->log_error("Empty Mat frame received from capture");
session->transfer(flow_file, Failure);
@@ -154,13 +173,9 @@ void CaptureRTSPFrame::onTrigger(const std::shared_ptr<core::ProcessContext> &co
session->transfer(flow_file, Failure);
}
- frame.release();
-
}
void CaptureRTSPFrame::notifyStop() {
- // Release the Capture reference and free up resources.
- video_capture_.release();
}
} /* namespace processors */
diff --git a/extensions/opencv/CaptureRTSPFrame.h b/extensions/opencv/CaptureRTSPFrame.h
index e3ac740..537d672 100644
--- a/extensions/opencv/CaptureRTSPFrame.h
+++ b/extensions/opencv/CaptureRTSPFrame.h
@@ -84,11 +84,13 @@ class CaptureRTSPFrame : public core::Processor {
private:
std::shared_ptr<logging::Logger> logger_;
+ std::mutex mutex_;
std::string rtsp_username_;
std::string rtsp_password_;
std::string rtsp_host_;
std::string rtsp_port_;
std::string rtsp_uri_;
+ std::string rtsp_url_;
cv::VideoCapture video_capture_;
std::string image_encoding_;
std::string video_backend_driver_;
diff --git a/extensions/opencv/OpenCVLoader.cpp b/extensions/opencv/OpenCVLoader.cpp
new file mode 100644
index 0000000..eb7cfba
--- /dev/null
+++ b/extensions/opencv/OpenCVLoader.cpp
@@ -0,0 +1,31 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "OpenCVLoader.h"
+
+#include "core/FlowConfiguration.h"
+
+bool OpenCVObjectFactory::added = core::FlowConfiguration::add_static_func("createOpenCVFactory");
+extern "C" {
+
+
+
+void *createOpenCVFactory(void) {  // extern "C" entry point; its name matches the string passed to add_static_func() above
+ return new OpenCVObjectFactory();  // heap-allocated factory returned as void*; presumably the caller takes ownership -- confirm against the class loader
+}
+
+}
diff --git a/extensions/opencv/OpenCVLoader.h b/extensions/opencv/OpenCVLoader.h
new file mode 100644
index 0000000..e5a589a
--- /dev/null
+++ b/extensions/opencv/OpenCVLoader.h
@@ -0,0 +1,85 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_OPENCVLOADER_H_
+#define EXTENSIONS_OPENCVLOADER_H_
+
+#include "CaptureRTSPFrame.h"
+#include "core/ClassLoader.h"
+
+class OpenCVObjectFactoryInitializer : public core::ObjectFactoryInitializer {  // environment setup hook for the OpenCV extension
+ public:
+ virtual bool initialize() {  // returns true when setenv() reports success (0)
+ // By default, OpenCV's ffmpeg capture is hardcoded to use TCP; setting this environment variable
+ // is a workaround that requests UDP instead. If UDP times out, ffmpeg will retry with TCP.
+ // Note:
+ // 1. The OpenCV community is trying to find a better approach than setenv.
+ // 2. The overwrite argument is 0, so an existing "OPENCV_FFMPEG_CAPTURE_OPTIONS" value is not replaced.
+ return setenv("OPENCV_FFMPEG_CAPTURE_OPTIONS", "rtsp_transport;udp", 0) == 0;
+ }
+
+ virtual void deinitialize() {  // intentionally empty: the environment variable is not unset
+ }
+};
+
+class OpenCVObjectFactory : public core::ObjectFactory {  // top-level factory exposing the OpenCV extension's processors
+ public:
+ OpenCVObjectFactory() {
+
+ }
+
+ /**
+ * Gets the name of this factory.
+ * @return the fixed string "OpenCVObjectFactory"
+ */
+ virtual std::string getName() override{
+ return "OpenCVObjectFactory";
+ }
+
+ virtual std::string getClassName() override{  // same fixed string as getName()
+ return "OpenCVObjectFactory";
+ }
+ /**
+ * Lists the class names this factory can provide a factory for.
+ * @return a vector holding the single entry "CaptureRTSPFrame"
+ */
+ virtual std::vector<std::string> getClassNames() override{
+ std::vector<std::string> class_names;
+ class_names.push_back("CaptureRTSPFrame");
+ return class_names;
+ }
+
+ virtual std::unique_ptr<ObjectFactory> assign(const std::string &class_name) override{  // maps a class name to its concrete factory
+ if (utils::StringUtils::equalsIgnoreCase(class_name, "CaptureRTSPFrame")) {  // match is case-insensitive
+ return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::processors::CaptureRTSPFrame>());
+ } else {
+ return nullptr;  // unknown class name: nothing to assign
+ }
+ }
+
+ virtual std::unique_ptr<core::ObjectFactoryInitializer> getInitializer() {  // always returns a fresh OpenCVObjectFactoryInitializer
+ return std::unique_ptr<core::ObjectFactoryInitializer>(new OpenCVObjectFactoryInitializer());
+ }
+
+ static bool added;  // defined in OpenCVLoader.cpp from the result of FlowConfiguration::add_static_func("createOpenCVFactory")
+
+};
+
+extern "C" {
+ DLL_EXPORT void *createOpenCVFactory(void);
+}
+#endif /* EXTENSIONS_OPENCVLOADER_H_ */
diff --git a/libminifi/test/opencv-tests/CMakeLists.txt b/extensions/opencv/tests/CMakeLists.txt
similarity index 74%
rename from libminifi/test/opencv-tests/CMakeLists.txt
rename to extensions/opencv/tests/CMakeLists.txt
index 3cb4bf7..34693db 100644
--- a/libminifi/test/opencv-tests/CMakeLists.txt
+++ b/extensions/opencv/tests/CMakeLists.txt
@@ -21,19 +21,26 @@ file(GLOB OPENCV_TESTS "*.cpp")
SET(OPENCV_TEST_COUNT 0)
+if (WIN32)
+ set(LINK_FLAGS "/WHOLEARCHIVE")
+ set(LINK_END_FLAGS "")
+elseif (APPLE)
+ set(LINK_FLAGS "-Wl,-all_load")
+ set(LINK_END_FLAGS "")
+else ()
+ set(LINK_FLAGS "-Wl,--whole-archive")
+ set(LINK_END_FLAGS "-Wl,--no-whole-archive")
+endif ()
+
FOREACH(testfile ${OPENCV_TESTS})
get_filename_component(testfilename "${testfile}" NAME_WE)
add_executable("${testfilename}" "${testfile}")
- createTests("${testfilename}")
target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/opencv/")
-
- if (APPLE)
- target_link_libraries (${testfilename} -Wl,-all_load minifi-opencv)
- else ()
- target_link_libraries (${testfilename} -Wl,--whole-archive minifi-opencv -Wl,--no-whole-archive)
- endif ()
+ target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/standard-processors")
+ target_link_libraries (${testfilename} ${LINK_FLAGS} minifi-opencv minifi-standard-processors ${LINK_END_FLAGS})
+ createTests("${testfilename}")
MATH(EXPR OPENCV_TEST_COUNT "${OPENCV_TEST_COUNT}+1")
add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR})
target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
ENDFOREACH()
-message("-- Finished building ${OPENCV_TEST_COUNT} OpenCV related test file(s)...")
\ No newline at end of file
+message("-- Finished building ${OPENCV_TEST_COUNT} OpenCV related test file(s)...")
diff --git a/libminifi/test/opencv-tests/CaptureRTSPFrameTest.cpp b/extensions/opencv/tests/CaptureRTSPFrameTest.cpp
similarity index 50%
rename from libminifi/test/opencv-tests/CaptureRTSPFrameTest.cpp
rename to extensions/opencv/tests/CaptureRTSPFrameTest.cpp
index 9d6b6d7..fac070b 100644
--- a/libminifi/test/opencv-tests/CaptureRTSPFrameTest.cpp
+++ b/extensions/opencv/tests/CaptureRTSPFrameTest.cpp
@@ -16,27 +16,28 @@
* limitations under the License.
*/
-#include <uuid/uuid.h>
-#include <fstream>
#include <map>
#include <memory>
+#include <fstream>
#include <utility>
#include <string>
#include <set>
-#include "FlowController.h"
-#include "../TestBase.h"
-#include "core/Core.h"
+#include <uuid/uuid.h>
+#include <iostream>
+
#include "FlowFile.h"
-#include "../unit/ProvenanceTestHelper.h"
+#include "core/Core.h"
+#include "../../../libminifi/test/TestBase.h"
+#include "FlowController.h"
#include "core/Processor.h"
+#include "CaptureRTSPFrame.h"
+#include "core/ProcessorNode.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
-#include "core/ProcessorNode.h"
-#include <iostream>
-#include "CaptureRTSPFrame.h"
+#include "processors/LogAttribute.h"
+#include "../../../libminifi/test/unit/ProvenanceTestHelper.h"
-
-TEST_CASE("CaptureRTSPFrame", "[opencvtest1]") {
+TEST_CASE("CaptureRTSPFrame::ValidCapture", "[opencvtest1]") {
TestController testController;
LogTestController::getInstance().setTrace<minifi::processors::CaptureRTSPFrame>();
@@ -44,29 +45,41 @@ TEST_CASE("CaptureRTSPFrame", "[opencvtest1]") {
std::shared_ptr<TestPlan> plan = testController.createPlan();
std::shared_ptr<core::Processor> captureRTSP = plan->addProcessor("CaptureRTSPFrame", "CaptureRTSPFrame");
-
- plan->setProperty(captureRTSP, minifi::processors::CaptureRTSPFrame::RTSPUsername.getName(), "admin");
- plan->setProperty(captureRTSP, minifi::processors::CaptureRTSPFrame::RTSPPassword.getName(), "nope");
- plan->setProperty(captureRTSP, minifi::processors::CaptureRTSPFrame::RTSPHostname.getName(), "192.168.1.200");
- plan->setProperty(captureRTSP, minifi::processors::CaptureRTSPFrame::RTSPURI.getName(), "");
+ // The RTSP URL below points at a public RTSP stream (hopefully still alive by the time you read this).
+ // Alternatively, you can set up your own server using VLC:
+ // vlc -vvv --loop <input video> --sout '#rtp{port=1234,sdp=rtsp://127.0.0.1:port/test}' --sout-keep
+ // The URI will then be rtsp://127.0.0.1:port/test
+ plan->setProperty(captureRTSP, minifi::processors::CaptureRTSPFrame::RTSPHostname.getName(), "170.93.143.139");
+ plan->setProperty(captureRTSP, minifi::processors::CaptureRTSPFrame::RTSPURI.getName(), "rtplive/470011e600ef003a004ee33696235daa");
plan->setProperty(captureRTSP, minifi::processors::CaptureRTSPFrame::RTSPPort.getName(), "");
plan->setProperty(captureRTSP, minifi::processors::CaptureRTSPFrame::ImageEncoding.getName(), ".jpg");
- testController.runSession(plan, false);
- auto records = plan->getProvenanceRecords();
+ testController.runSession(plan, true);
std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
- REQUIRE(record == nullptr);
- REQUIRE(records.size() == 0);
+ REQUIRE(record);
+ REQUIRE(LogTestController::getInstance().contains("A frame is captured"));
+}
+
+TEST_CASE("CaptureRTSPFrame::InvalidURI", "[opencvtest2]") {
+ TestController testController;
+
+ LogTestController::getInstance().setTrace<minifi::processors::CaptureRTSPFrame>();
+ LogTestController::getInstance().setDebug<core::ProcessSession>();
- plan->reset();
- testController.runSession(plan, false);
+ std::shared_ptr<TestPlan> plan = testController.createPlan();
+ std::shared_ptr<core::Processor> captureRTSP = plan->addProcessor("CaptureRTSPFrame", "CaptureRTSPFrame");
- records = plan->getProvenanceRecords();
- record = plan->getCurrentFlowFile();
- testController.runSession(plan, false);
+ plan->setProperty(captureRTSP, minifi::processors::CaptureRTSPFrame::RTSPHostname.getName(), "170.93.143.139");
+ plan->setProperty(captureRTSP, minifi::processors::CaptureRTSPFrame::RTSPURI.getName(), "abcd");
+ plan->setProperty(captureRTSP, minifi::processors::CaptureRTSPFrame::RTSPPort.getName(), "");
+ plan->setProperty(captureRTSP, minifi::processors::CaptureRTSPFrame::ImageEncoding.getName(), ".jpg");
- records = plan->getProvenanceRecords();
- record = plan->getCurrentFlowFile();
+ plan->addProcessor(
+ "LogAttribute",
+ "Log",
+ core::Relationship("failure", "description"),
+ true);
- LogTestController::getInstance().reset();
+ testController.runSession(plan, true);
+ REQUIRE(LogTestController::getInstance().contains("Unable to open RTSP stream"));
}