You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/01/02 18:48:49 UTC
nifi-minifi-cpp git commit: MINIFICPP-344 Added initial
implementation of TFConvertImageToTensor
Repository: nifi-minifi-cpp
Updated Branches:
refs/heads/master 5977aa27c -> a2d9bfca8
MINIFICPP-344 Added initial implementation of TFConvertImageToTensor
This closes #224.
Signed-off-by: Marc Parisi <ph...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/a2d9bfca
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/a2d9bfca
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/a2d9bfca
Branch: refs/heads/master
Commit: a2d9bfca852008bc7127a9b0a5fd181d122fbb9a
Parents: 5977aa2
Author: Andy I. Christianson <an...@andyic.org>
Authored: Tue Dec 19 11:29:29 2017 -0500
Committer: Marc Parisi <ph...@apache.org>
Committed: Tue Jan 2 13:47:58 2018 -0500
----------------------------------------------------------------------
extensions/tensorflow/CMakeLists.txt | 4 -
.../tensorflow/TFConvertImageToTensor.cpp | 252 +++++++++++++++++++
extensions/tensorflow/TFConvertImageToTensor.h | 109 ++++++++
.../test/tensorflow-tests/TensorFlowTests.cpp | 134 ++++++++++
4 files changed, 495 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a2d9bfca/extensions/tensorflow/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/extensions/tensorflow/CMakeLists.txt b/extensions/tensorflow/CMakeLists.txt
index 461feee..a8ba8fb 100644
--- a/extensions/tensorflow/CMakeLists.txt
+++ b/extensions/tensorflow/CMakeLists.txt
@@ -52,10 +52,6 @@ target_link_libraries(minifi-tensorflow-extensions ${TENSORFLOW_LIBRARIES})
find_package(ZLIB REQUIRED)
include_directories(${ZLIB_INCLUDE_DIRS})
target_link_libraries (minifi-tensorflow-extensions ${ZLIB_LIBRARIES})
-find_package(Boost COMPONENTS system filesystem REQUIRED)
-include_directories(${Boost_INCLUDE_DIRS})
-target_link_libraries(minifi-tensorflow-extensions ${Boost_SYSTEM_LIBRARY})
-target_link_libraries(minifi-tensorflow-extensions ${Boost_FILESYSTEM_LIBRARY})
if (WIN32)
set_target_properties(minifi-tensorflow-extensions PROPERTIES
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a2d9bfca/extensions/tensorflow/TFConvertImageToTensor.cpp
----------------------------------------------------------------------
diff --git a/extensions/tensorflow/TFConvertImageToTensor.cpp b/extensions/tensorflow/TFConvertImageToTensor.cpp
new file mode 100644
index 0000000..be5e7a1
--- /dev/null
+++ b/extensions/tensorflow/TFConvertImageToTensor.cpp
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TFConvertImageToTensor.h"
+
+#include "tensorflow/cc/ops/standard_ops.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// Properties supported by TFConvertImageToTensor. All of them except
+// "Channels" default to an empty string and must be set by the flow.
+//
+// NOTE(review): onSchedule() stores "Input Format", but onTrigger() never
+// consults it and always reads the flow file content as raw bytes.
+core::Property TFConvertImageToTensor::ImageFormat(  // NOLINT
+    "Input Format",
+    "The format of the input image (PNG or RAW). RAW is RGB24.", "");
+core::Property TFConvertImageToTensor::InputWidth(  // NOLINT
+    "Input Width",
+    "The width, in pixels, of the input image.", "");
+core::Property TFConvertImageToTensor::InputHeight(  // NOLINT
+    "Input Height",
+    "The height, in pixels, of the input image.", "");
+core::Property TFConvertImageToTensor::OutputWidth(  // NOLINT
+    "Output Width",
+    "The width, in pixels, of the output image.", "");
+core::Property TFConvertImageToTensor::OutputHeight(  // NOLINT
+    "Output Height",
+    "The height, in pixels, of the output image.", "");
+core::Property TFConvertImageToTensor::NumChannels(  // NOLINT
+    "Channels",
+    "The number of channels (e.g. 3 for RGB, 4 for RGBA) in the input image", "3");
+
+// Relationships a processed flow file can be routed to.
+core::Relationship TFConvertImageToTensor::Success(  // NOLINT
+    "success",
+    "Successful graph application outputs");
+core::Relationship TFConvertImageToTensor::Failure(  // NOLINT
+    "failure",
+    "Failures which will not work if retried");
+
+/**
+ * Declares the properties and relationships this processor supports.
+ */
+void TFConvertImageToTensor::initialize() {
+  // Properties a flow may configure on this processor
+  setSupportedProperties(std::set<core::Property>{
+      ImageFormat, InputWidth, InputHeight, OutputWidth, OutputHeight, NumChannels});
+
+  // Relationships a flow file may be transferred to
+  setSupportedRelationships(std::set<core::Relationship>{Success, Failure});
+}
+
+/**
+ * Reads the processor configuration into member fields.
+ *
+ * Configuration problems are reported through the logger only; nothing is
+ * thrown and scheduling is not aborted.
+ *
+ * @param context processor context the property values are read from
+ * @param sessionFactory not used by this processor
+ */
+void TFConvertImageToTensor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
+  context->getProperty(ImageFormat.getName(), input_format_);
+
+  if (input_format_.empty()) {
+    logger_->log_error("Invalid image format");
+  }
+
+  std::string val;
+
+  // A successful property lookup does not guarantee a numeric value, so a
+  // failed string-to-int conversion is reported as invalid as well.
+  if (!context->getProperty(InputWidth.getName(), val)
+      || !core::Property::StringToInt(val, input_width_)) {
+    logger_->log_error("Invalid Input Width");
+  }
+
+  if (!context->getProperty(InputHeight.getName(), val)
+      || !core::Property::StringToInt(val, input_height_)) {
+    logger_->log_error("Invalid Input Height");
+  }
+
+  if (!context->getProperty(OutputWidth.getName(), val)
+      || !core::Property::StringToInt(val, output_width_)) {
+    logger_->log_error("Invalid Output Width");
+  }
+
+  if (!context->getProperty(OutputHeight.getName(), val)
+      || !core::Property::StringToInt(val, output_height_)) {
+    logger_->log_error("Invalid output height");
+  }
+
+  if (!context->getProperty(NumChannels.getName(), val)
+      || !core::Property::StringToInt(val, num_channels_)) {
+    logger_->log_error("Invalid channel count");
+  }
+}
+
+/**
+ * Converts the content of one flow file from raw image bytes into a serialized
+ * TensorFlow tensor.
+ *
+ * The flow file content is read into a uint8 tensor of shape
+ * {input_height_, input_width_, num_channels_}, run through a small graph
+ * (cast to float, add a batch dimension, bilinear resize to
+ * {output_height_, output_width_}, divide by 255) and the resulting tensor is
+ * written back to the flow file as a serialized TensorProto.
+ *
+ * Any exception is logged, the flow file is transferred to Failure and the
+ * processor yields.
+ *
+ * @param context processor context (not read in this method)
+ * @param session session used to get, read, write and transfer the flow file
+ */
+void TFConvertImageToTensor::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
+ const std::shared_ptr<core::ProcessSession> &session) {
+ auto flow_file = session->get();
+
+ // Nothing queued for this processor
+ if (!flow_file) {
+ return;
+ }
+
+ try {
+ // Use an existing context, if one is available
+ std::shared_ptr<TFContext> ctx;
+
+ if (tf_context_q_.try_dequeue(ctx)) {
+ logger_->log_debug("Using available TensorFlow context");
+ }
+
+ // Names of the graph's feed and fetch nodes; used both when building the
+ // graph and when running the session below
+ std::string input_tensor_name = "input";
+ std::string output_tensor_name = "output";
+
+ // No pooled context was available: create a session and build the graph
+ if (!ctx) {
+ logger_->log_info("Creating new TensorFlow context");
+ tensorflow::SessionOptions options;
+ ctx = std::make_shared<TFContext>();
+ ctx->tf_session.reset(tensorflow::NewSession(options));
+
+ auto root = tensorflow::Scope::NewRootScope();
+ auto input = tensorflow::ops::Placeholder(root.WithOpName(input_tensor_name), tensorflow::DT_UINT8);
+
+ // Cast pixel values to floats
+ auto float_caster = tensorflow::ops::Cast(root.WithOpName("float_caster"), input, tensorflow::DT_FLOAT);
+
+ // Expand into batches (of size 1)
+ auto dims_expander = tensorflow::ops::ExpandDims(root, float_caster, 0);
+
+ // Resize tensor to output dimensions
+ // (the size constant is given as {height, width})
+ auto resize = tensorflow::ops::ResizeBilinear(
+ root, dims_expander,
+ tensorflow::ops::Const(root.WithOpName("resize"), {output_height_, output_width_}));
+
+ // Normalize tensor from 0-255 pixel values to 0.0-1.0 values
+ // (the subtraction of 0.0 leaves the values unchanged)
+ auto output = tensorflow::ops::Div(root.WithOpName(output_tensor_name),
+ tensorflow::ops::Sub(root, resize, {0.0f}),
+ {255.0f});
+ tensorflow::GraphDef graph_def;
+ {
+ auto status = root.ToGraphDef(&graph_def);
+
+ if (!status.ok()) {
+ std::string msg = "Failed to create TensorFlow graph: ";
+ msg.append(status.ToString());
+ throw std::runtime_error(msg);
+ }
+ }
+
+ {
+ auto status = ctx->tf_session->Create(graph_def);
+
+ if (!status.ok()) {
+ std::string msg = "Failed to create TensorFlow session: ";
+ msg.append(status.ToString());
+ throw std::runtime_error(msg);
+ }
+ }
+ }
+
+ // Apply graph
+ // Read input tensor from flow file
+ // NOTE(review): input_format_ is not consulted; the content is always
+ // copied into the tensor as raw bytes.
+ tensorflow::Tensor img_tensor(tensorflow::DT_UINT8, {input_height_, input_width_, num_channels_});
+ ImageReadCallback tensor_cb(&img_tensor);
+ session->read(flow_file, &tensor_cb);
+ std::vector<tensorflow::Tensor> outputs;
+ auto status = ctx->tf_session->Run({{input_tensor_name, img_tensor}}, {output_tensor_name + ":0"}, {}, &outputs);
+
+ if (!status.ok()) {
+ std::string msg = "Failed to apply TensorFlow graph: ";
+ msg.append(status.ToString());
+ throw std::runtime_error(msg);
+ }
+
+ // Create output flow file for each output tensor
+ // NOTE(review): every iteration writes to and transfers the same flow_file;
+ // only one output is fetched above, so the loop body runs once in practice.
+ for (const auto &output : outputs) {
+ auto tensor_proto = std::make_shared<tensorflow::TensorProto>();
+ output.AsProtoTensorContent(tensor_proto.get());
+ logger_->log_info("Writing output tensor flow file");
+ TensorWriteCallback write_cb(tensor_proto);
+ session->write(flow_file, &write_cb);
+ session->transfer(flow_file, Success);
+ }
+
+ // Make context available for use again
+ // (only reached on success; on the exception paths below the context is
+ // not put back into the queue)
+ if (tf_context_q_.size_approx() < getMaxConcurrentTasks()) {
+ logger_->log_debug("Releasing TensorFlow context");
+ tf_context_q_.enqueue(ctx);
+ } else {
+ logger_->log_info("Destroying TensorFlow context because it is no longer needed");
+ }
+ } catch (std::exception &exception) {
+ logger_->log_error("Caught Exception %s", exception.what());
+ session->transfer(flow_file, Failure);
+ this->yield();
+ } catch (...) {
+ logger_->log_error("Caught Exception");
+ session->transfer(flow_file, Failure);
+ this->yield();
+ }
+}
+
+/**
+ * Copies the complete flow file content into the tensor's backing buffer.
+ *
+ * @param stream flow file content stream
+ * @return the number of bytes read from the stream
+ * @throws std::runtime_error if the content does not fit into the tensor or
+ *         the stream could not be read completely
+ */
+int64_t TFConvertImageToTensor::ImageReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+  // Bound the read by the size of the buffer that is actually written to.
+  // AllocatedBytes() reports the allocation size, which may be larger than
+  // the usable element buffer.
+  auto pixels = tensor_->flat<unsigned char>();
+  const auto capacity = static_cast<uint64_t>(pixels.size());
+  const auto content_size = static_cast<uint64_t>(stream->getSize());
+
+  if (capacity < content_size) {
+    throw std::runtime_error("Tensor is not big enough to hold FlowFile bytes");
+  }
+
+  const auto num_read = stream->readData(pixels.data(), static_cast<int>(content_size));
+
+  // Compare as unsigned only after ruling out a negative (error) result
+  if (num_read < 0 || static_cast<uint64_t>(num_read) != content_size) {
+    throw std::runtime_error("ImageReadCallback failed to fully read flow file input stream");
+  }
+
+  return num_read;
+}
+
+/**
+ * Serializes the wrapped TensorProto and writes it to the flow file content stream.
+ *
+ * @param stream flow file content stream to write to
+ * @return the number of bytes written
+ * @throws std::runtime_error if fewer bytes were written than were serialized
+ */
+int64_t TFConvertImageToTensor::TensorWriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
+  std::string serialized = tensor_proto_->SerializeAsString();
+  const auto expected = serialized.size();
+  auto written = stream->writeData(reinterpret_cast<uint8_t *>(&serialized[0]),
+                                   static_cast<int>(expected));
+
+  // Everything was written: report the byte count
+  if (written == expected) {
+    return written;
+  }
+
+  std::string msg("TensorWriteCallback failed to fully write flow file output stream; Expected ");
+  msg += std::to_string(expected);
+  msg += " and wrote ";
+  msg += std::to_string(written);
+  throw std::runtime_error(msg);
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a2d9bfca/extensions/tensorflow/TFConvertImageToTensor.h
----------------------------------------------------------------------
diff --git a/extensions/tensorflow/TFConvertImageToTensor.h b/extensions/tensorflow/TFConvertImageToTensor.h
new file mode 100644
index 0000000..63f889c
--- /dev/null
+++ b/extensions/tensorflow/TFConvertImageToTensor.h
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_TFCONVERTIMAGETOTENSOR_H
+#define NIFI_MINIFI_CPP_TFCONVERTIMAGETOTENSOR_H
+
+#include <atomic>
+
+#include <core/Resource.h>
+#include <core/Processor.h>
+#include <tensorflow/core/public/session.h>
+#include <concurrentqueue.h>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+/**
+ * Processor that reads the content of a flow file as raw image bytes, converts
+ * it into a float tensor resized to the configured output dimensions and
+ * writes the result back as a serialized TensorFlow TensorProto.
+ */
+class TFConvertImageToTensor : public core::Processor {
+ public:
+  explicit TFConvertImageToTensor(const std::string &name, uuid_t uuid = nullptr)
+      : Processor(name, uuid),
+        logger_(logging::LoggerFactory<TFConvertImageToTensor>::getLogger()) {
+  }
+
+  // Supported properties
+  static core::Property ImageFormat;
+  static core::Property NumChannels;
+  static core::Property InputWidth;
+  static core::Property InputHeight;
+  static core::Property OutputWidth;
+  static core::Property OutputHeight;
+
+  // Supported relationships
+  static core::Relationship Success;
+  static core::Relationship Failure;
+
+  void initialize() override;
+  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override;
+  // The raw-pointer overload is intentionally a no-op that only logs an error;
+  // the shared_ptr overload below does the work.
+  void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override {
+    logger_->log_error("onTrigger invocation with raw pointers is not implemented");
+  }
+  void onTrigger(const std::shared_ptr<core::ProcessContext> &context,
+                 const std::shared_ptr<core::ProcessSession> &session) override;
+
+  // TensorFlow state that is pooled and reused between onTrigger invocations
+  struct TFContext {
+    std::shared_ptr<tensorflow::Session> tf_session;
+  };
+
+  // Reads flow file content into a caller-owned tensor
+  class ImageReadCallback : public InputStreamCallback {
+   public:
+    explicit ImageReadCallback(tensorflow::Tensor *tensor)
+        : tensor_(tensor) {
+    }
+    ~ImageReadCallback() override = default;
+    int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+
+   private:
+    tensorflow::Tensor *tensor_;  // not owned
+  };
+
+  // Writes a serialized TensorProto as flow file content
+  class TensorWriteCallback : public OutputStreamCallback {
+   public:
+    explicit TensorWriteCallback(std::shared_ptr<tensorflow::TensorProto> tensor_proto)
+        : tensor_proto_(std::move(tensor_proto)) {
+    }
+    ~TensorWriteCallback() override = default;
+    int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+
+   private:
+    std::shared_ptr<tensorflow::TensorProto> tensor_proto_;
+  };
+
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+
+  // Configuration read in onSchedule(). The integers are zero-initialized so
+  // that onTrigger() never reads an indeterminate value when a property is
+  // missing or cannot be parsed.
+  std::string input_format_;
+  int input_width_{0};
+  int input_height_{0};
+  int output_width_{0};
+  int output_height_{0};
+  int num_channels_{0};
+
+  std::shared_ptr<tensorflow::GraphDef> graph_def_;
+  // Pool of TensorFlow contexts available for reuse by onTrigger()
+  moodycamel::ConcurrentQueue<std::shared_ptr<TFContext>> tf_context_q_;
+};
+
+// NOTE(review): presumably registers the processor so it can be created by its
+// class name (the test adds it with addProcessor("TFConvertImageToTensor", ...))
+// -- confirm against core/Resource.h.
+REGISTER_RESOURCE(TFConvertImageToTensor); // NOLINT
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif //NIFI_MINIFI_CPP_TFCONVERTIMAGETOTENSOR_H
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a2d9bfca/libminifi/test/tensorflow-tests/TensorFlowTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/tensorflow-tests/TensorFlowTests.cpp b/libminifi/test/tensorflow-tests/TensorFlowTests.cpp
index 4478105..e9499de 100644
--- a/libminifi/test/tensorflow-tests/TensorFlowTests.cpp
+++ b/libminifi/test/tensorflow-tests/TensorFlowTests.cpp
@@ -21,7 +21,12 @@
#include <tensorflow/cc/framework/scope.h>
#include <tensorflow/cc/ops/standard_ops.h>
+#include <processors/PutFile.h>
+#include <processors/GetFile.h>
+#include <processors/LogAttribute.h>
+#include <TFConvertImageToTensor.h>
#include "TFApplyGraph.h"
+#include "TFConvertImageToTensor.h"
#define CATCH_CONFIG_MAIN
@@ -165,3 +170,132 @@ TEST_CASE("TensorFlow: Apply Graph", "[executescriptTensorFlowApplyGraph]") { //
REQUIRE(tensor_val == 4.0f);
}
}
+
+/**
+ * Runs a 2x2 single-channel raw image through
+ * GetFile -> LogAttribute -> TFConvertImageToTensor -> LogAttribute -> PutFile
+ * and verifies that the serialized output tensor was resized to 10x10.
+ */
+TEST_CASE("TensorFlow: ConvertImageToTensor", "[executescriptTensorFlowConvertImageToTensor]") {  // NOLINT
+  TestController testController;
+
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setTrace<processors::TFConvertImageToTensor>();
+  LogTestController::getInstance().setTrace<processors::PutFile>();
+  LogTestController::getInstance().setTrace<processors::GetFile>();
+  LogTestController::getInstance().setTrace<processors::LogAttribute>();
+
+  auto plan = testController.createPlan();
+  auto repo = std::make_shared<TestRepository>();
+
+  // Define directory for the input image
+  std::string in_dir("/tmp/gt.XXXXXX");
+  REQUIRE(testController.createTempDirectory(&in_dir[0]) != nullptr);
+
+  // Define input image file
+  std::string in_img_file(in_dir);
+  in_img_file.append("/img");
+
+  // Define directory for output protocol buffers
+  std::string out_dir("/tmp/gt.XXXXXX");
+  REQUIRE(testController.createTempDirectory(&out_dir[0]) != nullptr);
+
+  // Define output tensor protocol buffer file
+  std::string out_tensor_file(out_dir);
+  out_tensor_file.append("/img");
+
+  // Build MiNiFi processing graph
+  auto get_file = plan->addProcessor(
+      "GetFile",
+      "Get Proto");
+  plan->setProperty(
+      get_file,
+      processors::GetFile::Directory.getName(), in_dir);
+  plan->setProperty(
+      get_file,
+      processors::GetFile::KeepSourceFile.getName(),
+      "false");
+  plan->addProcessor(
+      "LogAttribute",
+      "Log Pre Graph Apply",
+      core::Relationship("success", "description"),
+      true);
+  auto tf_apply = plan->addProcessor(
+      "TFConvertImageToTensor",
+      "Convert Image",
+      core::Relationship("success", "description"),
+      true);
+  plan->addProcessor(
+      "LogAttribute",
+      "Log Post Graph Apply",
+      core::Relationship("success", "description"),
+      true);
+  plan->setProperty(
+      tf_apply,
+      processors::TFConvertImageToTensor::ImageFormat.getName(),
+      "RAW");
+  plan->setProperty(
+      tf_apply,
+      processors::TFConvertImageToTensor::InputWidth.getName(),
+      "2");
+  plan->setProperty(
+      tf_apply,
+      processors::TFConvertImageToTensor::InputHeight.getName(),
+      "2");
+  plan->setProperty(
+      tf_apply,
+      processors::TFConvertImageToTensor::OutputWidth.getName(),
+      "10");
+  plan->setProperty(
+      tf_apply,
+      processors::TFConvertImageToTensor::OutputHeight.getName(),
+      "10");
+  plan->setProperty(
+      tf_apply,
+      processors::TFConvertImageToTensor::NumChannels.getName(),
+      "1");
+  auto put_file = plan->addProcessor(
+      "PutFile",
+      "Put Output Tensor",
+      core::Relationship("success", "description"),
+      true);
+  plan->setProperty(
+      put_file,
+      processors::PutFile::Directory.getName(),
+      out_dir);
+  plan->setProperty(
+      put_file,
+      processors::PutFile::ConflictResolution.getName(),
+      processors::PutFile::CONFLICT_RESOLUTION_STRATEGY_REPLACE);
+
+  // Write test input image
+  {
+    // 2x2 single-channel image, 8 bits per channel
+    const uint8_t in_img_raw[2 * 2] = {0, 0,
+                                       0, 0};
+
+    // Write the bytes explicitly. Streaming a uint8_t array with operator<<
+    // treats it as a NUL-terminated C string, which writes nothing for this
+    // all-zero image and would read past the array for a non-zero one.
+    std::ofstream in_file_stream(in_img_file, std::ios::binary);
+    in_file_stream.write(reinterpret_cast<const char *>(in_img_raw), sizeof(in_img_raw));
+    REQUIRE(in_file_stream.good());
+  }
+
+  plan->reset();
+  plan->runNextProcessor();  // GetFile
+  plan->runNextProcessor();  // Log
+  plan->runNextProcessor();  // TFConvertImageToTensor
+  plan->runNextProcessor();  // Log
+  plan->runNextProcessor();  // PutFile
+
+  // Read test output tensor
+  {
+    std::ifstream out_file_stream(out_tensor_file, std::ios::binary);
+    REQUIRE(out_file_stream.is_open());
+
+    // Fail loudly if the output cannot be parsed instead of checking the
+    // shape of a default-constructed tensor
+    tensorflow::TensorProto tensor_proto;
+    REQUIRE(tensor_proto.ParseFromIstream(&out_file_stream));
+    tensorflow::Tensor tensor;
+    REQUIRE(tensor.FromProto(tensor_proto));
+
+    // Ensure output tensor is of the expected shape
+    auto shape = tensor.shape();
+    REQUIRE(shape.IsSameSize({1,     // Batch size
+                              10,    // Height
+                              10,    // Width
+                              1}));  // Channels
+  }
+}