Posted to commits@nifi.apache.org by ph...@apache.org on 2018/01/02 18:48:49 UTC

nifi-minifi-cpp git commit: MINIFICPP-344 Added initial implementation of TFConvertImageToTensor

Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 5977aa27c -> a2d9bfca8


MINIFICPP-344 Added initial implementation of TFConvertImageToTensor

This closes #224.

Signed-off-by: Marc Parisi <ph...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/a2d9bfca
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/a2d9bfca
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/a2d9bfca

Branch: refs/heads/master
Commit: a2d9bfca852008bc7127a9b0a5fd181d122fbb9a
Parents: 5977aa2
Author: Andy I. Christianson <an...@andyic.org>
Authored: Tue Dec 19 11:29:29 2017 -0500
Committer: Marc Parisi <ph...@apache.org>
Committed: Tue Jan 2 13:47:58 2018 -0500

----------------------------------------------------------------------
 extensions/tensorflow/CMakeLists.txt            |   4 -
 .../tensorflow/TFConvertImageToTensor.cpp       | 252 +++++++++++++++++++
 extensions/tensorflow/TFConvertImageToTensor.h  | 109 ++++++++
 .../test/tensorflow-tests/TensorFlowTests.cpp   | 134 ++++++++++
 4 files changed, 495 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a2d9bfca/extensions/tensorflow/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/extensions/tensorflow/CMakeLists.txt b/extensions/tensorflow/CMakeLists.txt
index 461feee..a8ba8fb 100644
--- a/extensions/tensorflow/CMakeLists.txt
+++ b/extensions/tensorflow/CMakeLists.txt
@@ -52,10 +52,6 @@ target_link_libraries(minifi-tensorflow-extensions ${TENSORFLOW_LIBRARIES})
 find_package(ZLIB REQUIRED)
 include_directories(${ZLIB_INCLUDE_DIRS})
 target_link_libraries (minifi-tensorflow-extensions ${ZLIB_LIBRARIES})
-find_package(Boost COMPONENTS system filesystem REQUIRED)
-include_directories(${Boost_INCLUDE_DIRS})
-target_link_libraries(minifi-tensorflow-extensions ${Boost_SYSTEM_LIBRARY})
-target_link_libraries(minifi-tensorflow-extensions ${Boost_FILESYSTEM_LIBRARY})
 
 if (WIN32)
     set_target_properties(minifi-tensorflow-extensions PROPERTIES

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a2d9bfca/extensions/tensorflow/TFConvertImageToTensor.cpp
----------------------------------------------------------------------
diff --git a/extensions/tensorflow/TFConvertImageToTensor.cpp b/extensions/tensorflow/TFConvertImageToTensor.cpp
new file mode 100644
index 0000000..be5e7a1
--- /dev/null
+++ b/extensions/tensorflow/TFConvertImageToTensor.cpp
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TFConvertImageToTensor.h"
+
+#include "tensorflow/cc/ops/standard_ops.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property TFConvertImageToTensor::ImageFormat(  // NOLINT
+    "Input Format",
+    "The node of the TensorFlow graph to feed tensor inputs to (PNG or RAW). RAW is RGB24.", "");
+core::Property TFConvertImageToTensor::InputWidth(  // NOLINT
+    "Input Width",
+    "The width, in pixels, of the input image.", "");
+core::Property TFConvertImageToTensor::InputHeight(  // NOLINT
+    "Input Height",
+    "The height, in pixels, of the input image.", "");
+core::Property TFConvertImageToTensor::OutputWidth(  // NOLINT
+    "Output Width",
+    "The width, in pixels, of the output image.", "");
+core::Property TFConvertImageToTensor::OutputHeight(  // NOLINT
+    "Output Height",
+    "The height, in pixels, of the output image.", "");
+core::Property TFConvertImageToTensor::NumChannels(  // NOLINT
+    "Channels",
+    "The number of channels (e.g. 3 for RGB, 4 for RGBA) in the input image", "3");
+
+core::Relationship TFConvertImageToTensor::Success(  // NOLINT
+    "success",
+    "Successful graph application outputs");
+core::Relationship TFConvertImageToTensor::Failure(  // NOLINT
+    "failure",
+    "Failures which will not work if retried");
+
+void TFConvertImageToTensor::initialize() {
+  std::set<core::Property> properties;
+  properties.insert(ImageFormat);
+  properties.insert(InputWidth);
+  properties.insert(InputHeight);
+  properties.insert(OutputWidth);
+  properties.insert(OutputHeight);
+  properties.insert(NumChannels);
+  setSupportedProperties(std::move(properties));
+
+  std::set<core::Relationship> relationships;
+  relationships.insert(Success);
+  relationships.insert(Failure);
+  setSupportedRelationships(std::move(relationships));
+}
+
+void TFConvertImageToTensor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
+  context->getProperty(ImageFormat.getName(), input_format_);
+
+  if (input_format_.empty()) {
+    logger_->log_error("Invalid image format");
+  }
+
+  std::string val;
+
+  if (context->getProperty(InputWidth.getName(), val)) {
+    core::Property::StringToInt(val, input_width_);
+  } else {
+    logger_->log_error("Invalid Input Width");
+  }
+
+  if (context->getProperty(InputHeight.getName(), val)) {
+    core::Property::StringToInt(val, input_height_);
+  } else {
+    logger_->log_error("Invalid Input Height");
+  }
+
+  if (context->getProperty(OutputWidth.getName(), val)) {
+    core::Property::StringToInt(val, output_width_);
+  } else {
+    logger_->log_error("Invalid Output Width");
+  }
+
+  if (context->getProperty(OutputHeight.getName(), val)) {
+    core::Property::StringToInt(val, output_height_);
+  } else {
+    logger_->log_error("Invalid output height");
+  }
+
+  if (context->getProperty(NumChannels.getName(), val)) {
+    core::Property::StringToInt(val, num_channels_);
+  } else {
+    logger_->log_error("Invalid channel count");
+  }
+}
+
+void TFConvertImageToTensor::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
+                                       const std::shared_ptr<core::ProcessSession> &session) {
+  auto flow_file = session->get();
+
+  if (!flow_file) {
+    return;
+  }
+
+  try {
+    // Use an existing context, if one is available
+    std::shared_ptr<TFContext> ctx;
+
+    if (tf_context_q_.try_dequeue(ctx)) {
+      logger_->log_debug("Using available TensorFlow context");
+    }
+
+    std::string input_tensor_name = "input";
+    std::string output_tensor_name = "output";
+
+    if (!ctx) {
+      logger_->log_info("Creating new TensorFlow context");
+      tensorflow::SessionOptions options;
+      ctx = std::make_shared<TFContext>();
+      ctx->tf_session.reset(tensorflow::NewSession(options));
+
+      auto root = tensorflow::Scope::NewRootScope();
+      auto input = tensorflow::ops::Placeholder(root.WithOpName(input_tensor_name), tensorflow::DT_UINT8);
+
+      // Cast pixel values to floats
+      auto float_caster = tensorflow::ops::Cast(root.WithOpName("float_caster"), input, tensorflow::DT_FLOAT);
+
+      // Expand into batches (of size 1)
+      auto dims_expander = tensorflow::ops::ExpandDims(root, float_caster, 0);
+
+      // Resize tensor to output dimensions
+      auto resize = tensorflow::ops::ResizeBilinear(
+          root, dims_expander,
+          tensorflow::ops::Const(root.WithOpName("resize"), {output_height_, output_width_}));
+
+      // Normalize tensor from 0-255 pixel values to 0.0-1.0 values
+      auto output = tensorflow::ops::Div(root.WithOpName(output_tensor_name),
+                                         tensorflow::ops::Sub(root, resize, {0.0f}),
+                                         {255.0f});
+      tensorflow::GraphDef graph_def;
+      {
+        auto status = root.ToGraphDef(&graph_def);
+
+        if (!status.ok()) {
+          std::string msg = "Failed to create TensorFlow graph: ";
+          msg.append(status.ToString());
+          throw std::runtime_error(msg);
+        }
+      }
+
+      {
+        auto status = ctx->tf_session->Create(graph_def);
+
+        if (!status.ok()) {
+          std::string msg = "Failed to create TensorFlow session: ";
+          msg.append(status.ToString());
+          throw std::runtime_error(msg);
+        }
+      }
+    }
+
+    // Apply graph
+    // Read input tensor from flow file
+    tensorflow::Tensor img_tensor(tensorflow::DT_UINT8, {input_height_, input_width_, num_channels_});
+    ImageReadCallback tensor_cb(&img_tensor);
+    session->read(flow_file, &tensor_cb);
+    std::vector<tensorflow::Tensor> outputs;
+    auto status = ctx->tf_session->Run({{input_tensor_name, img_tensor}}, {output_tensor_name + ":0"}, {}, &outputs);
+
+    if (!status.ok()) {
+      std::string msg = "Failed to apply TensorFlow graph: ";
+      msg.append(status.ToString());
+      throw std::runtime_error(msg);
+    }
+
+    // Create output flow file for each output tensor
+    for (const auto &output : outputs) {
+      auto tensor_proto = std::make_shared<tensorflow::TensorProto>();
+      output.AsProtoTensorContent(tensor_proto.get());
+      logger_->log_info("Writing output tensor flow file");
+      TensorWriteCallback write_cb(tensor_proto);
+      session->write(flow_file, &write_cb);
+      session->transfer(flow_file, Success);
+    }
+
+    // Make context available for use again
+    if (tf_context_q_.size_approx() < getMaxConcurrentTasks()) {
+      logger_->log_debug("Releasing TensorFlow context");
+      tf_context_q_.enqueue(ctx);
+    } else {
+      logger_->log_info("Destroying TensorFlow context because it is no longer needed");
+    }
+  } catch (std::exception &exception) {
+    logger_->log_error("Caught Exception %s", exception.what());
+    session->transfer(flow_file, Failure);
+    this->yield();
+  } catch (...) {
+    logger_->log_error("Caught Exception");
+    session->transfer(flow_file, Failure);
+    this->yield();
+  }
+}
+
+int64_t TFConvertImageToTensor::ImageReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+  if (tensor_->AllocatedBytes() < stream->getSize()) {
+    throw std::runtime_error("Tensor is not big enough to hold FlowFile bytes");
+  }
+
+  auto num_read = stream->readData(tensor_->flat<unsigned char>().data(),
+                                   static_cast<int>(stream->getSize()));
+
+  if (num_read != stream->getSize()) {
+    throw std::runtime_error("TensorReadCallback failed to fully read flow file input stream");
+  }
+
+  return num_read;
+}
+
+int64_t TFConvertImageToTensor::TensorWriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
+  auto tensor_proto_buf = tensor_proto_->SerializeAsString();
+  auto num_wrote = stream->writeData(reinterpret_cast<uint8_t *>(&tensor_proto_buf[0]),
+                                     static_cast<int>(tensor_proto_buf.size()));
+
+  if (num_wrote != tensor_proto_buf.size()) {
+    std::string msg = "TensorWriteCallback failed to fully write flow file output stream; Expected ";
+    msg.append(std::to_string(tensor_proto_buf.size()));
+    msg.append(" and wrote ");
+    msg.append(std::to_string(num_wrote));
+    throw std::runtime_error(msg);
+  }
+
+  return num_wrote;
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
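
The processor above constructs a small preprocessing graph: a uint8 placeholder is cast to float, expanded into a batch of one, resized with bilinear interpolation to the configured output dimensions, and scaled from 0-255 pixel values into the 0.0-1.0 range. The following is a minimal standalone sketch of that same graph, not part of this patch, assuming the TensorFlow r1.x C++ client API and illustrative sizes (2x2 input, 10x10 output):

    // Sketch only: condenses the graph built in TFConvertImageToTensor::onTrigger
    #include "tensorflow/cc/client/client_session.h"
    #include "tensorflow/cc/ops/standard_ops.h"
    #include "tensorflow/core/framework/tensor.h"

    int main() {
      auto root = tensorflow::Scope::NewRootScope();

      // uint8 image in -> float batch of 1, resized and normalized to [0.0, 1.0]
      auto input = tensorflow::ops::Placeholder(root.WithOpName("input"), tensorflow::DT_UINT8);
      auto floats = tensorflow::ops::Cast(root, input, tensorflow::DT_FLOAT);
      auto batched = tensorflow::ops::ExpandDims(root, floats, 0);
      auto resized = tensorflow::ops::ResizeBilinear(root, batched,
                                                     tensorflow::ops::Const(root, {10, 10}));
      auto output = tensorflow::ops::Div(root.WithOpName("output"), resized, {255.0f});

      // 2x2 single-channel, all-zero image (mirrors the test input below)
      tensorflow::Tensor img(tensorflow::DT_UINT8, tensorflow::TensorShape({2, 2, 1}));
      img.flat<tensorflow::uint8>().setZero();

      tensorflow::ClientSession session(root);
      std::vector<tensorflow::Tensor> outputs;
      TF_CHECK_OK(session.Run({{input, img}}, {output}, &outputs));

      // outputs[0] has shape {1, 10, 10, 1}; every element is 0.0f for this input
      return 0;
    }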

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a2d9bfca/extensions/tensorflow/TFConvertImageToTensor.h
----------------------------------------------------------------------
diff --git a/extensions/tensorflow/TFConvertImageToTensor.h b/extensions/tensorflow/TFConvertImageToTensor.h
new file mode 100644
index 0000000..63f889c
--- /dev/null
+++ b/extensions/tensorflow/TFConvertImageToTensor.h
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_TFCONVERTIMAGETOTENSOR_H
+#define NIFI_MINIFI_CPP_TFCONVERTIMAGETOTENSOR_H
+
+#include <atomic>
+
+#include <core/Resource.h>
+#include <core/Processor.h>
+#include <tensorflow/core/public/session.h>
+#include <concurrentqueue.h>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class TFConvertImageToTensor : public core::Processor {
+ public:
+  explicit TFConvertImageToTensor(const std::string &name, uuid_t uuid = nullptr)
+      : Processor(name, uuid),
+        logger_(logging::LoggerFactory<TFConvertImageToTensor>::getLogger()) {
+  }
+
+  static core::Property ImageFormat;
+  static core::Property NumChannels;
+  static core::Property InputWidth;
+  static core::Property InputHeight;
+  static core::Property OutputWidth;
+  static core::Property OutputHeight;
+
+  static core::Relationship Success;
+  static core::Relationship Failure;
+
+  void initialize() override;
+  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override;
+  void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override {
+    logger_->log_error("onTrigger invocation with raw pointers is not implemented");
+  }
+  void onTrigger(const std::shared_ptr<core::ProcessContext> &context,
+                 const std::shared_ptr<core::ProcessSession> &session) override;
+
+  struct TFContext {
+    std::shared_ptr<tensorflow::Session> tf_session;
+  };
+
+  class ImageReadCallback : public InputStreamCallback {
+   public:
+    explicit ImageReadCallback(tensorflow::Tensor *tensor)
+        : tensor_(tensor) {
+    }
+    ~ImageReadCallback() override = default;
+    int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+
+   private:
+    tensorflow::Tensor *tensor_;
+  };
+
+  class TensorWriteCallback : public OutputStreamCallback {
+   public:
+    explicit TensorWriteCallback(std::shared_ptr<tensorflow::TensorProto> tensor_proto)
+        : tensor_proto_(std::move(tensor_proto)) {
+    }
+    ~TensorWriteCallback() override = default;
+    int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+
+   private:
+    std::shared_ptr<tensorflow::TensorProto> tensor_proto_;
+  };
+
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+
+  std::string input_format_;
+  int input_width_;
+  int input_height_;
+  int output_width_;
+  int output_height_;
+  int num_channels_;
+
+  std::shared_ptr<tensorflow::GraphDef> graph_def_;
+  moodycamel::ConcurrentQueue<std::shared_ptr<TFContext>> tf_context_q_;
+};
+
+REGISTER_RESOURCE(TFConvertImageToTensor); // NOLINT
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif //NIFI_MINIFI_CPP_TFCONVERTIMAGETOTENSOR_H
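
A note on the TFContext queue declared above: graph construction and session creation are relatively expensive, so onTrigger() pools contexts in a lock-free moodycamel::ConcurrentQueue and reuses them across invocations, capping the pool at getMaxConcurrentTasks(). A reduced sketch of that acquire/release pattern, with hypothetical names and independent of the processor class:

    #include <memory>
    #include "concurrentqueue.h"

    // Stand-in for TFConvertImageToTensor::TFContext
    struct Context {
      // e.g. std::shared_ptr<tensorflow::Session> tf_session;
    };

    moodycamel::ConcurrentQueue<std::shared_ptr<Context>> pool;

    std::shared_ptr<Context> acquire() {
      std::shared_ptr<Context> ctx;
      if (!pool.try_dequeue(ctx)) {
        ctx = std::make_shared<Context>();  // lazily build a new graph/session
      }
      return ctx;
    }

    void release(std::shared_ptr<Context> ctx, size_t max_concurrent_tasks) {
      if (pool.size_approx() < max_concurrent_tasks) {
        pool.enqueue(std::move(ctx));  // keep it around for the next onTrigger call
      }
      // otherwise drop it; the shared_ptr destructor tears the context down
    }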

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a2d9bfca/libminifi/test/tensorflow-tests/TensorFlowTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/tensorflow-tests/TensorFlowTests.cpp b/libminifi/test/tensorflow-tests/TensorFlowTests.cpp
index 4478105..e9499de 100644
--- a/libminifi/test/tensorflow-tests/TensorFlowTests.cpp
+++ b/libminifi/test/tensorflow-tests/TensorFlowTests.cpp
@@ -21,7 +21,12 @@
 
 #include <tensorflow/cc/framework/scope.h>
 #include <tensorflow/cc/ops/standard_ops.h>
+#include <processors/PutFile.h>
+#include <processors/GetFile.h>
+#include <processors/LogAttribute.h>
+#include <TFConvertImageToTensor.h>
 #include "TFApplyGraph.h"
+#include "TFConvertImageToTensor.h"
 
 #define CATCH_CONFIG_MAIN
 
@@ -165,3 +170,132 @@ TEST_CASE("TensorFlow: Apply Graph", "[executescriptTensorFlowApplyGraph]") { //
     REQUIRE(tensor_val == 4.0f);
   }
 }
+
+TEST_CASE("TensorFlow: ConvertImageToTensor", "[executescriptTensorFlowConvertImageToTensor]") { // NOLINT
+  TestController testController;
+
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setTrace<processors::TFConvertImageToTensor>();
+  LogTestController::getInstance().setTrace<processors::PutFile>();
+  LogTestController::getInstance().setTrace<processors::GetFile>();
+  LogTestController::getInstance().setTrace<processors::LogAttribute>();
+
+  auto plan = testController.createPlan();
+  auto repo = std::make_shared<TestRepository>();
+
+  // Define directory for input protocol buffers
+  std::string in_dir("/tmp/gt.XXXXXX");
+  REQUIRE(testController.createTempDirectory(&in_dir[0]) != nullptr);
+
+  // Define input tensor protocol buffer file
+  std::string in_img_file(in_dir);
+  in_img_file.append("/img");
+
+  // Define directory for output protocol buffers
+  std::string out_dir("/tmp/gt.XXXXXX");
+  REQUIRE(testController.createTempDirectory(&out_dir[0]) != nullptr);
+
+  // Define output tensor protocol buffer file
+  std::string out_tensor_file(out_dir);
+  out_tensor_file.append("/img");
+
+  // Build MiNiFi processing graph
+  auto get_file = plan->addProcessor(
+      "GetFile",
+      "Get Proto");
+  plan->setProperty(
+      get_file,
+      processors::GetFile::Directory.getName(), in_dir);
+  plan->setProperty(
+      get_file,
+      processors::GetFile::KeepSourceFile.getName(),
+      "false");
+  plan->addProcessor(
+      "LogAttribute",
+      "Log Pre Graph Apply",
+      core::Relationship("success", "description"),
+      true);
+  auto tf_apply = plan->addProcessor(
+      "TFConvertImageToTensor",
+      "Convert Image",
+      core::Relationship("success", "description"),
+      true);
+  plan->addProcessor(
+      "LogAttribute",
+      "Log Post Graph Apply",
+      core::Relationship("success", "description"),
+      true);
+  plan->setProperty(
+      tf_apply,
+      processors::TFConvertImageToTensor::ImageFormat.getName(),
+      "RAW");
+  plan->setProperty(
+      tf_apply,
+      processors::TFConvertImageToTensor::InputWidth.getName(),
+      "2");
+  plan->setProperty(
+      tf_apply,
+      processors::TFConvertImageToTensor::InputHeight.getName(),
+      "2");
+  plan->setProperty(
+      tf_apply,
+      processors::TFConvertImageToTensor::OutputWidth.getName(),
+      "10");
+  plan->setProperty(
+      tf_apply,
+      processors::TFConvertImageToTensor::OutputHeight.getName(),
+      "10");
+  plan->setProperty(
+      tf_apply,
+      processors::TFConvertImageToTensor::NumChannels.getName(),
+      "1");
+  auto put_file = plan->addProcessor(
+      "PutFile",
+      "Put Output Tensor",
+      core::Relationship("success", "description"),
+      true);
+  plan->setProperty(
+      put_file,
+      processors::PutFile::Directory.getName(),
+      out_dir);
+  plan->setProperty(
+      put_file,
+      processors::PutFile::ConflictResolution.getName(),
+      processors::PutFile::CONFLICT_RESOLUTION_STRATEGY_REPLACE);
+
+  // Write test input image
+  {
+    // 2x2 single-channel 8 bit per channel
+    const uint8_t in_img_raw[2*2] = {0, 0,
+                                     0, 0};
+
+    std::ofstream in_file_stream(in_img_file);
+    in_file_stream << in_img_raw;
+  }
+
+  plan->reset();
+  plan->runNextProcessor();  // GetFile
+  plan->runNextProcessor();  // Log
+  plan->runNextProcessor();  // TFConvertImageToTensor
+  plan->runNextProcessor();  // Log
+  plan->runNextProcessor();  // PutFile
+
+  // Read test output tensor
+  {
+    std::ifstream out_file_stream(out_tensor_file);
+    tensorflow::TensorProto tensor_proto;
+    tensor_proto.ParseFromIstream(&out_file_stream);
+    tensorflow::Tensor tensor;
+    tensor.FromProto(tensor_proto);
+
+    // Verify output tensor
+    auto shape = tensor.shape();
+    auto shapeString = shape.DebugString();
+
+    // Ensure output tensor is of the expected shape
+    REQUIRE(shape.IsSameSize({1,     // Batch size
+                              10,    // Width
+                              10,    // Height
+                              1}));  // Channels
+  }
+}
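
Because the test feeds an all-zero 2x2 image and the graph divides by 255, every element of the 1x10x10x1 output tensor should come back as exactly 0.0f. A possible follow-on assertion, not included in this patch, that could sit after the shape check above:

    // Every normalized pixel of the all-zero input should be 0.0
    auto flat = tensor.flat<float>();
    for (int i = 0; i < flat.size(); i++) {
      REQUIRE(flat(i) == 0.0f);
    }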