Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/02/16 13:30:43 UTC

[GitHub] [nifi-minifi-cpp] adamdebreceni opened a new pull request #1004: MINIFICPP-1450 - Revive SQL processors

adamdebreceni opened a new pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004


   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [ ] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically main)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that the format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

adamdebreceni commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r603886951



##########
File path: .github/workflows/ci.yml
##########
@@ -132,12 +142,13 @@ jobs:
         run: |
           sudo apt-add-repository -y "ppa:ubuntu-toolchain-r/test"
           sudo apt update
-          sudo apt install -y gcc-4.8 g++-4.8 bison flex uuid-dev openssl libcurl4-openssl-dev ccache libpython3-dev liblua5.1-0-dev libssh2-1-dev
+          sudo apt install -y gcc-4.8 g++-4.8 bison flex uuid-dev openssl libcurl4-openssl-dev ccache libpython3-dev liblua5.1-0-dev libssh2-1-dev libsqliteodbc
+          sudo ln -s /usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so /usr/lib/x86_64-linux-gnu/libsqlite3odbc.so

Review comment:
       So the agent doesn't even start up?







[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

lordgamez commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r585650088



##########
File path: extensions/sql/data/MaxCollector.h
##########
@@ -77,79 +74,66 @@ class MaxCollector: public SQLRowSubscriber {
   void processColumn(const std::string& name, const char* value) override {}
 
   template <typename T>
-  struct MaxValue {
-    void updateMaxValue(const std::string& name, const T& value) {
-      const auto it = mapColumnNameValue_.find(name);
-      if (it == mapColumnNameValue_.end()) {
-        mapColumnNameValue_.insert({ name, value });
+  class MaxValue {
+   public:
+    void updateMaxValue(const std::string& column, const T& value) {
+      const auto it = column_maxima.find(column);
+      if (it == column_maxima.end()) {
+        column_maxima.insert({ column, value });

Review comment:
       Minor, but by using emplace instead of insert here we could avoid the need for curly braces.
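A minimal sketch of the suggestion, using a simplified stand-in for `MaxValue`. The lookup and the `column_maxima` map follow the diff; the "keep the larger value" branch is an assumption, since the hunk is cut off. `emplace` forwards its arguments to the `std::pair` constructor, so the braced initializer required by `insert({ column, value })` goes away.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>

// Simplified stand-in for MaxCollector::MaxValue<T> (illustration only).
// The update-if-larger branch is assumed; the diff hunk is truncated.
template <typename T>
class MaxValue {
 public:
  void updateMaxValue(const std::string& column, const T& value) {
    const auto it = column_maxima.find(column);
    if (it == column_maxima.end()) {
      // emplace constructs the std::pair<const std::string, T> in place,
      // so no braced initializer is needed (compare insert({ column, value })).
      column_maxima.emplace(column, value);
    } else if (it->second < value) {
      it->second = value;
    }
  }

  std::unordered_map<std::string, T> column_maxima;
};
```

Because `emplace` returns a `std::pair<iterator, bool>`, the separate `find` could also be dropped: call `emplace` first and only compare when the returned `bool` is `false`, which saves one hash lookup.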

##########
File path: extensions/sql/processors/FlowFileSource.h
##########
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <vector>
+#include <memory>
+
+#include "core/Property.h"
+#include "utils/Enum.h"
+#include "data/SQLRowsetProcessor.h"
+#include "ProcessSession.h"
+#include "data/JSONSQLWriter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class FlowFileSource {
+ public:
+  static const std::string FRAGMENT_IDENTIFIER;
+  static const std::string FRAGMENT_COUNT;
+  static const std::string FRAGMENT_INDEX;
+
+  static const core::Property OutputFormat;
+  static const core::Property MaxRowsPerFlowFile;
+
+  SMART_ENUM(OutputType,
+    (JSON, "JSON"),
+    (JSONPretty, "JSON-Pretty")
+  )
+
+ protected:
+  class FlowFileGenerator : public sql::SQLRowSubscriber {
+   public:
+    FlowFileGenerator(core::ProcessSession& session, sql::JSONSQLWriter& json_writer)
+      : session_(session),
+        json_writer_(json_writer) {}
+
+    void beginProcessBatch() override {
+      current_batch_size_ = 0;
+    }
+    void endProcessBatch(Progress progress) override;
+    void beginProcessRow() override {}
+    void endProcessRow() override {
+      ++current_batch_size_;
+    }
+    void processColumnNames(const std::vector<std::string>& names) override {}
+    void processColumn(const std::string& name, const std::string& value) override {}
+    void processColumn(const std::string& name, double value) override {}
+    void processColumn(const std::string& name, int value) override {}
+    void processColumn(const std::string& name, long long value) override {}
+    void processColumn(const std::string& name, unsigned long long value) override {}
+    void processColumn(const std::string& name, const char* value) override {}
+
+    std::shared_ptr<core::FlowFile> getLastFlowFile() {

Review comment:
       This could be a const member function.
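A sketch of what the comment appears to ask for: marking `getLastFlowFile()` as a `const` member function, since it only reads the generator's state. The body of `getLastFlowFile()` is not shown in the diff, so the `flow_files_` vector and the placeholder `FlowFile` struct below are hypothetical.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Placeholder for core::FlowFile (hypothetical, for illustration only).
struct FlowFile {
  std::string name;
};

class FlowFileGenerator {
 public:
  void addFlowFile(std::shared_ptr<FlowFile> flow_file) {
    flow_files_.push_back(std::move(flow_file));
  }

  // const: reading the last element does not modify the generator, so this
  // can be called through a const reference. Returns nullptr if empty.
  std::shared_ptr<FlowFile> getLastFlowFile() const {
    if (flow_files_.empty()) {
      return nullptr;
    }
    return flow_files_.back();
  }

 private:
  std::vector<std::shared_ptr<FlowFile>> flow_files_;  // hypothetical member
};
```

The `const` is shallow: callers still get a non-const `std::shared_ptr<FlowFile>`, which matches how `processOnTrigger` adds attributes to the returned flow file.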

##########
File path: extensions/sql/processors/QueryDatabaseTable.cpp
##########
@@ -75,361 +72,232 @@ const core::Property QueryDatabaseTable::s_maxValueColumnNames(
     "If no columns are provided, all rows from the table will be considered, which could have a performance impact. "
     "NOTE: It is important to use consistent max-value column names for a given table for incremental fetch to work properly. "
     "NOTE: Because of a limitation of database access library 'soci', which doesn't support milliseconds in it's 'dt_date', "
-    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")->
-    supportsExpressionLanguage(true)->build());
+    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")
+  ->supportsExpressionLanguage(true)->build());
 
-const core::Property QueryDatabaseTable::s_whereClause(
-  core::PropertyBuilder::createProperty("db-fetch-where-clause")->isRequired(false)->withDescription(
-    "A custom clause to be added in the WHERE condition when building SQL queries.")->supportsExpressionLanguage(true)->build());
+const core::Property QueryDatabaseTable::WhereClause(
+  core::PropertyBuilder::createProperty("Where Clause")
+  ->isRequired(false)
+  ->withDescription("A custom clause to be added in the WHERE condition when building SQL queries.")
+  ->supportsExpressionLanguage(true)->build());
 
-const core::Property QueryDatabaseTable::s_sqlQuery(
-  core::PropertyBuilder::createProperty("db-fetch-sql-query")->isRequired(false)->withDescription(
-    "A custom SQL query used to retrieve data. Instead of building a SQL query from other properties, this query will be wrapped as a sub-query. "
-    "Query must have no ORDER BY statement.")->supportsExpressionLanguage(true)->build());
+const std::string QueryDatabaseTable::InitialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
 
-const core::Property QueryDatabaseTable::s_maxRowsPerFlowFile(
-  core::PropertyBuilder::createProperty("qdbt-max-rows")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
-    "The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large result sets into multiple FlowFiles. "
-    "If the value specified is zero, then all rows are returned in a single FlowFile.")->supportsExpressionLanguage(true)->build());
+const core::Relationship QueryDatabaseTable::Success("success", "Successfully created FlowFile from SQL query result set.");
 
-const core::Property QueryDatabaseTable::s_stateDirectory(
-  core::PropertyBuilder::createProperty("State Directory")->isRequired(false)->withDefaultValue("QDTState")->withDescription("DEPRECATED. Only use it for state migration from the state file, supplying the legacy state directory.")->build());
+const std::string QueryDatabaseTable::RESULT_TABLE_NAME = "tablename";
+const std::string QueryDatabaseTable::RESULT_ROW_COUNT = "querydbtable.row.count";
 
-const std::string QueryDatabaseTable::s_initialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
+const std::string QueryDatabaseTable::TABLENAME_KEY = "tablename";
+const std::string QueryDatabaseTable::MAXVALUE_KEY_PREFIX = "maxvalue.";
 
-const core::Relationship QueryDatabaseTable::s_success("success", "Successfully created FlowFile from SQL query result set.");
-
-static const std::string ResultTableName = "tablename";
-static const std::string ResultRowCount = "querydbtable.row.count";
-
-static const std::string TABLENAME_KEY = "tablename";
-static const std::string MAXVALUE_KEY_PREFIX = "maxvalue.";
-
-// State
-class LegacyState {
- public:
-  LegacyState(const std::string& tableName, const std::string& stateDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
-    :tableName_(tableName), logger_(logger) {
-
-    filePath_ = utils::file::FileUtils::concat_path(
-            utils::file::FileUtils::concat_path(
-              utils::file::FileUtils::concat_path(stateDir, "uuid"), uuid), "State.txt");
+// QueryDatabaseTable
+QueryDatabaseTable::QueryDatabaseTable(const std::string& name, utils::Identifier uuid)
+  : SQLProcessor(name, uuid, logging::LoggerFactory<QueryDatabaseTable>::getLogger()) {
+}
 
-    if (!getStateFromFile())
-      return;
+void QueryDatabaseTable::initialize() {
+  //! Set the supported properties
+  setSupportedProperties({
+    DBControllerService, OutputFormat, TableName, ColumnNames,
+    MaxValueColumnNames, WhereClause, MaxRowsPerFlowFile});
 
-    ok_ = true;
-  }
+  //! Set the supported relationships
+  setSupportedRelationships({ Success });
+}
 
-  explicit operator bool() const {
-    return ok_;
-  }
+void QueryDatabaseTable::processOnSchedule(core::ProcessContext& context) {
+  context.getProperty(OutputFormat.getName(), output_format_);
+  max_rows_ = [&] {
+    uint64_t max_rows;
+    context.getProperty(MaxRowsPerFlowFile.getName(), max_rows);
+    return gsl::narrow<size_t>(max_rows);
+  }();
 
-  const std::unordered_map<std::string, std::string>& getStateMap() const {
-    return mapState_;
+  state_manager_ = context.getStateManager();
+  if (state_manager_ == nullptr) {
+    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
   }
 
-  bool moveStateFileToMigrated() {
-    if (!ok_) {
-      return false;
-    }
-    return rename(filePath_.c_str(), (filePath_ + "-migrated").c_str()) == 0;
+  context.getProperty(TableName.getName(), table_name_);
+  context.getProperty(WhereClause.getName(), extra_where_clause_);
+  max_value_columns_ = [&] {
+    std::string max_value_columns_str;
+    context.getProperty(MaxValueColumnNames.getName(), max_value_columns_str);
+    return utils::inputStringToList(max_value_columns_str);
+  }();
+  return_columns_ = [&] {
+    std::string return_columns_str;
+    context.getProperty(ColumnNames.getName(), return_columns_str);
+    return utils::inputStringToList(return_columns_str);
+  }();
+  queried_columns_ = utils::StringUtils::join(", ", return_columns_);
+  if (!queried_columns_.empty() && !max_value_columns_.empty()) {
+    // columns will be explicitly enumerated, we need to add the max value columns
+    queried_columns_ = queried_columns_ + ", " + utils::StringUtils::join(", ", max_value_columns_);
   }
 
- private:
-  static const std::string separator_;
-
-   bool getStateFromFile() {
-     std::string state;
-
-     std::ifstream file(filePath_);
-     if (!file) {
-       return false;
-     }
-
-     std::stringstream ss;
-     ss << file.rdbuf();
-
-     state = ss.str();
-
-     file.close();
-
-     std::vector<std::string> listColumnNameValue;
-
-     size_t pos = state.find(separator_, 0);
-     if (pos == std::string::npos) {
-       logger_->log_error("Invalid data in '%s' file.", filePath_.c_str());
-       mapState_.clear();
-       return false;
-     }
-
-     auto tableName = state.substr(0, pos);
-     if (tableName != tableName_) {
-       logger_->log_warn("tableName is changed - now: '%s', in State.txt: '%s'.", tableName_.c_str(), tableName.c_str());
-       mapState_.clear();
-
-       return false;
-     }
-
-     pos += separator_.size();
-
-     while (true) {
-       auto newPos = state.find(separator_, pos);
-       if (newPos == std::string::npos)
-         break;
-
-       const std::string& columnNameValue = state.substr(pos, newPos - pos);
-       listColumnNameValue.emplace_back(columnNameValue);
-
-       pos = newPos + separator_.size();
-     }
-
-     for (const auto& columnNameValue : listColumnNameValue) {
-       const auto posEQ = columnNameValue.find('=');
-       if (posEQ == std::string::npos) {
-         logger_->log_error("Invalid data in '%s' file.", filePath_.c_str());
-         mapState_.clear();
-         return false;
-       }
-
-       const auto& name = columnNameValue.substr(0, posEQ);
-       const auto& value = columnNameValue.substr(posEQ + 1);
-
-       mapState_.insert({ name, value });
-     }
-
-     return true;
-   }
-
- private:
-   std::unordered_map<std::string, std::string> mapState_;
-   std::shared_ptr<logging::Logger> logger_;
-   std::string filePath_;
-   std::string tableName_;
-   bool ok_{};
-};
-
-const std::string LegacyState::separator_ = "@!qdt!@";
-
-// QueryDatabaseTable
-QueryDatabaseTable::QueryDatabaseTable(const std::string& name, utils::Identifier uuid)
-  : SQLProcessor(name, uuid) {
+  initializeMaxValues(context);
 }
 
-QueryDatabaseTable::~QueryDatabaseTable() = default;
-
-void QueryDatabaseTable::initialize() {
-  //! Set the supported properties
-  setSupportedProperties({ dbControllerService(), outputFormat(), s_tableName, s_columnNames, s_maxValueColumnNames, s_whereClause, s_sqlQuery, s_maxRowsPerFlowFile, s_stateDirectory});
+void QueryDatabaseTable::processOnTrigger(core::ProcessContext& context, core::ProcessSession& session) {
+  const auto& selectQuery = buildSelectQuery();
 
-  //! Set the supported relationships
-  setSupportedRelationships({ s_success });
-}
+  logger_->log_info("QueryDatabaseTable: selectQuery: '%s'", selectQuery.c_str());
 
-void QueryDatabaseTable::processOnSchedule(core::ProcessContext &context) {
-  initOutputFormat(context);
+  auto statement = connection_->prepareStatement(selectQuery);
 
-  context.getProperty(s_tableName.getName(), tableName_);
-  context.getProperty(s_columnNames.getName(), columnNames_);
+  auto rowset = statement->execute();
 
-  context.getProperty(s_maxValueColumnNames.getName(), maxValueColumnNames_);
-  listMaxValueColumnName_ = utils::inputStringToList(maxValueColumnNames_);
+  std::unordered_map<std::string, std::string> new_max_values = max_values_;
+  sql::MaxCollector maxCollector{selectQuery, new_max_values};
+  auto column_filter = [&] (const std::string& column_name) {
+    return return_columns_.empty()
+      || std::find(return_columns_.begin(), return_columns_.end(), column_name) != return_columns_.end();
+  };
+  sql::JSONSQLWriter sqlWriter{output_format_ == OutputType::JSONPretty, column_filter};
+  FlowFileGenerator flow_file_creator{session, sqlWriter};
+  sql::SQLRowsetProcessor sqlRowsetProcessor(rowset, {sqlWriter, maxCollector, flow_file_creator});
+
+  while (size_t row_count = sqlRowsetProcessor.process(max_rows_)) {
+    auto new_file = flow_file_creator.getLastFlowFile();
+    new_file->addAttribute(RESULT_ROW_COUNT, std::to_string(row_count));
+    new_file->addAttribute(RESULT_TABLE_NAME, table_name_);
+  }
 
-  context.getProperty(s_whereClause.getName(), whereClause_);
-  context.getProperty(s_sqlQuery.getName(), sqlQuery_);
-  context.getProperty(s_maxRowsPerFlowFile.getName(), maxRowsPerFlowFile_);
+  // the updated max_values and the total number of flow_files is available from here
+  for (auto& new_file : flow_file_creator.getFlowFiles()) {
+    session.transfer(new_file, Success);
+    for (const auto& max_column : max_value_columns_) {
+      new_file->addAttribute("maxvalue." + max_column, new_max_values[max_column]);
+    }
+  }
 
-  mapState_.clear();
+  if (new_max_values != max_values_) {
+    try {
+      session.commit();
+    } catch (std::exception& e) {
+      throw;
+    }

Review comment:
       Why do we catch and rethrow here?
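A handler consisting only of `throw;` behaves the same as having no `try` block at all: `throw;` rethrows the original exception object unchanged. The sketch below uses a hypothetical `commit()` that always fails as a stand-in for `session.commit()`, and shows that both variants propagate the same exception. Such a handler only earns its place if it does something extra, such as logging, before rethrowing.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for session.commit() that always fails.
void commit() {
  throw std::runtime_error("commit failed");
}

// Mirrors the pattern from the diff: catch and immediately rethrow.
void commitWithRethrow() {
  try {
    commit();
  } catch (std::exception&) {
    throw;  // rethrows the same exception object, unchanged
  }
}

// Equivalent without the try block: the exception simply propagates.
void commitDirect() {
  commit();
}

// Returns the message of the exception thrown by f, or "" if none was thrown.
template <typename F>
std::string caughtMessage(F f) {
  try {
    f();
  } catch (const std::exception& e) {
    return e.what();
  }
  return "";
}
```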

##########
File path: extensions/sql/processors/QueryDatabaseTable.cpp
##########
@@ -75,361 +72,232 @@ const core::Property QueryDatabaseTable::s_maxValueColumnNames(
+  if (new_max_values != max_values_) {
+    try {
+      session.commit();
+    } catch (std::exception& e) {
+      throw;
+    }
 
-  state_manager_ = context.getStateManager();
-  if (state_manager_ == nullptr) {
-    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+    max_values_ = new_max_values;
+    saveState();
   }
+}
 
-  std::unordered_map<std::string, std::string> state_map;
-  if (state_manager_->get(state_map)) {
-    if (state_map[TABLENAME_KEY] != tableName_) {
-      state_manager_->clear();
-    } else {
-      for (auto&& elem : state_map) {
-        if (elem.first.find(MAXVALUE_KEY_PREFIX) == 0) {
-          mapState_.emplace(elem.first.substr(MAXVALUE_KEY_PREFIX.length()), std::move(elem.second));
+void QueryDatabaseTable::initializeMaxValues(core::ProcessContext &context) {
+  max_values_.clear();
+  std::unordered_map<std::string, std::string> new_state;
+  if (!state_manager_->get(new_state)) {
+    logger_->log_info("Found no stored state");
+  } else {
+    const bool should_reset_state = [&] {

Review comment:
       It may just be personal preference, but I would rather have this as a member function because of its complexity.
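A sketch of the suggested refactoring: the immediately invoked `should_reset_state` lambda becomes a named `const` member function that can be documented and tested on its own. The lambda's body is cut off in the diff, so the rule below is an assumption modeled on the removed lines, which cleared the state when the stored `tablename` did not match the configured table. `QueryDatabaseTableSketch` is a hypothetical, trimmed-down class, not the real processor.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>
#include <utility>

// Key under which the table name is stored (value taken from the diff).
constexpr const char* TABLENAME_KEY = "tablename";

// Hypothetical, trimmed-down processor used only to illustrate the refactoring.
class QueryDatabaseTableSketch {
 public:
  explicit QueryDatabaseTableSketch(std::string table_name)
      : table_name_(std::move(table_name)) {}

  // Named member function instead of an immediately invoked lambda.
  // Assumed rule: the stored state is stale if it has no table name or
  // was recorded for a different table.
  bool shouldResetState(const std::unordered_map<std::string, std::string>& state) const {
    const auto it = state.find(TABLENAME_KEY);
    return it == state.end() || it->second != table_name_;
  }

 private:
  std::string table_name_;
};
```

With the check extracted, `initializeMaxValues` reduces to a single `if (shouldResetState(new_state))` branch.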







[GitHub] [nifi-minifi-cpp] arpadboda closed pull request #1004: MINIFICPP-1450 - Revive SQL processors

arpadboda closed pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004


   





[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

adamdebreceni commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r601475374



##########
File path: extensions/sql/processors/ExecuteSQL.cpp
##########
@@ -51,72 +41,87 @@ namespace processors {
 
 const std::string ExecuteSQL::ProcessorName("ExecuteSQL");
 
-const core::Property ExecuteSQL::s_sqlSelectQuery(
-  core::PropertyBuilder::createProperty("SQL select query")->isRequired(true)->withDescription(
+const core::Property ExecuteSQL::SQLSelectQuery(
+  core::PropertyBuilder::createProperty("SQL select query")
+  ->withDescription(
     "The SQL select query to execute. The query can be empty, a constant value, or built from attributes using Expression Language. "
     "If this property is specified, it will be used regardless of the content of incoming flowfiles. "
     "If this property is empty, the content of the incoming flow file is expected to contain a valid SQL select query, to be issued by the processor to the database. "
-    "Note that Expression Language is not evaluated for flow file contents.")->supportsExpressionLanguage(true)->build());
+    "Note that Expression Language is not evaluated for flow file contents.")
+  ->supportsExpressionLanguage(true)->build());
 
-const core::Property ExecuteSQL::s_maxRowsPerFlowFile(
-  core::PropertyBuilder::createProperty("Max Rows Per Flow File")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
-    "The maximum number of result rows that will be included intoi a flow file. If zero then all will be placed into the flow file")->supportsExpressionLanguage(true)->build());
+const core::Relationship ExecuteSQL::Success("success", "Successfully created FlowFile from SQL query result set.");
 
-const core::Relationship ExecuteSQL::s_success("success", "Successfully created FlowFile from SQL query result set.");
-
-static const std::string ResultRowCount = "executesql.row.count";
+const std::string ExecuteSQL::RESULT_ROW_COUNT = "executesql.row.count";
+const std::string ExecuteSQL::INPUT_FLOW_FILE_UUID = "input.flowfile.uuid";
 
 ExecuteSQL::ExecuteSQL(const std::string& name, utils::Identifier uuid)
-  : SQLProcessor(name, uuid), max_rows_(0) {
+  : SQLProcessor(name, uuid, logging::LoggerFactory<ExecuteSQL>::getLogger()) {
 }
 
-ExecuteSQL::~ExecuteSQL() = default;
-
 void ExecuteSQL::initialize() {
   //! Set the supported properties
-  setSupportedProperties({ dbControllerService(), outputFormat(), s_sqlSelectQuery, s_maxRowsPerFlowFile});
+  setSupportedProperties({ DBControllerService, OutputFormat, SQLSelectQuery, MaxRowsPerFlowFile});
 
   //! Set the supported relationships
-  setSupportedRelationships({ s_success });
+  setSupportedRelationships({ Success });
 }
 
-void ExecuteSQL::processOnSchedule(core::ProcessContext &context) {
-  initOutputFormat(context);
+void ExecuteSQL::processOnSchedule(core::ProcessContext& context) {
+  context.getProperty(OutputFormat.getName(), output_format_);
 
-  context.getProperty(s_sqlSelectQuery.getName(), sqlSelectQuery_);
-  context.getProperty(s_maxRowsPerFlowFile.getName(), max_rows_);
+  max_rows_ = [&] {
+    uint64_t max_rows;
+    context.getProperty(MaxRowsPerFlowFile.getName(), max_rows);
+    return gsl::narrow<size_t>(max_rows);
+  }();
 }
 
-void ExecuteSQL::processOnTrigger(core::ProcessSession& session) {
-  auto statement = connection_->prepareStatement(sqlSelectQuery_);
+void ExecuteSQL::processOnTrigger(core::ProcessContext& context, core::ProcessSession& session) {
+  auto input_flow_file = session.get();
 
-  auto rowset = statement->execute();
-
-  int count = 0;
-  size_t rowCount = 0;
-  sql::JSONSQLWriter sqlWriter(isJSONPretty());
-  sql::SQLRowsetProcessor sqlRowsetProcessor(rowset, { &sqlWriter });
+  std::string query;
+  if (!context.getProperty(SQLSelectQuery, query, input_flow_file)) {
+    if (!input_flow_file) {
+      throw Exception(PROCESSOR_EXCEPTION,
+                      "No incoming FlowFile and the \"SQL select query\" processor property is not specified");

Review comment:
       done
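The rule for choosing the statement (property first, then FlowFile content, and an error when neither yields one) can be modelled as a pure function. This is a hypothetical sketch, not the processor's code. `std::optional` stands in for "property not set" and "no incoming FlowFile", and `std::runtime_error` replaces MiNiFi's `Exception(PROCESSOR_EXCEPTION, ...)`. Treating an empty property like an unset one follows the property description rather than the `getProperty` call.

```cpp
#include <cassert>
#include <optional>
#include <stdexcept>
#include <string>

// Hypothetical model of how ExecuteSQL picks the SQL statement to run.
std::string resolveQuery(const std::optional<std::string>& property_value,
                         const std::optional<std::string>& flow_file_content) {
  // Per the property description, an empty "SQL select query" behaves like an unset one.
  if (property_value && !property_value->empty()) {
    return *property_value;
  }
  if (!flow_file_content) {
    throw std::runtime_error(
        "No incoming FlowFile and the \"SQL select query\" processor property is not specified");
  }
  // Expression Language is not evaluated for flow file contents; the text is used verbatim.
  if (flow_file_content->empty()) {
    throw std::runtime_error("Empty SQL statement");
  }
  return *flow_file_content;
}
```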

##########
File path: extensions/sql/processors/ExecuteSQL.cpp
##########
@@ -51,72 +41,87 @@ namespace processors {
 
 const std::string ExecuteSQL::ProcessorName("ExecuteSQL");
 
-const core::Property ExecuteSQL::s_sqlSelectQuery(
-  core::PropertyBuilder::createProperty("SQL select query")->isRequired(true)->withDescription(
+const core::Property ExecuteSQL::SQLSelectQuery(
+  core::PropertyBuilder::createProperty("SQL select query")
+  ->withDescription(
     "The SQL select query to execute. The query can be empty, a constant value, or built from attributes using Expression Language. "
     "If this property is specified, it will be used regardless of the content of incoming flowfiles. "
     "If this property is empty, the content of the incoming flow file is expected to contain a valid SQL select query, to be issued by the processor to the database. "
-    "Note that Expression Language is not evaluated for flow file contents.")->supportsExpressionLanguage(true)->build());
+    "Note that Expression Language is not evaluated for flow file contents.")
+  ->supportsExpressionLanguage(true)->build());
 
-const core::Property ExecuteSQL::s_maxRowsPerFlowFile(
-  core::PropertyBuilder::createProperty("Max Rows Per Flow File")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
-    "The maximum number of result rows that will be included intoi a flow file. If zero then all will be placed into the flow file")->supportsExpressionLanguage(true)->build());
+const core::Relationship ExecuteSQL::Success("success", "Successfully created FlowFile from SQL query result set.");
 
-const core::Relationship ExecuteSQL::s_success("success", "Successfully created FlowFile from SQL query result set.");
-
-static const std::string ResultRowCount = "executesql.row.count";
+const std::string ExecuteSQL::RESULT_ROW_COUNT = "executesql.row.count";
+const std::string ExecuteSQL::INPUT_FLOW_FILE_UUID = "input.flowfile.uuid";
 
 ExecuteSQL::ExecuteSQL(const std::string& name, utils::Identifier uuid)
-  : SQLProcessor(name, uuid), max_rows_(0) {
+  : SQLProcessor(name, uuid, logging::LoggerFactory<ExecuteSQL>::getLogger()) {
 }
 
-ExecuteSQL::~ExecuteSQL() = default;
-
 void ExecuteSQL::initialize() {
   //! Set the supported properties
-  setSupportedProperties({ dbControllerService(), outputFormat(), s_sqlSelectQuery, s_maxRowsPerFlowFile});
+  setSupportedProperties({ DBControllerService, OutputFormat, SQLSelectQuery, MaxRowsPerFlowFile});
 
   //! Set the supported relationships
-  setSupportedRelationships({ s_success });
+  setSupportedRelationships({ Success });
 }
 
-void ExecuteSQL::processOnSchedule(core::ProcessContext &context) {
-  initOutputFormat(context);
+void ExecuteSQL::processOnSchedule(core::ProcessContext& context) {
+  context.getProperty(OutputFormat.getName(), output_format_);
 
-  context.getProperty(s_sqlSelectQuery.getName(), sqlSelectQuery_);
-  context.getProperty(s_maxRowsPerFlowFile.getName(), max_rows_);
+  max_rows_ = [&] {
+    uint64_t max_rows;
+    context.getProperty(MaxRowsPerFlowFile.getName(), max_rows);
+    return gsl::narrow<size_t>(max_rows);
+  }();
 }
 
-void ExecuteSQL::processOnTrigger(core::ProcessSession& session) {
-  auto statement = connection_->prepareStatement(sqlSelectQuery_);
+void ExecuteSQL::processOnTrigger(core::ProcessContext& context, core::ProcessSession& session) {
+  auto input_flow_file = session.get();
 
-  auto rowset = statement->execute();
-
-  int count = 0;
-  size_t rowCount = 0;
-  sql::JSONSQLWriter sqlWriter(isJSONPretty());
-  sql::SQLRowsetProcessor sqlRowsetProcessor(rowset, { &sqlWriter });
+  std::string query;
+  if (!context.getProperty(SQLSelectQuery, query, input_flow_file)) {
+    if (!input_flow_file) {
+      throw Exception(PROCESSOR_EXCEPTION,
+                      "No incoming FlowFile and the \"SQL select query\" processor property is not specified");
+    }
+    logger_->log_debug("Using the contents of the flow file as the SQL statement");
+    auto buffer = std::make_shared<io::BufferStream>();
+    InputStreamPipe content_reader{buffer};
+    session.read(input_flow_file, &content_reader);
+    query = std::string{reinterpret_cast<const char *>(buffer->getBuffer()), buffer->size()};
+  }
+  if (query.empty()) {
+    throw Exception(PROCESSOR_EXCEPTION, "Empty SQL statement");
+  }
+
+  auto row_set = connection_->prepareStatement(query)->execute(collectArguments(input_flow_file));
+
+  sql::JSONSQLWriter sqlWriter{output_format_ == OutputType::JSONPretty};

Review comment:
       done
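`collectArguments(input_flow_file)` binds the `?` placeholders from FlowFile attributes. The `ExecuteSQL4` test in this PR passes them as `sql.args.1.value`, `sql.args.2.value`. Below is a dependency-free sketch of that collection step using a plain attribute map. Stopping at the first missing index is an assumption; this diff does not show the real helper's loop.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical model of collecting positional SQL arguments from FlowFile attributes:
// reads sql.args.1.value, sql.args.2.value, ... and stops at the first missing index
// (the stop-at-first-gap rule is an assumption).
std::vector<std::string> collectArguments(const std::map<std::string, std::string>& attributes) {
  std::vector<std::string> arguments;
  for (std::size_t index = 1;; ++index) {
    const auto it = attributes.find("sql.args." + std::to_string(index) + ".value");
    if (it == attributes.end()) {
      break;
    }
    arguments.push_back(it->second);
  }
  return arguments;
}
```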

##########
File path: extensions/sql/processors/ExecuteSQL.cpp
##########
@@ -51,72 +41,87 @@ namespace processors {
 
 const std::string ExecuteSQL::ProcessorName("ExecuteSQL");
 
-const core::Property ExecuteSQL::s_sqlSelectQuery(
-  core::PropertyBuilder::createProperty("SQL select query")->isRequired(true)->withDescription(
+const core::Property ExecuteSQL::SQLSelectQuery(
+  core::PropertyBuilder::createProperty("SQL select query")
+  ->withDescription(
     "The SQL select query to execute. The query can be empty, a constant value, or built from attributes using Expression Language. "
     "If this property is specified, it will be used regardless of the content of incoming flowfiles. "
     "If this property is empty, the content of the incoming flow file is expected to contain a valid SQL select query, to be issued by the processor to the database. "
-    "Note that Expression Language is not evaluated for flow file contents.")->supportsExpressionLanguage(true)->build());
+    "Note that Expression Language is not evaluated for flow file contents.")
+  ->supportsExpressionLanguage(true)->build());
 
-const core::Property ExecuteSQL::s_maxRowsPerFlowFile(
-  core::PropertyBuilder::createProperty("Max Rows Per Flow File")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
-    "The maximum number of result rows that will be included intoi a flow file. If zero then all will be placed into the flow file")->supportsExpressionLanguage(true)->build());
+const core::Relationship ExecuteSQL::Success("success", "Successfully created FlowFile from SQL query result set.");
 
-const core::Relationship ExecuteSQL::s_success("success", "Successfully created FlowFile from SQL query result set.");
-
-static const std::string ResultRowCount = "executesql.row.count";
+const std::string ExecuteSQL::RESULT_ROW_COUNT = "executesql.row.count";
+const std::string ExecuteSQL::INPUT_FLOW_FILE_UUID = "input.flowfile.uuid";
 
 ExecuteSQL::ExecuteSQL(const std::string& name, utils::Identifier uuid)
-  : SQLProcessor(name, uuid), max_rows_(0) {
+  : SQLProcessor(name, uuid, logging::LoggerFactory<ExecuteSQL>::getLogger()) {
 }
 
-ExecuteSQL::~ExecuteSQL() = default;
-
 void ExecuteSQL::initialize() {
   //! Set the supported properties
-  setSupportedProperties({ dbControllerService(), outputFormat(), s_sqlSelectQuery, s_maxRowsPerFlowFile});
+  setSupportedProperties({ DBControllerService, OutputFormat, SQLSelectQuery, MaxRowsPerFlowFile});
 
   //! Set the supported relationships
-  setSupportedRelationships({ s_success });
+  setSupportedRelationships({ Success });
 }
 
-void ExecuteSQL::processOnSchedule(core::ProcessContext &context) {
-  initOutputFormat(context);
+void ExecuteSQL::processOnSchedule(core::ProcessContext& context) {
+  context.getProperty(OutputFormat.getName(), output_format_);
 
-  context.getProperty(s_sqlSelectQuery.getName(), sqlSelectQuery_);
-  context.getProperty(s_maxRowsPerFlowFile.getName(), max_rows_);
+  max_rows_ = [&] {
+    uint64_t max_rows;
+    context.getProperty(MaxRowsPerFlowFile.getName(), max_rows);
+    return gsl::narrow<size_t>(max_rows);
+  }();
 }
 
-void ExecuteSQL::processOnTrigger(core::ProcessSession& session) {
-  auto statement = connection_->prepareStatement(sqlSelectQuery_);
+void ExecuteSQL::processOnTrigger(core::ProcessContext& context, core::ProcessSession& session) {
+  auto input_flow_file = session.get();
 
-  auto rowset = statement->execute();
-
-  int count = 0;
-  size_t rowCount = 0;
-  sql::JSONSQLWriter sqlWriter(isJSONPretty());
-  sql::SQLRowsetProcessor sqlRowsetProcessor(rowset, { &sqlWriter });
+  std::string query;
+  if (!context.getProperty(SQLSelectQuery, query, input_flow_file)) {
+    if (!input_flow_file) {
+      throw Exception(PROCESSOR_EXCEPTION,
+                      "No incoming FlowFile and the \"SQL select query\" processor property is not specified");
+    }
+    logger_->log_debug("Using the contents of the flow file as the SQL statement");
+    auto buffer = std::make_shared<io::BufferStream>();
+    InputStreamPipe content_reader{buffer};
+    session.read(input_flow_file, &content_reader);
+    query = std::string{reinterpret_cast<const char *>(buffer->getBuffer()), buffer->size()};
+  }
+  if (query.empty()) {
+    throw Exception(PROCESSOR_EXCEPTION, "Empty SQL statement");
+  }
+
+  auto row_set = connection_->prepareStatement(query)->execute(collectArguments(input_flow_file));
+
+  sql::JSONSQLWriter sqlWriter{output_format_ == OutputType::JSONPretty};
+  FlowFileGenerator flow_file_creator{session, sqlWriter};
+  sql::SQLRowsetProcessor sqlRowsetProcessor(row_set, {sqlWriter, flow_file_creator});
 
   // Process rowset.
-  do {
-    rowCount = sqlRowsetProcessor.process(max_rows_ == 0 ? std::numeric_limits<size_t>::max() : max_rows_);
-    count++;
-    if (rowCount == 0)
-      break;
-
-    const auto output = sqlWriter.toString();
-    if (!output.empty()) {
-      WriteCallback writer(output);
-      auto newflow = session.create();
-      newflow->addAttribute(ResultRowCount, std::to_string(rowCount));
-      session.write(newflow, &writer);
-      session.transfer(newflow, s_success);
+  while (size_t row_count = sqlRowsetProcessor.process(max_rows_)) {
+    auto new_file = flow_file_creator.getLastFlowFile();
+    new_file->addAttribute(RESULT_ROW_COUNT, std::to_string(row_count));

Review comment:
       done

##########
File path: extensions/sql/processors/FlowFileSource.cpp
##########
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FlowFileSource.h"
+
+#include "FlowFile.h"
+#include "data/JSONSQLWriter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const core::Property FlowFileSource::OutputFormat(
+  core::PropertyBuilder::createProperty("Output Format")
+  ->isRequired(true)
+  ->supportsExpressionLanguage(true)
+  ->withDefaultValue(toString(OutputType::JSONPretty))
+  ->withAllowableValues<std::string>(OutputType::values())
+  ->withDescription("Set the output format type.")->build());
+
+const core::Property FlowFileSource::MaxRowsPerFlowFile(
+  core::PropertyBuilder::createProperty("Max Rows Per Flow File")
+  ->isRequired(true)
+  ->supportsExpressionLanguage(true)
+  ->withDefaultValue<uint64_t>(0)
+  ->withDescription(
+      "The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large result sets into multiple FlowFiles. "
+      "If the value specified is zero, then all rows are returned in a single FlowFile.")->build());
+
+const std::string FlowFileSource::FRAGMENT_IDENTIFIER = "fragment.identifier";
+const std::string FlowFileSource::FRAGMENT_COUNT = "fragment.count";
+const std::string FlowFileSource::FRAGMENT_INDEX = "fragment.index";
+
+void FlowFileSource::FlowFileGenerator::endProcessBatch(Progress progress) {
+  if (progress == Progress::DONE) {
+    // annotate the flow files with the fragment.count
+    std::string fragment_count = std::to_string(flow_files_.size());
+    for (const auto& flow_file : flow_files_) {
+      flow_file->addAttribute(FRAGMENT_COUNT, fragment_count);
+    }
+    return;
+  }

Review comment:
       done
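The batching described by the "Max Rows Per Flow File" property, together with the fragment attributes, can be sketched without MiNiFi types. `Fragment` and `splitIntoFragments` are hypothetical. The attribute names are the ones defined in this diff (`executesql.row.count`, `fragment.identifier`, `fragment.index`, `fragment.count`). As `endProcessBatch(Progress::DONE)` does above, `fragment.count` is filled in only after the last batch. With five rows and a limit of 2, the sketch yields fragments of 2, 2 and 1 rows with indices 0 to 2, which matches the `ExecuteSQL5` expectations.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <limits>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an output FlowFile: its rows plus its attributes.
struct Fragment {
  std::vector<std::string> rows;
  std::map<std::string, std::string> attributes;
};

std::vector<Fragment> splitIntoFragments(const std::vector<std::string>& rows, std::size_t max_rows,
                                         const std::string& fragment_id) {
  // As in the removed code, a limit of 0 means "no limit".
  const std::size_t batch_size = max_rows == 0 ? std::numeric_limits<std::size_t>::max() : max_rows;
  std::vector<Fragment> fragments;
  std::size_t begin = 0;
  while (begin < rows.size()) {
    const std::size_t end = begin + std::min(batch_size, rows.size() - begin);
    Fragment fragment;
    fragment.rows.assign(rows.begin() + static_cast<std::ptrdiff_t>(begin),
                         rows.begin() + static_cast<std::ptrdiff_t>(end));
    fragment.attributes["executesql.row.count"] = std::to_string(fragment.rows.size());
    fragment.attributes["fragment.identifier"] = fragment_id;
    fragment.attributes["fragment.index"] = std::to_string(fragments.size());
    fragments.push_back(std::move(fragment));
    begin = end;
  }
  // The total is only known once every batch is done.
  const std::string count = std::to_string(fragments.size());
  for (auto& fragment : fragments) {
    fragment.attributes["fragment.count"] = count;
  }
  return fragments;
}
```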







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r603246521



##########
File path: .github/workflows/ci.yml
##########
@@ -132,12 +142,13 @@ jobs:
         run: |
           sudo apt-add-repository -y "ppa:ubuntu-toolchain-r/test"
           sudo apt update
-          sudo apt install -y gcc-4.8 g++-4.8 bison flex uuid-dev openssl libcurl4-openssl-dev ccache libpython3-dev liblua5.1-0-dev libssh2-1-dev
+          sudo apt install -y gcc-4.8 g++-4.8 bison flex uuid-dev openssl libcurl4-openssl-dev ccache libpython3-dev liblua5.1-0-dev libssh2-1-dev libsqliteodbc
+          sudo ln -s /usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so /usr/lib/x86_64-linux-gnu/libsqlite3odbc.so

Review comment:
       I feel like this is a problem with the `libsqliteodbc` library, but you are right that it introduces quite a bit of friction and cryptic errors. Maybe we could print some warnings?
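One possible shape for such a warning is a startup check that looks for the driver library in a few directories and logs when it is missing. The sketch below is standalone C++17 and does not use the ODBC API or MiNiFi's logger. `findDriverLibrary`, `fileExists`, the directory list and the message text are all hypothetical. The existence check is injectable, so the logic can be exercised without touching the file system.

```cpp
#include <cassert>
#include <fstream>
#include <functional>
#include <iostream>
#include <optional>
#include <string>
#include <vector>

// Returns true if `path` can be opened for reading.
bool fileExists(const std::string& path) {
  std::ifstream file(path);
  return file.good();
}

// Hypothetical startup check: looks for an ODBC driver library either at an absolute
// path or under each of `search_dirs`, and prints a warning to stderr if it is missing.
std::optional<std::string> findDriverLibrary(
    const std::string& driver_file, const std::vector<std::string>& search_dirs,
    const std::function<bool(const std::string&)>& exists = fileExists) {
  if (!driver_file.empty() && driver_file.front() == '/') {
    if (exists(driver_file)) return driver_file;
  } else {
    for (const auto& dir : search_dirs) {
      const std::string candidate = dir + "/" + driver_file;
      if (exists(candidate)) return candidate;
    }
  }
  std::cerr << "Warning: ODBC driver library '" << driver_file
            << "' was not found; an absolute Driver= path in odbcinst.ini or a symlink into"
            << " a directory on the library search path may be needed\n";
  return std::nullopt;
}
```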







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r603223913



##########
File path: libminifi/test/sql-tests/SQLTestPlan.h
##########
@@ -0,0 +1,98 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "../TestBase.h"
+
+class SQLTestPlan {
+ public:
+  SQLTestPlan(TestController& controller, const std::string& connection_str, const std::string& sql_processor, std::initializer_list<core::Relationship> output_rels) {
+    plan_ = controller.createPlan();
+    processor_ = plan_->addProcessor(sql_processor, sql_processor);
+    plan_->setProperty(processor_, "DB Controller Service", "ODBCService");
+    input_ = plan_->addConnection({}, {"success", "d"}, processor_);
+    for (const auto& output_rel : output_rels) {
+      outputs_[output_rel] = plan_->addConnection(processor_, output_rel, {});
+    }
+
+    // initialize database service
+    auto service = plan_->addController("ODBCService", "ODBCService");
+    plan_->setProperty(service, minifi::sql::controllers::DatabaseService::ConnectionString.getName(), connection_str);
+  }
+
+  std::string getContent(const std::shared_ptr<core::FlowFile>& flow_file) {
+    return plan_->getContent(flow_file);
+  }
+
+  std::shared_ptr<core::FlowFile> addInput(std::initializer_list<std::pair<std::string, std::string>> attributes = {}, const utils::optional<std::string>& content = {}) {
+    auto flow_file = std::make_shared<minifi::FlowFileRecord>();
+    for (const auto& attr : attributes) {
+      flow_file->setAttribute(attr.first, attr.second);
+    }
+    if (content) {
+      auto claim = std::make_shared<minifi::ResourceClaim>(plan_->getContentRepo());
+      auto content_stream = plan_->getContentRepo()->write(*claim);
+      int ret = content_stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(content->c_str())), content->length());
+      REQUIRE(ret == content->length());
+      flow_file->setOffset(0);
+      flow_file->setSize(content->length());
+      flow_file->setResourceClaim(claim);
+    }
+    input_->put(flow_file);
+    return flow_file;
+  }
+
+  std::shared_ptr<core::Processor> getSQLProcessor() {
+    return processor_;
+  }
+
+  void run(bool reschedule = false) {
+    if (reschedule) {
+      plan_->reset(reschedule);
+    }
+    plan_->runProcessor(0);  // run the one and only sql processor
+  }
+
+  std::map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> getAllOutputs() {
+    std::map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> flow_file_map;
+    for (const auto& output : outputs_) {
+      flow_file_map[output.first] = getOutputs(output.first);
+    }
+    return flow_file_map;
+  }
+
+  std::vector<std::shared_ptr<core::FlowFile>> getOutputs(const core::Relationship& relationship) {
+    auto conn = outputs_[relationship];
+    REQUIRE(conn);
+    std::vector<std::shared_ptr<core::FlowFile>> flow_files;
+    std::set<std::shared_ptr<core::FlowFile>> expired;
+    while (auto flow_file = conn->poll(expired)) {
+      REQUIRE(expired.empty());
+      flow_files.push_back(std::move(flow_file));
+    }
+    REQUIRE(expired.empty());

Review comment:
       It might be the case that `poll` returns nullptr, in which case `expired` would not be checked inside the loop. I understand that this is an unlikely event, though; should I remove it?
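The concern can be illustrated with a fake connection whose `poll()` reports expired items and returns nullptr in the same call. `FakeConnection` and `drainWithoutExpiry` are hypothetical stand-ins for the test's connection and `getOutputs()`. The in-loop check never sees items expired by the final poll, while the check after the loop does.

```cpp
#include <cassert>
#include <deque>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Hypothetical queue whose poll() may move expired items into `expired`
// and still return nullptr, the case discussed above.
struct FakeConnection {
  std::deque<std::shared_ptr<std::string>> live;
  std::deque<std::shared_ptr<std::string>> expiring;  // reported via `expired` on the next poll

  std::shared_ptr<std::string> poll(std::set<std::shared_ptr<std::string>>& expired) {
    while (!expiring.empty()) {
      expired.insert(expiring.front());
      expiring.pop_front();
    }
    if (live.empty()) return nullptr;
    auto item = live.front();
    live.pop_front();
    return item;
  }
};

// Drains the queue. Returns false if anything expired, including items reported
// by the final poll() that returned nullptr, which an in-loop check alone would miss.
bool drainWithoutExpiry(FakeConnection& conn, std::vector<std::shared_ptr<std::string>>& out) {
  std::set<std::shared_ptr<std::string>> expired;
  while (auto item = conn.poll(expired)) {
    if (!expired.empty()) return false;  // in-loop check, as in getOutputs()
    out.push_back(std::move(item));
  }
  return expired.empty();  // post-loop check: covers the last, nullptr-returning poll()
}
```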







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r603298693



##########
File path: libminifi/test/sql-tests/ExecuteSQLTests.cpp
##########
@@ -0,0 +1,195 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+
+#include "SQLTestController.h"
+#include "processors/ExecuteSQL.h"
+#include "Utils.h"
+#include "FlowFileMatcher.h"
+
+TEST_CASE("ExecuteSQL works without incoming flow file", "[ExecuteSQL1]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty("SQL select query", "SELECT * FROM test_table ORDER BY int_col ASC");
+
+  controller.insertValues({{11, "one"}, {22, "two"}});
+
+  plan->run();
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+  std::string row_count;
+  flow_files[0]->getAttribute(processors::ExecuteSQL::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "2");
+
+  auto content = plan->getContent(flow_files[0]);
+  verifyJSON(content, R"(
+    [{
+      "int_col": 11,
+      "text_col": "one"
+    },{
+      "int_col": 22,
+      "text_col": "two"
+    }]
+  )");
+}
+
+TEST_CASE("ExecuteSQL uses statement in property", "[ExecuteSQL2]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty("SQL select query", "SELECT * FROM test_table WHERE int_col == ${int_col_value}");
+
+  controller.insertValues({{11, "one"}, {22, "two"}});
+
+  auto input_file = plan->addInput({{"int_col_value", "11"}});
+
+  plan->run();
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+  std::string row_count;
+  flow_files[0]->getAttribute(processors::ExecuteSQL::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "1");
+
+  auto content = plan->getContent(flow_files[0]);
+  verifyJSON(content, R"(
+    [{
+      "int_col": 11,
+      "text_col": "one"
+    }]
+  )");
+}
+
+TEST_CASE("ExecuteSQL uses statement in content", "[ExecuteSQL3]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+
+  controller.insertValues({{11, "one"}, {22, "two"}});
+
+  auto input_file = plan->addInput({}, "SELECT * FROM test_table ORDER BY int_col ASC");
+
+  plan->run();
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+  std::string row_count;
+  flow_files[0]->getAttribute(processors::ExecuteSQL::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "2");
+
+  auto content = plan->getContent(flow_files[0]);
+  verifyJSON(content, R"(
+    [{
+      "int_col": 11,
+      "text_col": "one"
+    },{
+      "int_col": 22,
+      "text_col": "two"
+    }]
+  )");
+}
+
+TEST_CASE("ExecuteSQL uses sql.args.N.value attributes", "[ExecuteSQL4]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+
+  controller.insertValues({{11, "apple"}, {11, "banana"}, {22, "banana"}});
+
+  auto input_file = plan->addInput({
+    {"sql.args.1.value", "11"},
+    {"sql.args.2.value", "banana"}
+  }, "SELECT * FROM test_table WHERE int_col == ? AND text_col == ?");
+
+  plan->run();
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+  std::string row_count;
+  flow_files[0]->getAttribute(processors::ExecuteSQL::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "1");
+
+  auto content = plan->getContent(flow_files[0]);
+  verifyJSON(content, R"(
+    [{
+      "int_col": 11,
+      "text_col": "banana"
+    }]
+  )");
+}
+
+TEST_CASE("ExecuteSQL honors Max Rows Per Flow File", "[ExecuteSQL5]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty(processors::ExecuteSQL::MaxRowsPerFlowFile.getName(), "2");
+  sql_proc->setProperty(processors::ExecuteSQL::SQLSelectQuery.getName(), "SELECT text_col FROM test_table ORDER BY int_col ASC");
+
+  controller.insertValues({
+    {101, "apple"},
+    {102, "banana"},
+    {103, "pear"},
+    {104, "strawberry"},
+    {105, "pineapple"}
+  });
+
+  auto input_file = plan->addInput();
+
+  plan->run();
+
+  auto content_verifier = [&] (const std::shared_ptr<core::FlowFile>& actual, const std::string& expected) {
+    verifyJSON(plan->getContent(actual), expected);
+  };
+
+  FlowFileMatcher matcher{content_verifier, {
+      processors::ExecuteSQL::RESULT_ROW_COUNT,
+      processors::ExecuteSQL::FRAGMENT_COUNT,
+      processors::ExecuteSQL::FRAGMENT_INDEX,
+      processors::ExecuteSQL::FRAGMENT_IDENTIFIER
+  }};
+
+  utils::optional<std::string> fragment_id;
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 3);
+  matcher.verify(flow_files[0],
+    {"2", "3", "0", var("frag_id")},
+    R"([{"text_col": "apple"}, {"text_col": "banana"}])");
+  matcher.verify(flow_files[1],
+    {"2", "3", "1", var("frag_id")},
+    R"([{"text_col": "pear"}, {"text_col": "strawberry"}])");
+  matcher.verify(flow_files[2],
+    {"1", "3", "2", var("frag_id")},

Review comment:
       added the REQUIRE check







[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r603881995



##########
File path: .github/workflows/ci.yml
##########
@@ -132,12 +142,13 @@ jobs:
         run: |
           sudo apt-add-repository -y "ppa:ubuntu-toolchain-r/test"
           sudo apt update
-          sudo apt install -y gcc-4.8 g++-4.8 bison flex uuid-dev openssl libcurl4-openssl-dev ccache libpython3-dev liblua5.1-0-dev libssh2-1-dev
+          sudo apt install -y gcc-4.8 g++-4.8 bison flex uuid-dev openssl libcurl4-openssl-dev ccache libpython3-dev liblua5.1-0-dev libssh2-1-dev libsqliteodbc
+          sudo ln -s /usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so /usr/lib/x86_64-linux-gnu/libsqlite3odbc.so

Review comment:
       The binary does not work on my Ubuntu 18.04 without the symlink.  With the symlink, it works fine.
   
   My `/etc/odbcinst.ini` file was created by `sudo apt install libsqliteodbc`, and it looks like this:
   ```
   [SQLite]
   Description=SQLite ODBC Driver
   Driver=libsqliteodbc.so
   Setup=libsqliteodbc.so
   UsageCount=1
   
   [SQLite3]
   Description=SQLite3 ODBC Driver
   Driver=libsqlite3odbc.so
   Setup=libsqlite3odbc.so
   UsageCount=1
   ```
   and my connection string is `Driver=SQLite3;Database=/tmp/my_database.db`.
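The lookup chain in this setup can be sketched as follows. The connection string's `Driver` value (`SQLite3`) selects a section of `/etc/odbcinst.ini`, and that section's `Driver` entry names the shared library. Because that entry is a bare file name, the library presumably has to be found on the loader's search path, which is consistent with the symlink making it work. `parseConnectionString`, `iniValue` and `trim` are hypothetical helpers. The parsing is simplified: it handles no `{braced}` values and does no case folding.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <optional>
#include <sstream>
#include <string>

// Removes leading and trailing spaces, tabs and carriage returns.
std::string trim(const std::string& s) {
  const std::size_t first = s.find_first_not_of(" \t\r");
  if (first == std::string::npos) return "";
  const std::size_t last = s.find_last_not_of(" \t\r");
  return s.substr(first, last - first + 1);
}

// Splits "Key=Value;Key=Value" into a map. Simplified: ignores ODBC's {braced} values
// and does not normalize key case.
std::map<std::string, std::string> parseConnectionString(const std::string& connection_string) {
  std::map<std::string, std::string> result;
  std::size_t pos = 0;
  while (pos < connection_string.size()) {
    std::size_t end = connection_string.find(';', pos);
    if (end == std::string::npos) end = connection_string.size();
    const std::string pair = connection_string.substr(pos, end - pos);
    const std::size_t eq = pair.find('=');
    if (eq != std::string::npos) {
      result[trim(pair.substr(0, eq))] = trim(pair.substr(eq + 1));
    }
    pos = end + 1;
  }
  return result;
}

// Returns the value of `key` inside the `[section]` block of INI-formatted text.
std::optional<std::string> iniValue(const std::string& ini_text, const std::string& section,
                                    const std::string& key) {
  std::istringstream in(ini_text);
  std::string line;
  bool in_section = false;
  while (std::getline(in, line)) {
    line = trim(line);
    if (line.empty()) continue;
    if (line.front() == '[') {
      in_section = (line == "[" + section + "]");
      continue;
    }
    const std::size_t eq = line.find('=');
    if (in_section && eq != std::string::npos && trim(line.substr(0, eq)) == key) {
      return trim(line.substr(eq + 1));
    }
  }
  return std::nullopt;
}
```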







[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r603102486



##########
File path: libminifi/test/sql-tests/ExecuteSQLTests.cpp
##########
@@ -0,0 +1,195 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+
+#include "SQLTestController.h"
+#include "processors/ExecuteSQL.h"
+#include "Utils.h"
+#include "FlowFileMatcher.h"
+
+TEST_CASE("ExecuteSQL works without incoming flow file", "[ExecuteSQL1]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty("SQL select query", "SELECT * FROM test_table ORDER BY int_col ASC");
+
+  controller.insertValues({{11, "one"}, {22, "two"}});
+
+  plan->run();
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+  std::string row_count;
+  flow_files[0]->getAttribute(processors::ExecuteSQL::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "2");
+
+  auto content = plan->getContent(flow_files[0]);
+  verifyJSON(content, R"(
+    [{
+      "int_col": 11,
+      "text_col": "one"
+    },{
+      "int_col": 22,
+      "text_col": "two"
+    }]
+  )");
+}
+
+TEST_CASE("ExecuteSQL uses statement in property", "[ExecuteSQL2]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty("SQL select query", "SELECT * FROM test_table WHERE int_col == ${int_col_value}");

Review comment:
       I'm surprised `==` works here (SQLite accepts it, but other databases may not).  Single `=` is standard SQL; I think the test should either use `=`, or have two versions, one with `=` and one with `==`.  (Also in `ExecuteSQL4`.)

##########
File path: extensions/sql/processors/QueryDatabaseTable.cpp
##########
@@ -75,361 +72,234 @@ const core::Property QueryDatabaseTable::s_maxValueColumnNames(
     "If no columns are provided, all rows from the table will be considered, which could have a performance impact. "
     "NOTE: It is important to use consistent max-value column names for a given table for incremental fetch to work properly. "
     "NOTE: Because of a limitation of database access library 'soci', which doesn't support milliseconds in it's 'dt_date', "
-    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")->
-    supportsExpressionLanguage(true)->build());
+    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")
+  ->supportsExpressionLanguage(true)->build());
 
-const core::Property QueryDatabaseTable::s_whereClause(
-  core::PropertyBuilder::createProperty("db-fetch-where-clause")->isRequired(false)->withDescription(
-    "A custom clause to be added in the WHERE condition when building SQL queries.")->supportsExpressionLanguage(true)->build());
+const core::Property QueryDatabaseTable::WhereClause(
+  core::PropertyBuilder::createProperty("Where Clause")
+  ->isRequired(false)
+  ->withDescription("A custom clause to be added in the WHERE condition when building SQL queries.")
+  ->supportsExpressionLanguage(true)->build());
 
-const core::Property QueryDatabaseTable::s_sqlQuery(
-  core::PropertyBuilder::createProperty("db-fetch-sql-query")->isRequired(false)->withDescription(
-    "A custom SQL query used to retrieve data. Instead of building a SQL query from other properties, this query will be wrapped as a sub-query. "
-    "Query must have no ORDER BY statement.")->supportsExpressionLanguage(true)->build());
+const std::string QueryDatabaseTable::InitialMaxValueDynamicPropertyPrefix("initial.maxvalue.");

Review comment:
       Some documentation about this dynamic property should be included in PROCESSORS.md.
   
   (Especially since, according to the unit test, it does not work the way I first expected from its name: only rows whose column value is strictly greater than the given value are returned, rather than greater than or equal.)
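To make the behaviour described in the comment concrete, here is a minimal sketch of the filter the unit test implies. A row is selected only if its max-value column is strictly greater than the value supplied through the `initial.maxvalue.<column>` dynamic property. `Row` and `selectNewRows` are hypothetical names working on in-memory rows. This is not the processor's implementation, which queries the database rather than filtering a vector.

```cpp
#include <string>
#include <vector>

// Hypothetical row type mirroring the test table (int_col, text_col).
struct Row {
  int int_col;
  std::string text_col;
};

// Keeps rows whose max-value column is strictly greater than the initial value;
// a row equal to the initial value is NOT selected.
std::vector<Row> selectNewRows(const std::vector<Row>& rows, int initial_max_value) {
  std::vector<Row> result;
  for (const auto& row : rows) {
    if (row.int_col > initial_max_value) {
      result.push_back(row);
    }
  }
  return result;
}
```

For example, with rows 101, 102 and 103 and an initial value of 102, only the row with 103 is returned. This is the "greater than, not greater than or equal" behaviour that is worth spelling out in PROCESSORS.md.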

##########
File path: libminifi/test/sql-tests/ExecuteSQLTests.cpp
##########
@@ -0,0 +1,195 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+
+#include "SQLTestController.h"
+#include "processors/ExecuteSQL.h"
+#include "Utils.h"
+#include "FlowFileMatcher.h"
+
+TEST_CASE("ExecuteSQL works without incoming flow file", "[ExecuteSQL1]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty("SQL select query", "SELECT * FROM test_table ORDER BY int_col ASC");
+
+  controller.insertValues({{11, "one"}, {22, "two"}});
+
+  plan->run();
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+  std::string row_count;
+  flow_files[0]->getAttribute(processors::ExecuteSQL::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "2");
+
+  auto content = plan->getContent(flow_files[0]);
+  verifyJSON(content, R"(
+    [{
+      "int_col": 11,
+      "text_col": "one"
+    },{
+      "int_col": 22,
+      "text_col": "two"
+    }]
+  )");
+}
+
+TEST_CASE("ExecuteSQL uses statement in property", "[ExecuteSQL2]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty("SQL select query", "SELECT * FROM test_table WHERE int_col == ${int_col_value}");
+
+  controller.insertValues({{11, "one"}, {22, "two"}});
+
+  auto input_file = plan->addInput({{"int_col_value", "11"}});
+
+  plan->run();
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+  std::string row_count;
+  flow_files[0]->getAttribute(processors::ExecuteSQL::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "1");
+
+  auto content = plan->getContent(flow_files[0]);
+  verifyJSON(content, R"(
+    [{
+      "int_col": 11,
+      "text_col": "one"
+    }]
+  )");
+}
+
+TEST_CASE("ExecuteSQL uses statement in content", "[ExecuteSQL3]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+
+  controller.insertValues({{11, "one"}, {22, "two"}});
+
+  auto input_file = plan->addInput({}, "SELECT * FROM test_table ORDER BY int_col ASC");
+
+  plan->run();
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+  std::string row_count;
+  flow_files[0]->getAttribute(processors::ExecuteSQL::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "2");
+
+  auto content = plan->getContent(flow_files[0]);
+  verifyJSON(content, R"(
+    [{
+      "int_col": 11,
+      "text_col": "one"
+    },{
+      "int_col": 22,
+      "text_col": "two"
+    }]
+  )");
+}
+
+TEST_CASE("ExecuteSQL uses sql.args.N.value attributes", "[ExecuteSQL4]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+
+  controller.insertValues({{11, "apple"}, {11, "banana"}, {22, "banana"}});
+
+  auto input_file = plan->addInput({
+    {"sql.args.1.value", "11"},
+    {"sql.args.2.value", "banana"}
+  }, "SELECT * FROM test_table WHERE int_col == ? AND text_col == ?");
+
+  plan->run();
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+  std::string row_count;
+  flow_files[0]->getAttribute(processors::ExecuteSQL::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "1");
+
+  auto content = plan->getContent(flow_files[0]);
+  verifyJSON(content, R"(
+    [{
+      "int_col": 11,
+      "text_col": "banana"
+    }]
+  )");
+}
+
+TEST_CASE("ExecuteSQL honors Max Rows Per Flow File", "[ExecuteSQL5]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty(processors::ExecuteSQL::MaxRowsPerFlowFile.getName(), "2");
+  sql_proc->setProperty(processors::ExecuteSQL::SQLSelectQuery.getName(), "SELECT text_col FROM test_table ORDER BY int_col ASC");
+
+  controller.insertValues({
+    {101, "apple"},
+    {102, "banana"},
+    {103, "pear"},
+    {104, "strawberry"},
+    {105, "pineapple"}
+  });
+
+  auto input_file = plan->addInput();
+
+  plan->run();
+
+  auto content_verifier = [&] (const std::shared_ptr<core::FlowFile>& actual, const std::string& expected) {
+    verifyJSON(plan->getContent(actual), expected);
+  };
+
+  FlowFileMatcher matcher{content_verifier, {
+      processors::ExecuteSQL::RESULT_ROW_COUNT,
+      processors::ExecuteSQL::FRAGMENT_COUNT,
+      processors::ExecuteSQL::FRAGMENT_INDEX,
+      processors::ExecuteSQL::FRAGMENT_IDENTIFIER
+  }};
+
+  utils::optional<std::string> fragment_id;
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 3);
+  matcher.verify(flow_files[0],
+    {"2", "3", "0", var("frag_id")},
+    R"([{"text_col": "apple"}, {"text_col": "banana"}])");
+  matcher.verify(flow_files[1],
+    {"2", "3", "1", var("frag_id")},
+    R"([{"text_col": "pear"}, {"text_col": "strawberry"}])");
+  matcher.verify(flow_files[2],
+    {"1", "3", "2", var("frag_id")},

Review comment:
       After staring at it for a while, I _think_ this checks that `FRAGMENT_IDENTIFIER` is the same in all three flow files, but this code is not easy to read.
   
   I think something like this would be clearer:
   ```c++
   utils::optional<std::string> fragment_id;
   
   matcher.verify(..., capture(fragment_id), ...);
   REQUIRE(fragment_id);
   matcher.verify(..., *fragment_id, ...);
   matcher.verify(..., *fragment_id, ...);
   ```
   i.e. `AttributeValue` would have a "capture" mode instead of an "is_variable" mode.
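Below is a minimal sketch of the suggested capture mode. It assumes a simplified matcher that checks plain attribute maps instead of `core::FlowFile` objects and leaves out the content verifier. `AttributeValue`, `capture` and `FlowFileMatcher` here are hypothetical stand-ins, not the test helper from the PR, and the attribute names used with them are illustrative.

```cpp
#include <cstddef>
#include <map>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

using Attributes = std::map<std::string, std::string>;

// Hypothetical expected attribute value: either a literal to compare against,
// or a "capture" slot that records whatever value the flow file actually has.
class AttributeValue {
 public:
  // Implicit on purpose, so that literals and strings can be listed directly.
  AttributeValue(const char* value) : value_(value) {}
  AttributeValue(std::string value) : value_(std::move(value)) {}

  static AttributeValue captureInto(std::optional<std::string>& slot) {
    AttributeValue result{std::string{}};
    result.slot_ = &slot;
    return result;
  }

  // Returns true if `actual` matches; in capture mode it stores `actual` instead.
  bool matchOrCapture(const std::string& actual) const {
    if (slot_ != nullptr) {
      *slot_ = actual;
      return true;
    }
    return value_ == actual;
  }

 private:
  std::string value_;
  std::optional<std::string>* slot_ = nullptr;
};

// Free function so call sites read like the proposal: capture(fragment_id).
inline AttributeValue capture(std::optional<std::string>& slot) {
  return AttributeValue::captureInto(slot);
}

// Hypothetical matcher: checks a fixed list of attributes, in order.
class FlowFileMatcher {
 public:
  explicit FlowFileMatcher(std::vector<std::string> attribute_names)
      : attribute_names_(std::move(attribute_names)) {}

  // Throws std::runtime_error on a missing attribute or a literal mismatch.
  void verify(const Attributes& actual, const std::vector<AttributeValue>& expected) const {
    if (expected.size() != attribute_names_.size()) {
      throw std::invalid_argument("expected one value per attribute");
    }
    for (std::size_t i = 0; i < attribute_names_.size(); ++i) {
      auto it = actual.find(attribute_names_[i]);
      if (it == actual.end() || !expected[i].matchOrCapture(it->second)) {
        throw std::runtime_error("attribute mismatch: " + attribute_names_[i]);
      }
    }
  }

 private:
  std::vector<std::string> attribute_names_;
};
```

The first flow file captures the identifier, and the later ones are compared against `*fragment_id`. This makes the "same identifier in every fragment" check explicit at the call site instead of hiding it behind a named variable.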

##########
File path: libminifi/test/sql-tests/SQLTestPlan.h
##########
@@ -0,0 +1,98 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "../TestBase.h"
+
+class SQLTestPlan {
+ public:
+  SQLTestPlan(TestController& controller, const std::string& connection_str, const std::string& sql_processor, std::initializer_list<core::Relationship> output_rels) {
+    plan_ = controller.createPlan();
+    processor_ = plan_->addProcessor(sql_processor, sql_processor);
+    plan_->setProperty(processor_, "DB Controller Service", "ODBCService");
+    input_ = plan_->addConnection({}, {"success", "d"}, processor_);
+    for (const auto& output_rel : output_rels) {
+      outputs_[output_rel] = plan_->addConnection(processor_, output_rel, {});
+    }
+
+    // initialize database service
+    auto service = plan_->addController("ODBCService", "ODBCService");
+    plan_->setProperty(service, minifi::sql::controllers::DatabaseService::ConnectionString.getName(), connection_str);
+  }
+
+  std::string getContent(const std::shared_ptr<core::FlowFile>& flow_file) {
+    return plan_->getContent(flow_file);
+  }
+
+  std::shared_ptr<core::FlowFile> addInput(std::initializer_list<std::pair<std::string, std::string>> attributes = {}, const utils::optional<std::string>& content = {}) {
+    auto flow_file = std::make_shared<minifi::FlowFileRecord>();
+    for (const auto& attr : attributes) {
+      flow_file->setAttribute(attr.first, attr.second);
+    }
+    if (content) {
+      auto claim = std::make_shared<minifi::ResourceClaim>(plan_->getContentRepo());
+      auto content_stream = plan_->getContentRepo()->write(*claim);
+      int ret = content_stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(content->c_str())), content->length());
+      REQUIRE(ret == content->length());
+      flow_file->setOffset(0);
+      flow_file->setSize(content->length());
+      flow_file->setResourceClaim(claim);
+    }
+    input_->put(flow_file);
+    return flow_file;
+  }
+
+  std::shared_ptr<core::Processor> getSQLProcessor() {
+    return processor_;
+  }
+
+  void run(bool reschedule = false) {
+    if (reschedule) {
+      plan_->reset(reschedule);
+    }
+    plan_->runProcessor(0);  // run the one and only sql processor
+  }
+
+  std::map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> getAllOutputs() {

Review comment:
       I don't mind keeping this if you think it will be useful in the future, but `getAllOutputs()` is not used anywhere.

##########
File path: libminifi/test/sql-tests/SQLTestPlan.h
##########
@@ -0,0 +1,98 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "../TestBase.h"
+
+class SQLTestPlan {
+ public:
+  SQLTestPlan(TestController& controller, const std::string& connection_str, const std::string& sql_processor, std::initializer_list<core::Relationship> output_rels) {
+    plan_ = controller.createPlan();
+    processor_ = plan_->addProcessor(sql_processor, sql_processor);
+    plan_->setProperty(processor_, "DB Controller Service", "ODBCService");
+    input_ = plan_->addConnection({}, {"success", "d"}, processor_);
+    for (const auto& output_rel : output_rels) {
+      outputs_[output_rel] = plan_->addConnection(processor_, output_rel, {});
+    }
+
+    // initialize database service
+    auto service = plan_->addController("ODBCService", "ODBCService");
+    plan_->setProperty(service, minifi::sql::controllers::DatabaseService::ConnectionString.getName(), connection_str);
+  }
+
+  std::string getContent(const std::shared_ptr<core::FlowFile>& flow_file) {
+    return plan_->getContent(flow_file);
+  }
+
+  std::shared_ptr<core::FlowFile> addInput(std::initializer_list<std::pair<std::string, std::string>> attributes = {}, const utils::optional<std::string>& content = {}) {
+    auto flow_file = std::make_shared<minifi::FlowFileRecord>();
+    for (const auto& attr : attributes) {
+      flow_file->setAttribute(attr.first, attr.second);
+    }
+    if (content) {
+      auto claim = std::make_shared<minifi::ResourceClaim>(plan_->getContentRepo());
+      auto content_stream = plan_->getContentRepo()->write(*claim);
+      int ret = content_stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(content->c_str())), content->length());
+      REQUIRE(ret == content->length());
+      flow_file->setOffset(0);
+      flow_file->setSize(content->length());
+      flow_file->setResourceClaim(claim);
+    }
+    input_->put(flow_file);
+    return flow_file;
+  }
+
+  std::shared_ptr<core::Processor> getSQLProcessor() {
+    return processor_;
+  }
+
+  void run(bool reschedule = false) {
+    if (reschedule) {
+      plan_->reset(reschedule);
+    }
+    plan_->runProcessor(0);  // run the one and only sql processor
+  }
+
+  std::map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> getAllOutputs() {
+    std::map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> flow_file_map;
+    for (const auto& output : outputs_) {
+      flow_file_map[output.first] = getOutputs(output.first);
+    }
+    return flow_file_map;
+  }
+
+  std::vector<std::shared_ptr<core::FlowFile>> getOutputs(const core::Relationship& relationship) {
+    auto conn = outputs_[relationship];
+    REQUIRE(conn);
+    std::vector<std::shared_ptr<core::FlowFile>> flow_files;
+    std::set<std::shared_ptr<core::FlowFile>> expired;
+    while (auto flow_file = conn->poll(expired)) {
+      REQUIRE(expired.empty());
+      flow_files.push_back(std::move(flow_file));
+    }
+    REQUIRE(expired.empty());

Review comment:
       we have already checked this in line 86, so this line can be removed

##########
File path: .github/workflows/ci.yml
##########
@@ -132,12 +142,13 @@ jobs:
         run: |
           sudo apt-add-repository -y "ppa:ubuntu-toolchain-r/test"
           sudo apt update
-          sudo apt install -y gcc-4.8 g++-4.8 bison flex uuid-dev openssl libcurl4-openssl-dev ccache libpython3-dev liblua5.1-0-dev libssh2-1-dev
+          sudo apt install -y gcc-4.8 g++-4.8 bison flex uuid-dev openssl libcurl4-openssl-dev ccache libpython3-dev liblua5.1-0-dev libssh2-1-dev libsqliteodbc
+          sudo ln -s /usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so /usr/lib/x86_64-linux-gnu/libsqlite3odbc.so

Review comment:
       should we do this symlinking in (the OS-specific script called from) bootstrap.sh?

##########
File path: libminifi/test/sql-tests/QueryDatabaseTableTests.cpp
##########
@@ -0,0 +1,248 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+
+#include "../TestBase.h"
+#include "SQLTestController.h"
+#include "Utils.h"
+#include "FlowFileMatcher.h"
+
+TEST_CASE("QueryDatabaseTable queries the table and returns specified columns", "[QueryDatabaseTable1]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("QueryDatabaseTable", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty(processors::QueryDatabaseTable::TableName.getName(), "test_table");
+  sql_proc->setProperty(processors::QueryDatabaseTable::MaxValueColumnNames.getName(), "int_col");
+  sql_proc->setProperty(processors::QueryDatabaseTable::ColumnNames.getName(), "text_col");
+
+  controller.insertValues({
+    {101, "one"},
+    {102, "two"},
+    {103, "three"}
+  });
+
+  plan->run();
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+
+  std::string row_count;
+  flow_files[0]->getAttribute(processors::QueryDatabaseTable::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "3");
+  auto content = plan->getContent(flow_files[0]);
+  verifyJSON(content, R"(
+    [{"text_col": "one"}, {"text_col": "two"}, {"text_col": "three"}]
+  )", true);
+}
+
+TEST_CASE("QueryDatabaseTable requerying the table returns only new rows", "[QueryDatabaseTable2]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("QueryDatabaseTable", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty(processors::QueryDatabaseTable::TableName.getName(), "test_table");
+  sql_proc->setProperty(processors::QueryDatabaseTable::MaxValueColumnNames.getName(), "int_col");
+  sql_proc->setProperty(processors::QueryDatabaseTable::ColumnNames.getName(), "text_col");
+
+  controller.insertValues({
+    {101, "one"},
+    {102, "two"},
+    {103, "three"}
+  });
+
+  plan->run();
+
+  auto first_flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(first_flow_files.size() == 1);
+
+  controller.insertValues({
+    {104, "four"},
+    {105, "five"}
+  });
+
+  SECTION("Without schedule") {plan->run();}
+  SECTION("With schedule") {plan->run(true);}

Review comment:
       These section descriptions are not clear; I would change them to something like "Run onTrigger only" and "Run both onSchedule and onTrigger".

##########
File path: libminifi/test/sql-tests/PutSQLTests.cpp
##########
@@ -0,0 +1,78 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+
+#include "../TestBase.h"
+#include "SQLTestController.h"
+
+#include "processors/PutSQL.h"
+#include "processors/GenerateFlowFile.h"
+#include "processors/UpdateAttribute.h"
+#include "processors/LogAttribute.h"
+#include "processors/GetFile.h"
+
+TEST_CASE("Test Creation of PutSQL", "[PutSQLCreate]") {  // NOLINT

Review comment:
       are the NOLINT annotations on the TEST_CASEs in this file necessary?






[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r601480128



##########
File path: extensions/sql/patch/iodbc.patch
##########
@@ -0,0 +1,12 @@
+diff -rupN orig/configure.ac patched/configure.ac
+--- orig/configure.ac	2019-07-23 13:37:26.000000000 +0200
++++ patched/configure.ac	2021-02-15 12:43:35.000000000 +0100
+@@ -398,7 +398,7 @@ if test x"$libltdl_cv_uscore" = xyes; th
+ #include <dlfcn.h>
+ #endif
+ 
+-#include <stdio.h>
++#include <stdlib.h>

Review comment:
       there is a new release available, I'll check if it's still needed and if the upgrade is viable






[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r600240781



##########
File path: extensions/sql/data/DatabaseConnectors.h
##########
@@ -45,8 +45,12 @@ class Statement {
 
   virtual ~Statement() = default;
 
-  soci::rowset<soci::row> execute() {
-    return session_.prepare << query_;
+  soci::rowset<soci::row> execute(const std::vector<std::string>& args = {}) {
+    auto stmt = session_.prepare << query_;
+    for (auto& arg : args) {
+      stmt, soci::use(arg);

Review comment:
       it seems to me it is: it binds the arguments of a prepared statement, like `"SELECT * FROM table WHERE id = ?".bind(4)` would. The interface is unfortunate, though, so I added an explicit comma operator call and a comment explaining what is happening.
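The `stmt, soci::use(arg)` line relies on an overloaded comma operator. Each `,` appends one more bound value to the prepared statement, and writing it as a standalone expression statement is what makes it look odd. The toy sketch below mimics that idiom with hypothetical `PreparedStatement`, `Use` and `use` types. These are not soci's real classes. The sketch shows both the implicit form and the explicit `operator,` call.

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical wrapper marking a value to be bound, in the spirit of soci::use().
struct Use {
  std::string value;
};

inline Use use(const std::string& value) { return Use{value}; }

// Hypothetical prepared statement with positional '?' parameters.
class PreparedStatement {
 public:
  explicit PreparedStatement(std::string query) : query_(std::move(query)) {}

  // Overloaded comma operator: `stmt, use(x)` binds x to the next '?'.
  PreparedStatement& operator,(const Use& u) {
    args_.push_back(u.value);
    return *this;
  }

  // Demonstration only: splices bound values into the text (no quoting or escaping).
  std::string render() const {
    std::string result;
    std::size_t next = 0;
    for (char c : query_) {
      if (c == '?') {
        if (next >= args_.size()) throw std::logic_error("not enough bound arguments");
        result += args_[next++];
      } else {
        result += c;
      }
    }
    if (next != args_.size()) throw std::logic_error("too many bound arguments");
    return result;
  }

 private:
  std::string query_;
  std::vector<std::string> args_;
};

// Binds each argument in order, mirroring the loop in Statement::execute().
PreparedStatement bindAll(const std::string& query, const std::vector<std::string>& args) {
  PreparedStatement stmt{query};
  for (const auto& arg : args) {
    stmt.operator,(use(arg));  // explicit call; equivalent to `stmt, use(arg);`
  }
  return stmt;
}
```

A real driver binds the values rather than splicing them into the SQL text. `render()` exists only to make the binding order visible.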

##########
File path: extensions/sql/data/MaxCollector.h
##########
@@ -31,26 +33,21 @@ namespace minifi {
 namespace sql {
 
 class MaxCollector: public SQLRowSubscriber {
-  void beginProcessRow() override {}
-
-  void endProcessRow() override {
-    if (columnsVerified_) {
-      return;
+  void beginProcessBatch() override {}
+  void endProcessBatch(Progress progress) override {
+    if (progress == Progress::DONE) {
+      updateMapState();
     }
-
-    if (countColumns_ != mapState_.size())
-      throw minifi::Exception(PROCESSOR_EXCEPTION, "MaxCollector: Column(s) '" + maxValueColumnNames_ + "' are not found in the columns of '" + selectQuery_ + "' result.");
-
-    columnsVerified_ = true;
   }
+  void beginProcessRow() override {}
+  void endProcessRow() override {}
 
-  void processColumnName(const std::string& name) override {
-    if (columnsVerified_) {
-      return;
-    }
-
-    if (mapState_.count(name)) {
-      countColumns_++;
+  void processColumnNames(const std::vector<std::string>& names) override {
+    for (auto& expected : state_) {

Review comment:
       done

##########
File path: extensions/sql/data/SQLRowsetProcessor.cpp
##########
@@ -20,51 +20,61 @@
 
 #include "Exception.h"
 #include "Utils.h"
+#include "utils/StringUtils.h"
 
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace sql {
 
-SQLRowsetProcessor::SQLRowsetProcessor(const soci::rowset<soci::row>& rowset, const std::vector<SQLRowSubscriber*>& rowSubscribers)
-  : rowset_(rowset), rowSubscribers_(rowSubscribers) {
+SQLRowsetProcessor::SQLRowsetProcessor(const soci::rowset<soci::row>& rowset, std::vector<std::reference_wrapper<SQLRowSubscriber>> row_subscribers)
+  : rowset_(rowset), row_subscribers_(std::move(row_subscribers)) {
   iter_ = rowset_.begin();
 }
 
 size_t SQLRowsetProcessor::process(size_t max) {
   size_t count = 0;
 
+  for (const auto& subscriber : row_subscribers_) {
+    subscriber.get().beginProcessBatch();
+  }
+
   for (; iter_ != rowset_.end(); ) {
     addRow(*iter_, count);
     iter_++;
     count++;
-    totalCount_++;
     if (max > 0 && count >= max) {
       break;
     }
   }
 
+  for (const auto& subscriber : row_subscribers_) {
+    subscriber.get().endProcessBatch(count == 0 ? SQLRowSubscriber::Progress::DONE : SQLRowSubscriber::Progress::CONTINUE);
+  }
+
   return count;
 }
 
 void SQLRowsetProcessor::addRow(const soci::row& row, size_t rowCount) {
-  for (const auto& pRowSubscriber : rowSubscribers_) {
-    pRowSubscriber->beginProcessRow();
+  for (const auto& subscriber : row_subscribers_) {
+    subscriber.get().beginProcessRow();
   }
 
   if (rowCount == 0) {
+    std::vector<std::string> column_names;

Review comment:
       done
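In the reworked `SQLRowsetProcessor::process()` above, every call is wrapped in `beginProcessBatch()` / `endProcessBatch()`. `Progress::DONE` is reported only for a call that yields zero rows, and `MaxCollector` updates its state only on `DONE`. Below is a simplified model of that protocol, written as a sketch rather than the extension's code. The `RowSubscriber`, `RowsetProcessor` and `MaxTracker` types are hypothetical, and they iterate over a `std::vector<int>` instead of a soci rowset.

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <optional>
#include <utility>
#include <vector>

enum class Progress { CONTINUE, DONE };

// Hypothetical subscriber interface modelled on SQLRowSubscriber's batch callbacks.
struct RowSubscriber {
  virtual ~RowSubscriber() = default;
  virtual void beginProcessBatch() = 0;
  virtual void endProcessBatch(Progress progress) = 0;
  virtual void processRow(int value) = 0;
};

// Hypothetical processor: hands out at most `max` rows per call (0 means unlimited).
class RowsetProcessor {
 public:
  RowsetProcessor(std::vector<int> rows, std::vector<std::reference_wrapper<RowSubscriber>> subscribers)
      : rows_(std::move(rows)), subscribers_(std::move(subscribers)) {}

  std::size_t process(std::size_t max) {
    for (auto& s : subscribers_) s.get().beginProcessBatch();
    std::size_t count = 0;
    while (next_ < rows_.size() && (max == 0 || count < max)) {
      for (auto& s : subscribers_) s.get().processRow(rows_[next_]);
      ++next_;
      ++count;
    }
    // An empty batch means the rowset is exhausted.
    for (auto& s : subscribers_) s.get().endProcessBatch(count == 0 ? Progress::DONE : Progress::CONTINUE);
    return count;
  }

 private:
  std::vector<int> rows_;
  std::vector<std::reference_wrapper<RowSubscriber>> subscribers_;
  std::size_t next_ = 0;
};

// Hypothetical max tracker: commits the maximum only once all rows were seen.
class MaxTracker : public RowSubscriber {
 public:
  void beginProcessBatch() override {}
  void processRow(int value) override {
    pending_ = pending_ ? std::max(*pending_, value) : value;
  }
  void endProcessBatch(Progress progress) override {
    if (progress == Progress::DONE) committed_ = pending_;
  }
  std::optional<int> committed() const { return committed_; }

 private:
  std::optional<int> pending_;
  std::optional<int> committed_;
};
```

With a batch size of 2 and rows 101, 105, 103, the first two calls return 2 and 1 rows with `CONTINUE`. Only the third, empty call reports `DONE` and commits 105. In this model, the caller therefore has to keep calling `process()` until it returns 0 for the state to be stored.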






[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r603236080



##########
File path: libminifi/test/sql-tests/SQLTestPlan.h
##########
@@ -0,0 +1,98 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "../TestBase.h"
+
+class SQLTestPlan {
+ public:
+  SQLTestPlan(TestController& controller, const std::string& connection_str, const std::string& sql_processor, std::initializer_list<core::Relationship> output_rels) {
+    plan_ = controller.createPlan();
+    processor_ = plan_->addProcessor(sql_processor, sql_processor);
+    plan_->setProperty(processor_, "DB Controller Service", "ODBCService");
+    input_ = plan_->addConnection({}, {"success", "d"}, processor_);
+    for (const auto& output_rel : output_rels) {
+      outputs_[output_rel] = plan_->addConnection(processor_, output_rel, {});
+    }
+
+    // initialize database service
+    auto service = plan_->addController("ODBCService", "ODBCService");
+    plan_->setProperty(service, minifi::sql::controllers::DatabaseService::ConnectionString.getName(), connection_str);
+  }
+
+  std::string getContent(const std::shared_ptr<core::FlowFile>& flow_file) {
+    return plan_->getContent(flow_file);
+  }
+
+  std::shared_ptr<core::FlowFile> addInput(std::initializer_list<std::pair<std::string, std::string>> attributes = {}, const utils::optional<std::string>& content = {}) {
+    auto flow_file = std::make_shared<minifi::FlowFileRecord>();
+    for (const auto& attr : attributes) {
+      flow_file->setAttribute(attr.first, attr.second);
+    }
+    if (content) {
+      auto claim = std::make_shared<minifi::ResourceClaim>(plan_->getContentRepo());
+      auto content_stream = plan_->getContentRepo()->write(*claim);
+      int ret = content_stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(content->c_str())), content->length());
+      REQUIRE(ret == content->length());
+      flow_file->setOffset(0);
+      flow_file->setSize(content->length());
+      flow_file->setResourceClaim(claim);
+    }
+    input_->put(flow_file);
+    return flow_file;
+  }
+
+  std::shared_ptr<core::Processor> getSQLProcessor() {
+    return processor_;
+  }
+
+  void run(bool reschedule = false) {
+    if (reschedule) {
+      plan_->reset(reschedule);
+    }
+    plan_->runProcessor(0);  // run the one and only sql processor
+  }
+
+  std::map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> getAllOutputs() {
+    std::map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> flow_file_map;
+    for (const auto& output : outputs_) {
+      flow_file_map[output.first] = getOutputs(output.first);
+    }
+    return flow_file_map;
+  }
+
+  std::vector<std::shared_ptr<core::FlowFile>> getOutputs(const core::Relationship& relationship) {
+    auto conn = outputs_[relationship];
+    REQUIRE(conn);
+    std::vector<std::shared_ptr<core::FlowFile>> flow_files;
+    std::set<std::shared_ptr<core::FlowFile>> expired;
+    while (auto flow_file = conn->poll(expired)) {
+      REQUIRE(expired.empty());
+      flow_files.push_back(std::move(flow_file));
+    }
+    REQUIRE(expired.empty());

Review comment:
       No, that's fine; I didn't think of that case.
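The `getOutputs()` helper in the test drains a connection by polling until it returns an empty pointer, then checks that nothing was reported as expired. Below is a minimal, self-contained sketch of that loop. `FakeFlowFile`, `FakeConnection` and `drain` are hypothetical stand-ins, not MiNiFi's `core::Connection` API, and this fake never reports expired items.

```cpp
#include <cassert>
#include <deque>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a flow file; it only carries a name.
struct FakeFlowFile {
  std::string name;
};

using FlowFilePtr = std::shared_ptr<FakeFlowFile>;

// Hypothetical stand-in for a connection: poll() pops the next queued item and
// returns nullptr once the queue is empty. A real connection may move expired
// items into `expired`; this fake never does.
class FakeConnection {
 public:
  void put(FlowFilePtr flow_file) { queue_.push_back(std::move(flow_file)); }

  FlowFilePtr poll(std::set<FlowFilePtr>& /*expired*/) {
    if (queue_.empty()) {
      return nullptr;
    }
    FlowFilePtr front = queue_.front();
    queue_.pop_front();
    return front;
  }

  bool isEmpty() const { return queue_.empty(); }

 private:
  std::deque<FlowFilePtr> queue_;
};

// Collects every queued item in FIFO order; the caller checks `expired` afterwards,
// just as getOutputs() does with REQUIRE(expired.empty()).
std::vector<FlowFilePtr> drain(FakeConnection& conn, std::set<FlowFilePtr>& expired) {
  std::vector<FlowFilePtr> flow_files;
  while (auto flow_file = conn.poll(expired)) {
    flow_files.push_back(std::move(flow_file));
  }
  return flow_files;
}
```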




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r603052262



##########
File path: extensions/sql/processors/QueryDatabaseTable.cpp
##########
@@ -75,361 +72,233 @@ const core::Property QueryDatabaseTable::s_maxValueColumnNames(
     "If no columns are provided, all rows from the table will be considered, which could have a performance impact. "
     "NOTE: It is important to use consistent max-value column names for a given table for incremental fetch to work properly. "
     "NOTE: Because of a limitation of database access library 'soci', which doesn't support milliseconds in it's 'dt_date', "
-    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")->
-    supportsExpressionLanguage(true)->build());
+    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")
+  ->supportsExpressionLanguage(true)->build());
 
-const core::Property QueryDatabaseTable::s_whereClause(
-  core::PropertyBuilder::createProperty("db-fetch-where-clause")->isRequired(false)->withDescription(
-    "A custom clause to be added in the WHERE condition when building SQL queries.")->supportsExpressionLanguage(true)->build());
+const core::Property QueryDatabaseTable::WhereClause(
+  core::PropertyBuilder::createProperty("Where Clause")
+  ->isRequired(false)
+  ->withDescription("A custom clause to be added in the WHERE condition when building SQL queries.")
+  ->supportsExpressionLanguage(true)->build());
 
-const core::Property QueryDatabaseTable::s_sqlQuery(
-  core::PropertyBuilder::createProperty("db-fetch-sql-query")->isRequired(false)->withDescription(
-    "A custom SQL query used to retrieve data. Instead of building a SQL query from other properties, this query will be wrapped as a sub-query. "
-    "Query must have no ORDER BY statement.")->supportsExpressionLanguage(true)->build());
+const std::string QueryDatabaseTable::InitialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
 
-const core::Property QueryDatabaseTable::s_maxRowsPerFlowFile(
-  core::PropertyBuilder::createProperty("qdbt-max-rows")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
-    "The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large result sets into multiple FlowFiles. "
-    "If the value specified is zero, then all rows are returned in a single FlowFile.")->supportsExpressionLanguage(true)->build());
+const core::Relationship QueryDatabaseTable::Success("success", "Successfully created FlowFile from SQL query result set.");
 
-const core::Property QueryDatabaseTable::s_stateDirectory(
-  core::PropertyBuilder::createProperty("State Directory")->isRequired(false)->withDefaultValue("QDTState")->withDescription("DEPRECATED. Only use it for state migration from the state file, supplying the legacy state directory.")->build());
+const std::string QueryDatabaseTable::RESULT_TABLE_NAME = "tablename";
+const std::string QueryDatabaseTable::RESULT_ROW_COUNT = "querydbtable.row.count";
 
-const std::string QueryDatabaseTable::s_initialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
-
-const core::Relationship QueryDatabaseTable::s_success("success", "Successfully created FlowFile from SQL query result set.");
-
-static const std::string ResultTableName = "tablename";
-static const std::string ResultRowCount = "querydbtable.row.count";
-
-static const std::string TABLENAME_KEY = "tablename";
-static const std::string MAXVALUE_KEY_PREFIX = "maxvalue.";
-
-// State
-class LegacyState {
- public:
-  LegacyState(const std::string& tableName, const std::string& stateDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
-    :tableName_(tableName), logger_(logger) {
-
-    filePath_ = utils::file::FileUtils::concat_path(
-            utils::file::FileUtils::concat_path(
-              utils::file::FileUtils::concat_path(stateDir, "uuid"), uuid), "State.txt");
-
-    if (!getStateFromFile())
-      return;
-
-    ok_ = true;
-  }
-
-  explicit operator bool() const {
-    return ok_;
-  }
-
-  const std::unordered_map<std::string, std::string>& getStateMap() const {
-    return mapState_;
-  }
-
-  bool moveStateFileToMigrated() {
-    if (!ok_) {
-      return false;
-    }
-    return rename(filePath_.c_str(), (filePath_ + "-migrated").c_str()) == 0;
-  }
-
- private:
-  static const std::string separator_;
-
-   bool getStateFromFile() {
-     std::string state;
-
-     std::ifstream file(filePath_);
-     if (!file) {
-       return false;
-     }
-
-     std::stringstream ss;
-     ss << file.rdbuf();
-
-     state = ss.str();
-
-     file.close();
-
-     std::vector<std::string> listColumnNameValue;
-
-     size_t pos = state.find(separator_, 0);
-     if (pos == std::string::npos) {
-       logger_->log_error("Invalid data in '%s' file.", filePath_.c_str());
-       mapState_.clear();
-       return false;
-     }
-
-     auto tableName = state.substr(0, pos);
-     if (tableName != tableName_) {
-       logger_->log_warn("tableName is changed - now: '%s', in State.txt: '%s'.", tableName_.c_str(), tableName.c_str());
-       mapState_.clear();
-
-       return false;
-     }
-
-     pos += separator_.size();
-
-     while (true) {
-       auto newPos = state.find(separator_, pos);
-       if (newPos == std::string::npos)
-         break;
-
-       const std::string& columnNameValue = state.substr(pos, newPos - pos);
-       listColumnNameValue.emplace_back(columnNameValue);
-
-       pos = newPos + separator_.size();
-     }
-
-     for (const auto& columnNameValue : listColumnNameValue) {
-       const auto posEQ = columnNameValue.find('=');
-       if (posEQ == std::string::npos) {
-         logger_->log_error("Invalid data in '%s' file.", filePath_.c_str());
-         mapState_.clear();
-         return false;
-       }
-
-       const auto& name = columnNameValue.substr(0, posEQ);
-       const auto& value = columnNameValue.substr(posEQ + 1);
-
-       mapState_.insert({ name, value });
-     }
-
-     return true;
-   }
-
- private:
-   std::unordered_map<std::string, std::string> mapState_;
-   std::string filePath_;
-   std::string tableName_;
-   std::shared_ptr<logging::Logger> logger_;
-   bool ok_{};
-};
-
-const std::string LegacyState::separator_ = "@!qdt!@";
+const std::string QueryDatabaseTable::TABLENAME_KEY = "tablename";
+const std::string QueryDatabaseTable::MAXVALUE_KEY_PREFIX = "maxvalue.";
 
 // QueryDatabaseTable
 QueryDatabaseTable::QueryDatabaseTable(const std::string& name, utils::Identifier uuid)
-  : SQLProcessor(name, uuid) {
+  : SQLProcessor(name, uuid, logging::LoggerFactory<QueryDatabaseTable>::getLogger()) {
 }
 
-QueryDatabaseTable::~QueryDatabaseTable() = default;
-
 void QueryDatabaseTable::initialize() {
   //! Set the supported properties
-  setSupportedProperties({ dbControllerService(), outputFormat(), s_tableName, s_columnNames, s_maxValueColumnNames, s_whereClause, s_sqlQuery, s_maxRowsPerFlowFile, s_stateDirectory});
+  setSupportedProperties({
+    DBControllerService, OutputFormat, TableName, ColumnNames,
+    MaxValueColumnNames, WhereClause, MaxRowsPerFlowFile});
 
   //! Set the supported relationships
-  setSupportedRelationships({ s_success });
+  setSupportedRelationships({ Success });
 }
 
-void QueryDatabaseTable::processOnSchedule(core::ProcessContext &context) {
-  initOutputFormat(context);
+void QueryDatabaseTable::processOnSchedule(core::ProcessContext& context) {
+  context.getProperty(OutputFormat.getName(), output_format_);
+  max_rows_ = [&] {
+    uint64_t max_rows;
+    context.getProperty(MaxRowsPerFlowFile.getName(), max_rows);
+    return gsl::narrow<size_t>(max_rows);
+  }();
 
-  context.getProperty(s_tableName.getName(), tableName_);
-  context.getProperty(s_columnNames.getName(), columnNames_);
+  state_manager_ = context.getStateManager();
+  if (state_manager_ == nullptr) {
+    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+  }
+
+  context.getProperty(TableName.getName(), table_name_);
+  context.getProperty(WhereClause.getName(), extra_where_clause_);
+  max_value_columns_ = [&] {
+    std::string max_value_columns_str;
+    context.getProperty(MaxValueColumnNames.getName(), max_value_columns_str);
+    return utils::inputStringToList(max_value_columns_str);
+  }();
+  return_columns_ = [&] {
+    std::string return_columns_str;
+    context.getProperty(ColumnNames.getName(), return_columns_str);
+    return utils::inputStringToList(return_columns_str);
+  }();
+  queried_columns_ = utils::StringUtils::join(", ", return_columns_);
+  if (!queried_columns_.empty() && !max_value_columns_.empty()) {
+    // columns will be explicitly enumerated, we need to add the max value columns
+    queried_columns_ = queried_columns_ + ", " + utils::StringUtils::join(", ", max_value_columns_);
+  }
 
-  context.getProperty(s_maxValueColumnNames.getName(), maxValueColumnNames_);
-  listMaxValueColumnName_ = utils::inputStringToList(maxValueColumnNames_);
+  initializeMaxValues(context);
+}
 
-  context.getProperty(s_whereClause.getName(), whereClause_);
-  context.getProperty(s_sqlQuery.getName(), sqlQuery_);
-  context.getProperty(s_maxRowsPerFlowFile.getName(), maxRowsPerFlowFile_);
+void QueryDatabaseTable::processOnTrigger(core::ProcessContext& /*context*/, core::ProcessSession& session) {
+  const auto& selectQuery = buildSelectQuery();
 
-  mapState_.clear();
+  logger_->log_info("QueryDatabaseTable: selectQuery: '%s'", selectQuery.c_str());
 
-  state_manager_ = context.getStateManager();
-  if (state_manager_ == nullptr) {
-    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+  auto statement = connection_->prepareStatement(selectQuery);
+
+  auto rowset = statement->execute();
+
+  std::unordered_map<std::string, std::string> new_max_values = max_values_;
+  sql::MaxCollector maxCollector{selectQuery, new_max_values};
+  auto column_filter = [&] (const std::string& column_name) {
+    return return_columns_.empty()
+      || std::find(return_columns_.begin(), return_columns_.end(), column_name) != return_columns_.end();
+  };
+  sql::JSONSQLWriter sqlWriter{output_format_ == OutputType::JSONPretty, column_filter};

Review comment:
       done
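In the `processOnSchedule` hunk above, the queried column list is the `ColumnNames` list. The max-value columns are appended only when columns are enumerated explicitly. An empty list presumably means all columns are selected, in which case the max-value columns are already included. The `column_filter` lambda then keeps only the requested columns in the output. The sketch below models both steps with plain standard-library code. `join`, `buildQueriedColumns` and `isReturnedColumn` are hypothetical names, not part of the PR.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Joins items with a separator (a stand-in for utils::StringUtils::join).
std::string join(const std::string& separator, const std::vector<std::string>& items) {
  std::string result;
  for (std::size_t i = 0; i < items.size(); ++i) {
    if (i > 0) {
      result += separator;
    }
    result += items[i];
  }
  return result;
}

// An empty result stands for "all columns"; otherwise the max-value columns are
// appended so that their maximums can still be collected.
std::string buildQueriedColumns(const std::vector<std::string>& return_columns,
                                const std::vector<std::string>& max_value_columns) {
  std::string queried = join(", ", return_columns);
  if (!queried.empty() && !max_value_columns.empty()) {
    queried += ", " + join(", ", max_value_columns);
  }
  return queried;
}

// Mirrors the column_filter lambda: with no explicit list every column is written
// out; otherwise only the requested ones are, so max-value columns that were only
// added for tracking are dropped from the output.
bool isReturnedColumn(const std::vector<std::string>& return_columns, const std::string& column) {
  return return_columns.empty() ||
         std::find(return_columns.begin(), return_columns.end(), column) != return_columns.end();
}
```

Like the hunk, this sketch does not deduplicate a column that appears in both lists.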

##########
File path: extensions/sql/processors/QueryDatabaseTable.cpp
##########
@@ -75,361 +72,233 @@ const core::Property QueryDatabaseTable::s_maxValueColumnNames(
     "If no columns are provided, all rows from the table will be considered, which could have a performance impact. "
     "NOTE: It is important to use consistent max-value column names for a given table for incremental fetch to work properly. "
     "NOTE: Because of a limitation of database access library 'soci', which doesn't support milliseconds in it's 'dt_date', "
-    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")->
-    supportsExpressionLanguage(true)->build());
+    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")
+  ->supportsExpressionLanguage(true)->build());
 
-const core::Property QueryDatabaseTable::s_whereClause(
-  core::PropertyBuilder::createProperty("db-fetch-where-clause")->isRequired(false)->withDescription(
-    "A custom clause to be added in the WHERE condition when building SQL queries.")->supportsExpressionLanguage(true)->build());
+const core::Property QueryDatabaseTable::WhereClause(
+  core::PropertyBuilder::createProperty("Where Clause")
+  ->isRequired(false)
+  ->withDescription("A custom clause to be added in the WHERE condition when building SQL queries.")
+  ->supportsExpressionLanguage(true)->build());
 
-const core::Property QueryDatabaseTable::s_sqlQuery(
-  core::PropertyBuilder::createProperty("db-fetch-sql-query")->isRequired(false)->withDescription(
-    "A custom SQL query used to retrieve data. Instead of building a SQL query from other properties, this query will be wrapped as a sub-query. "
-    "Query must have no ORDER BY statement.")->supportsExpressionLanguage(true)->build());
+const std::string QueryDatabaseTable::InitialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
 
-const core::Property QueryDatabaseTable::s_maxRowsPerFlowFile(
-  core::PropertyBuilder::createProperty("qdbt-max-rows")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
-    "The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large result sets into multiple FlowFiles. "
-    "If the value specified is zero, then all rows are returned in a single FlowFile.")->supportsExpressionLanguage(true)->build());
+const core::Relationship QueryDatabaseTable::Success("success", "Successfully created FlowFile from SQL query result set.");
 
-const core::Property QueryDatabaseTable::s_stateDirectory(
-  core::PropertyBuilder::createProperty("State Directory")->isRequired(false)->withDefaultValue("QDTState")->withDescription("DEPRECATED. Only use it for state migration from the state file, supplying the legacy state directory.")->build());
+const std::string QueryDatabaseTable::RESULT_TABLE_NAME = "tablename";
+const std::string QueryDatabaseTable::RESULT_ROW_COUNT = "querydbtable.row.count";
 
-const std::string QueryDatabaseTable::s_initialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
-
-const core::Relationship QueryDatabaseTable::s_success("success", "Successfully created FlowFile from SQL query result set.");
-
-static const std::string ResultTableName = "tablename";
-static const std::string ResultRowCount = "querydbtable.row.count";
-
-static const std::string TABLENAME_KEY = "tablename";
-static const std::string MAXVALUE_KEY_PREFIX = "maxvalue.";
-
-// State
-class LegacyState {
- public:
-  LegacyState(const std::string& tableName, const std::string& stateDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
-    :tableName_(tableName), logger_(logger) {
-
-    filePath_ = utils::file::FileUtils::concat_path(
-            utils::file::FileUtils::concat_path(
-              utils::file::FileUtils::concat_path(stateDir, "uuid"), uuid), "State.txt");
-
-    if (!getStateFromFile())
-      return;
-
-    ok_ = true;
-  }
-
-  explicit operator bool() const {
-    return ok_;
-  }
-
-  const std::unordered_map<std::string, std::string>& getStateMap() const {
-    return mapState_;
-  }
-
-  bool moveStateFileToMigrated() {
-    if (!ok_) {
-      return false;
-    }
-    return rename(filePath_.c_str(), (filePath_ + "-migrated").c_str()) == 0;
-  }
-
- private:
-  static const std::string separator_;
-
-   bool getStateFromFile() {
-     std::string state;
-
-     std::ifstream file(filePath_);
-     if (!file) {
-       return false;
-     }
-
-     std::stringstream ss;
-     ss << file.rdbuf();
-
-     state = ss.str();
-
-     file.close();
-
-     std::vector<std::string> listColumnNameValue;
-
-     size_t pos = state.find(separator_, 0);
-     if (pos == std::string::npos) {
-       logger_->log_error("Invalid data in '%s' file.", filePath_.c_str());
-       mapState_.clear();
-       return false;
-     }
-
-     auto tableName = state.substr(0, pos);
-     if (tableName != tableName_) {
-       logger_->log_warn("tableName is changed - now: '%s', in State.txt: '%s'.", tableName_.c_str(), tableName.c_str());
-       mapState_.clear();
-
-       return false;
-     }
-
-     pos += separator_.size();
-
-     while (true) {
-       auto newPos = state.find(separator_, pos);
-       if (newPos == std::string::npos)
-         break;
-
-       const std::string& columnNameValue = state.substr(pos, newPos - pos);
-       listColumnNameValue.emplace_back(columnNameValue);
-
-       pos = newPos + separator_.size();
-     }
-
-     for (const auto& columnNameValue : listColumnNameValue) {
-       const auto posEQ = columnNameValue.find('=');
-       if (posEQ == std::string::npos) {
-         logger_->log_error("Invalid data in '%s' file.", filePath_.c_str());
-         mapState_.clear();
-         return false;
-       }
-
-       const auto& name = columnNameValue.substr(0, posEQ);
-       const auto& value = columnNameValue.substr(posEQ + 1);
-
-       mapState_.insert({ name, value });
-     }
-
-     return true;
-   }
-
- private:
-   std::unordered_map<std::string, std::string> mapState_;
-   std::string filePath_;
-   std::string tableName_;
-   std::shared_ptr<logging::Logger> logger_;
-   bool ok_{};
-};
-
-const std::string LegacyState::separator_ = "@!qdt!@";
+const std::string QueryDatabaseTable::TABLENAME_KEY = "tablename";
+const std::string QueryDatabaseTable::MAXVALUE_KEY_PREFIX = "maxvalue.";
 
 // QueryDatabaseTable
 QueryDatabaseTable::QueryDatabaseTable(const std::string& name, utils::Identifier uuid)
-  : SQLProcessor(name, uuid) {
+  : SQLProcessor(name, uuid, logging::LoggerFactory<QueryDatabaseTable>::getLogger()) {
 }
 
-QueryDatabaseTable::~QueryDatabaseTable() = default;
-
 void QueryDatabaseTable::initialize() {
   //! Set the supported properties
-  setSupportedProperties({ dbControllerService(), outputFormat(), s_tableName, s_columnNames, s_maxValueColumnNames, s_whereClause, s_sqlQuery, s_maxRowsPerFlowFile, s_stateDirectory});
+  setSupportedProperties({
+    DBControllerService, OutputFormat, TableName, ColumnNames,
+    MaxValueColumnNames, WhereClause, MaxRowsPerFlowFile});
 
   //! Set the supported relationships
-  setSupportedRelationships({ s_success });
+  setSupportedRelationships({ Success });
 }
 
-void QueryDatabaseTable::processOnSchedule(core::ProcessContext &context) {
-  initOutputFormat(context);
+void QueryDatabaseTable::processOnSchedule(core::ProcessContext& context) {
+  context.getProperty(OutputFormat.getName(), output_format_);
+  max_rows_ = [&] {
+    uint64_t max_rows;
+    context.getProperty(MaxRowsPerFlowFile.getName(), max_rows);
+    return gsl::narrow<size_t>(max_rows);
+  }();
 
-  context.getProperty(s_tableName.getName(), tableName_);
-  context.getProperty(s_columnNames.getName(), columnNames_);
+  state_manager_ = context.getStateManager();
+  if (state_manager_ == nullptr) {
+    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+  }
+
+  context.getProperty(TableName.getName(), table_name_);
+  context.getProperty(WhereClause.getName(), extra_where_clause_);
+  max_value_columns_ = [&] {
+    std::string max_value_columns_str;
+    context.getProperty(MaxValueColumnNames.getName(), max_value_columns_str);
+    return utils::inputStringToList(max_value_columns_str);
+  }();
+  return_columns_ = [&] {
+    std::string return_columns_str;
+    context.getProperty(ColumnNames.getName(), return_columns_str);
+    return utils::inputStringToList(return_columns_str);
+  }();
+  queried_columns_ = utils::StringUtils::join(", ", return_columns_);
+  if (!queried_columns_.empty() && !max_value_columns_.empty()) {
+    // columns will be explicitly enumerated, we need to add the max value columns
+    queried_columns_ = queried_columns_ + ", " + utils::StringUtils::join(", ", max_value_columns_);
+  }
 
-  context.getProperty(s_maxValueColumnNames.getName(), maxValueColumnNames_);
-  listMaxValueColumnName_ = utils::inputStringToList(maxValueColumnNames_);
+  initializeMaxValues(context);
+}
 
-  context.getProperty(s_whereClause.getName(), whereClause_);
-  context.getProperty(s_sqlQuery.getName(), sqlQuery_);
-  context.getProperty(s_maxRowsPerFlowFile.getName(), maxRowsPerFlowFile_);
+void QueryDatabaseTable::processOnTrigger(core::ProcessContext& /*context*/, core::ProcessSession& session) {
+  const auto& selectQuery = buildSelectQuery();
 
-  mapState_.clear();
+  logger_->log_info("QueryDatabaseTable: selectQuery: '%s'", selectQuery.c_str());
 
-  state_manager_ = context.getStateManager();
-  if (state_manager_ == nullptr) {
-    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+  auto statement = connection_->prepareStatement(selectQuery);
+
+  auto rowset = statement->execute();
+
+  std::unordered_map<std::string, std::string> new_max_values = max_values_;
+  sql::MaxCollector maxCollector{selectQuery, new_max_values};
+  auto column_filter = [&] (const std::string& column_name) {
+    return return_columns_.empty()
+      || std::find(return_columns_.begin(), return_columns_.end(), column_name) != return_columns_.end();
+  };
+  sql::JSONSQLWriter sqlWriter{output_format_ == OutputType::JSONPretty, column_filter};
+  FlowFileGenerator flow_file_creator{session, sqlWriter};
+  sql::SQLRowsetProcessor sqlRowsetProcessor(rowset, {sqlWriter, maxCollector, flow_file_creator});
+
+  while (size_t row_count = sqlRowsetProcessor.process(max_rows_)) {
+    auto new_file = flow_file_creator.getLastFlowFile();
+    new_file->addAttribute(RESULT_ROW_COUNT, std::to_string(row_count));

Review comment:
       done
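The trigger loop `while (size_t row_count = sqlRowsetProcessor.process(max_rows_))` emits one FlowFile per batch and tags it with the `querydbtable.row.count` attribute. The sketch below models that loop under three assumptions: `process()` returns the number of rows in the batch it just handled; it returns 0 once the rowset is exhausted; and a limit of 0 means "all remaining rows", following the semantics stated in the removed `qdbt-max-rows` description. `RowBatcher` and `batchRowCounts` are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for sql::SQLRowsetProcessor: hands out rows in batches.
class RowBatcher {
 public:
  explicit RowBatcher(std::vector<std::string> rows) : rows_(std::move(rows)) {}

  // Copies the next batch into `batch` and returns its size, or 0 once exhausted.
  // A max_rows of 0 means "all remaining rows" in a single batch.
  std::size_t process(std::size_t max_rows, std::vector<std::string>& batch) {
    const std::size_t remaining = rows_.size() - next_;
    const std::size_t count = (max_rows == 0 || max_rows > remaining) ? remaining : max_rows;
    const auto first = rows_.begin() + static_cast<std::ptrdiff_t>(next_);
    batch.assign(first, first + static_cast<std::ptrdiff_t>(count));
    next_ += count;
    return count;
  }

 private:
  std::vector<std::string> rows_;
  std::size_t next_{0};
};

// Runs the trigger-style loop and returns the row count of every emitted batch,
// i.e. the values that would end up in the querydbtable.row.count attribute.
std::vector<std::size_t> batchRowCounts(std::vector<std::string> rows, std::size_t max_rows) {
  RowBatcher batcher{std::move(rows)};
  std::vector<std::size_t> row_counts;
  std::vector<std::string> batch;
  while (std::size_t row_count = batcher.process(max_rows, batch)) {
    row_counts.push_back(row_count);
  }
  return row_counts;
}
```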

##########
File path: extensions/sql/processors/SQLProcessor.cpp
##########
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SQLProcessor.h"
+
+#include <vector>
+#include <memory>
+
+#include "core/FlowFile.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "Exception.h"
+
+#include <soci/error.h>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const core::Property SQLProcessor::DBControllerService(
+    core::PropertyBuilder::createProperty("DB Controller Service")
+    ->isRequired(true)
+    ->withDescription("Database Controller Service.")
+    ->supportsExpressionLanguage(true)->build());
+
+void SQLProcessor::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {
+  std::string controllerService;
+  context->getProperty(DBControllerService.getName(), controllerService);
+
+  db_service_ = std::dynamic_pointer_cast<sql::controllers::DatabaseService>(context->getControllerService(controllerService));
+  if (!db_service_) {
+    throw minifi::Exception(PROCESSOR_EXCEPTION, "'DB Controller Service' must be defined");

Review comment:
       done
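`SQLProcessor::onSchedule` looks up the service named by `DB Controller Service` and downcasts it with `std::dynamic_pointer_cast`. It throws if the result is null, which covers both a missing service and a service of the wrong type. Below is a standalone sketch of that pattern. It uses a hypothetical service hierarchy, `std::runtime_error` instead of `minifi::Exception`, and a `std::map` in place of `ProcessContext::getControllerService`.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>

// Hypothetical minimal controller-service hierarchy for illustration only.
struct ControllerService {
  virtual ~ControllerService() = default;
};
struct DatabaseService : ControllerService {};
struct OtherService : ControllerService {};

// Resolves the named service and accepts it only if it really is a DatabaseService,
// mirroring the dynamic_pointer_cast + null check in SQLProcessor::onSchedule.
std::shared_ptr<DatabaseService> resolveDatabaseService(
    const std::map<std::string, std::shared_ptr<ControllerService>>& services,
    const std::string& name) {
  std::shared_ptr<DatabaseService> db_service;
  auto it = services.find(name);
  if (it != services.end()) {
    // Yields nullptr if the service exists but has a different dynamic type.
    db_service = std::dynamic_pointer_cast<DatabaseService>(it->second);
  }
  if (!db_service) {
    throw std::runtime_error("'DB Controller Service' must be defined");
  }
  return db_service;
}
```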







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r603838206



##########
File path: .github/workflows/ci.yml
##########
@@ -132,12 +142,13 @@ jobs:
         run: |
           sudo apt-add-repository -y "ppa:ubuntu-toolchain-r/test"
           sudo apt update
-          sudo apt install -y gcc-4.8 g++-4.8 bison flex uuid-dev openssl libcurl4-openssl-dev ccache libpython3-dev liblua5.1-0-dev libssh2-1-dev
+          sudo apt install -y gcc-4.8 g++-4.8 bison flex uuid-dev openssl libcurl4-openssl-dev ccache libpython3-dev liblua5.1-0-dev libssh2-1-dev libsqliteodbc
+          sudo ln -s /usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so /usr/lib/x86_64-linux-gnu/libsqlite3odbc.so

Review comment:
       The binary will work fine. The symlink is apparently only needed on Ubuntu, and only for the ODBCService when we don't want to use an odbc.ini file; without an odbc.ini file, the connection string is inherently platform-specific anyway.
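Without an `odbc.ini`, the driver is named directly in the connection string, so the string depends on where the driver library is installed on each platform. iODBC presumably passes a bare file name such as `libsqlite3odbc.so` to the dynamic loader. The loader searches the standard library directories but not `/usr/lib/x86_64-linux-gnu/odbc`, which would explain both the symlink in the CI script and the `cannot open shared object file` error reported below. The sketch builds such an ODBC-style `key=value;key=value` string. The `Driver` and `Database` keys are assumptions about the SQLite ODBC driver, and `buildConnectionString` is a hypothetical helper.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Builds an ODBC-style connection string ("key=value;key=value") from ordered pairs.
// Values containing ';' would need to be braced; this sketch does not handle that.
std::string buildConnectionString(const std::vector<std::pair<std::string, std::string>>& attributes) {
  std::string result;
  for (const auto& attr : attributes) {
    if (!result.empty()) {
      result += ';';
    }
    result += attr.first + '=' + attr.second;
  }
  return result;
}
```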







[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r603909790



##########
File path: .github/workflows/ci.yml
##########
@@ -132,12 +142,13 @@ jobs:
         run: |
           sudo apt-add-repository -y "ppa:ubuntu-toolchain-r/test"
           sudo apt update
-          sudo apt install -y gcc-4.8 g++-4.8 bison flex uuid-dev openssl libcurl4-openssl-dev ccache libpython3-dev liblua5.1-0-dev libssh2-1-dev
+          sudo apt install -y gcc-4.8 g++-4.8 bison flex uuid-dev openssl libcurl4-openssl-dev ccache libpython3-dev liblua5.1-0-dev libssh2-1-dev libsqliteodbc
+          sudo ln -s /usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so /usr/lib/x86_64-linux-gnu/libsqlite3odbc.so

Review comment:
       Without the symlink, my agent starts up, but I get this error in PutSQL::onTrigger (or the `onTrigger` of one of the other SQL processors):
   ```
   [org::apache::nifi::minifi::processors::PutSQL] [error] SQLProcessor: 'Error connecting to database: [iODBC][Driver Manager]libsqlite3odbc.so: cannot open shared object file: No such file or directory (SQL state 00000)'
   ```







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r601566935



##########
File path: extensions/sql/patch/iodbc.patch
##########
@@ -0,0 +1,12 @@
+diff -rupN orig/configure.ac patched/configure.ac
+--- orig/configure.ac	2019-07-23 13:37:26.000000000 +0200
++++ patched/configure.ac	2021-02-15 12:43:35.000000000 +0100
+@@ -398,7 +398,7 @@ if test x"$libltdl_cv_uscore" = xyes; th
+ #include <dlfcn.h>
+ #endif
+ 
+-#include <stdio.h>
++#include <stdlib.h>

Review comment:
       The upgrade seems to have been successful, so the patches are no longer needed.







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r601476215



##########
File path: extensions/sql/processors/FlowFileSource.h
##########
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <vector>
+#include <memory>
+
+#include "core/Property.h"
+#include "utils/Enum.h"
+#include "data/SQLRowsetProcessor.h"
+#include "ProcessSession.h"
+#include "data/JSONSQLWriter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class FlowFileSource {
+ public:
+  static const std::string FRAGMENT_IDENTIFIER;
+  static const std::string FRAGMENT_COUNT;
+  static const std::string FRAGMENT_INDEX;
+
+  static const core::Property OutputFormat;
+  static const core::Property MaxRowsPerFlowFile;
+
+  SMART_ENUM(OutputType,
+    (JSON, "JSON"),
+    (JSONPretty, "JSON-Pretty")
+  )
+
+ protected:
+  class FlowFileGenerator : public sql::SQLRowSubscriber {
+   public:
+    FlowFileGenerator(core::ProcessSession& session, sql::JSONSQLWriter& json_writer)
+      : session_(session),
+        json_writer_(json_writer) {}
+
+    void beginProcessBatch() override {
+      current_batch_size_ = 0;
+    }
+    void endProcessBatch(Progress progress) override;
+    void beginProcessRow() override {}
+    void endProcessRow() override {
+      ++current_batch_size_;
+    }
+    void processColumnNames(const std::vector<std::string>& /*names*/) override {}
+    void processColumn(const std::string& /*name*/, const std::string& /*value*/) override {}
+    void processColumn(const std::string& /*name*/, double /*value*/) override {}
+    void processColumn(const std::string& /*name*/, int /*value*/) override {}
+    void processColumn(const std::string& /*name*/, long long /*value*/) override {}
+    void processColumn(const std::string& /*name*/, unsigned long long /*value*/) override {}
+    void processColumn(const std::string& /*name*/, const char* /*value*/) override {}
+
+    std::shared_ptr<core::FlowFile> getLastFlowFile() const {
+      if (!flow_files_.empty()) {
+        return flow_files_.back();
+      }
+      return {};
+    }
+
+    std::vector<std::shared_ptr<core::FlowFile>>& getFlowFiles() {
+      return flow_files_;
+    }
+
+   private:
+    core::ProcessSession& session_;
+    sql::JSONSQLWriter& json_writer_;
+    const utils::Identifier batch_id_{utils::IdGenerator::getIdGenerator()->generate()};
+    size_t current_batch_size_{0};

Review comment:
       With the introduction of `finishProcessing`, it is now used to check whether the batch contained no rows.
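Here is a minimal sketch of the idea. `BatchCounter` and `process` are hypothetical stand-ins, not the actual `sql::SQLRowSubscriber` or `SQLRowsetProcessor` API. The sketch assumes only the `beginProcessBatch` / `endProcessRow` / `endProcessBatch` call order shown in the diff. It also treats a `max_rows` of 0 as "no limit", as the Max Rows Per Flow File description states.

```cpp
#include <cassert>
#include <cstddef>
#include <limits>
#include <vector>

// Hypothetical subscriber mimicking the batch/row callbacks from the diff.
class BatchCounter {
 public:
  void beginProcessBatch() { current_batch_size_ = 0; }
  void endProcessRow() { ++current_batch_size_; }
  void endProcessBatch() {
    // A batch that received no rows (here: an empty result set) produces no output.
    if (current_batch_size_ == 0) return;
    batch_sizes_.push_back(current_batch_size_);
  }
  const std::vector<std::size_t>& batchSizes() const { return batch_sizes_; }

 private:
  std::size_t current_batch_size_{0};
  std::vector<std::size_t> batch_sizes_;
};

// Hypothetical driver: feeds `total_rows` rows in batches of at most `max_rows`,
// where 0 means "no limit".
void process(BatchCounter& counter, std::size_t total_rows, std::size_t max_rows) {
  const std::size_t limit = max_rows == 0 ? std::numeric_limits<std::size_t>::max() : max_rows;
  std::size_t emitted = 0;
  do {
    counter.beginProcessBatch();
    std::size_t in_batch = 0;
    while (emitted < total_rows && in_batch < limit) {
      counter.endProcessRow();
      ++emitted;
      ++in_batch;
    }
    counter.endProcessBatch();
  } while (emitted < total_rows);
}
```

With 5 rows and a limit of 2, this yields batches of 2, 2 and 1 rows. This matches the `executesql.row.count` values that the ExecuteSQL test below expects.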




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r601357523



##########
File path: extensions/sql/patch/iodbc.patch
##########
@@ -0,0 +1,12 @@
+diff -rupN orig/configure.ac patched/configure.ac
+--- orig/configure.ac	2019-07-23 13:37:26.000000000 +0200
++++ patched/configure.ac	2021-02-15 12:43:35.000000000 +0100
+@@ -398,7 +398,7 @@ if test x"$libltdl_cv_uscore" = xyes; th
+ #include <dlfcn.h>
+ #endif
+ 
+-#include <stdio.h>
++#include <stdlib.h>

Review comment:
       Can you add a link somewhere to the issue or PR related to this patch, so we'll know when we can remove it?

##########
File path: extensions/sql/processors/ExecuteSQL.cpp
##########
@@ -51,72 +41,87 @@ namespace processors {
 
 const std::string ExecuteSQL::ProcessorName("ExecuteSQL");
 
-const core::Property ExecuteSQL::s_sqlSelectQuery(
-  core::PropertyBuilder::createProperty("SQL select query")->isRequired(true)->withDescription(
+const core::Property ExecuteSQL::SQLSelectQuery(
+  core::PropertyBuilder::createProperty("SQL select query")
+  ->withDescription(
     "The SQL select query to execute. The query can be empty, a constant value, or built from attributes using Expression Language. "
     "If this property is specified, it will be used regardless of the content of incoming flowfiles. "
     "If this property is empty, the content of the incoming flow file is expected to contain a valid SQL select query, to be issued by the processor to the database. "
-    "Note that Expression Language is not evaluated for flow file contents.")->supportsExpressionLanguage(true)->build());
+    "Note that Expression Language is not evaluated for flow file contents.")
+  ->supportsExpressionLanguage(true)->build());
 
-const core::Property ExecuteSQL::s_maxRowsPerFlowFile(
-  core::PropertyBuilder::createProperty("Max Rows Per Flow File")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
-    "The maximum number of result rows that will be included intoi a flow file. If zero then all will be placed into the flow file")->supportsExpressionLanguage(true)->build());
+const core::Relationship ExecuteSQL::Success("success", "Successfully created FlowFile from SQL query result set.");
 
-const core::Relationship ExecuteSQL::s_success("success", "Successfully created FlowFile from SQL query result set.");
-
-static const std::string ResultRowCount = "executesql.row.count";
+const std::string ExecuteSQL::RESULT_ROW_COUNT = "executesql.row.count";
+const std::string ExecuteSQL::INPUT_FLOW_FILE_UUID = "input.flowfile.uuid";
 
 ExecuteSQL::ExecuteSQL(const std::string& name, utils::Identifier uuid)
-  : SQLProcessor(name, uuid), max_rows_(0) {
+  : SQLProcessor(name, uuid, logging::LoggerFactory<ExecuteSQL>::getLogger()) {
 }
 
-ExecuteSQL::~ExecuteSQL() = default;
-
 void ExecuteSQL::initialize() {
   //! Set the supported properties
-  setSupportedProperties({ dbControllerService(), outputFormat(), s_sqlSelectQuery, s_maxRowsPerFlowFile});
+  setSupportedProperties({ DBControllerService, OutputFormat, SQLSelectQuery, MaxRowsPerFlowFile});
 
   //! Set the supported relationships
-  setSupportedRelationships({ s_success });
+  setSupportedRelationships({ Success });
 }
 
-void ExecuteSQL::processOnSchedule(core::ProcessContext &context) {
-  initOutputFormat(context);
+void ExecuteSQL::processOnSchedule(core::ProcessContext& context) {
+  context.getProperty(OutputFormat.getName(), output_format_);
 
-  context.getProperty(s_sqlSelectQuery.getName(), sqlSelectQuery_);
-  context.getProperty(s_maxRowsPerFlowFile.getName(), max_rows_);
+  max_rows_ = [&] {
+    uint64_t max_rows;
+    context.getProperty(MaxRowsPerFlowFile.getName(), max_rows);
+    return gsl::narrow<size_t>(max_rows);
+  }();
 }
 
-void ExecuteSQL::processOnTrigger(core::ProcessSession& session) {
-  auto statement = connection_->prepareStatement(sqlSelectQuery_);
+void ExecuteSQL::processOnTrigger(core::ProcessContext& context, core::ProcessSession& session) {
+  auto input_flow_file = session.get();
 
-  auto rowset = statement->execute();
-
-  int count = 0;
-  size_t rowCount = 0;
-  sql::JSONSQLWriter sqlWriter(isJSONPretty());
-  sql::SQLRowsetProcessor sqlRowsetProcessor(rowset, { &sqlWriter });
+  std::string query;
+  if (!context.getProperty(SQLSelectQuery, query, input_flow_file)) {
+    if (!input_flow_file) {
+      throw Exception(PROCESSOR_EXCEPTION,
+                      "No incoming FlowFile and the \"SQL select query\" processor property is not specified");

Review comment:
       Minor, but using `SQLSelectQuery.getName()` would be better than hard-coding the property name in the message.
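The following sketch derives the message from the property object. `Property` here is a hypothetical stand-in that models only `getName()`, not the real `core::Property`.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Hypothetical stand-in for core::Property; only getName() is modelled.
class Property {
 public:
  explicit Property(std::string name) : name_(std::move(name)) {}
  const std::string& getName() const { return name_; }

 private:
  std::string name_;
};

const Property SQLSelectQuery{"SQL select query"};

// Builds the error text from the property itself, so renaming the property
// cannot leave a stale name in the message.
std::string missingQueryMessage() {
  return "No incoming FlowFile and the \"" + SQLSelectQuery.getName() +
         "\" processor property is not specified";
}
```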

##########
File path: extensions/sql/processors/FlowFileSource.cpp
##########
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FlowFileSource.h"
+
+#include "FlowFile.h"
+#include "data/JSONSQLWriter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const core::Property FlowFileSource::OutputFormat(
+  core::PropertyBuilder::createProperty("Output Format")
+  ->isRequired(true)
+  ->supportsExpressionLanguage(true)
+  ->withDefaultValue(toString(OutputType::JSONPretty))
+  ->withAllowableValues<std::string>(OutputType::values())
+  ->withDescription("Set the output format type.")->build());
+
+const core::Property FlowFileSource::MaxRowsPerFlowFile(
+  core::PropertyBuilder::createProperty("Max Rows Per Flow File")
+  ->isRequired(true)
+  ->supportsExpressionLanguage(true)
+  ->withDefaultValue<uint64_t>(0)
+  ->withDescription(
+      "The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large result sets into multiple FlowFiles. "
+      "If the value specified is zero, then all rows are returned in a single FlowFile.")->build());
+
+const std::string FlowFileSource::FRAGMENT_IDENTIFIER = "fragment.identifier";
+const std::string FlowFileSource::FRAGMENT_COUNT = "fragment.count";
+const std::string FlowFileSource::FRAGMENT_INDEX = "fragment.index";
+
+void FlowFileSource::FlowFileGenerator::endProcessBatch(Progress progress) {
+  if (progress == Progress::DONE) {
+    // annotate the flow files with the fragment.count
+    std::string fragment_count = std::to_string(flow_files_.size());
+    for (const auto& flow_file : flow_files_) {
+      flow_file->addAttribute(FRAGMENT_COUNT, fragment_count);
+    }
+    return;
+  }

Review comment:
       It looks like `endProcessBatch(CONTINUE)` and `endProcessBatch(DONE)` are completely different functions with no common code in any of the implementations. It might be simpler if they were separate virtual functions, e.g. `endProcessBatch()` and `finishProcessing()`.
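The sketch below shows roughly what the split might look like. `BatchSubscriber`, `FragmentAnnotator` and `FakeFlowFile` are hypothetical. The attribute names `fragment.index` and `fragment.count` come from `FlowFileSource.cpp` above. Numbering `fragment.index` by batch order is an assumption, chosen to match the indices the ExecuteSQL test expects.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <vector>

// Hypothetical minimal flow file: only an attribute map.
struct FakeFlowFile {
  std::map<std::string, std::string> attributes;
};

// Two hooks instead of a single endProcessBatch(Progress) that branches on CONTINUE/DONE.
class BatchSubscriber {
 public:
  virtual ~BatchSubscriber() = default;
  virtual void endProcessBatch() = 0;   // called after every batch
  virtual void finishProcessing() = 0;  // called once, after the last batch
};

class FragmentAnnotator : public BatchSubscriber {
 public:
  void endProcessBatch() override {
    auto flow_file = std::make_shared<FakeFlowFile>();
    flow_file->attributes["fragment.index"] = std::to_string(flow_files_.size());
    flow_files_.push_back(flow_file);
  }

  void finishProcessing() override {
    // fragment.count is only known once every batch has been seen.
    const std::string count = std::to_string(flow_files_.size());
    for (const auto& flow_file : flow_files_) {
      flow_file->attributes["fragment.count"] = count;
    }
  }

  const std::vector<std::shared_ptr<FakeFlowFile>>& flowFiles() const { return flow_files_; }

 private:
  std::vector<std::shared_ptr<FakeFlowFile>> flow_files_;
};
```

Each override now does one thing, so neither implementation needs an early `return` on a `Progress` value.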

##########
File path: extensions/sql/processors/ExecuteSQL.cpp
##########
@@ -51,72 +41,87 @@ namespace processors {
 
 const std::string ExecuteSQL::ProcessorName("ExecuteSQL");
 
-const core::Property ExecuteSQL::s_sqlSelectQuery(
-  core::PropertyBuilder::createProperty("SQL select query")->isRequired(true)->withDescription(
+const core::Property ExecuteSQL::SQLSelectQuery(
+  core::PropertyBuilder::createProperty("SQL select query")
+  ->withDescription(
     "The SQL select query to execute. The query can be empty, a constant value, or built from attributes using Expression Language. "
     "If this property is specified, it will be used regardless of the content of incoming flowfiles. "
     "If this property is empty, the content of the incoming flow file is expected to contain a valid SQL select query, to be issued by the processor to the database. "
-    "Note that Expression Language is not evaluated for flow file contents.")->supportsExpressionLanguage(true)->build());
+    "Note that Expression Language is not evaluated for flow file contents.")
+  ->supportsExpressionLanguage(true)->build());
 
-const core::Property ExecuteSQL::s_maxRowsPerFlowFile(
-  core::PropertyBuilder::createProperty("Max Rows Per Flow File")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
-    "The maximum number of result rows that will be included intoi a flow file. If zero then all will be placed into the flow file")->supportsExpressionLanguage(true)->build());
+const core::Relationship ExecuteSQL::Success("success", "Successfully created FlowFile from SQL query result set.");
 
-const core::Relationship ExecuteSQL::s_success("success", "Successfully created FlowFile from SQL query result set.");
-
-static const std::string ResultRowCount = "executesql.row.count";
+const std::string ExecuteSQL::RESULT_ROW_COUNT = "executesql.row.count";
+const std::string ExecuteSQL::INPUT_FLOW_FILE_UUID = "input.flowfile.uuid";
 
 ExecuteSQL::ExecuteSQL(const std::string& name, utils::Identifier uuid)
-  : SQLProcessor(name, uuid), max_rows_(0) {
+  : SQLProcessor(name, uuid, logging::LoggerFactory<ExecuteSQL>::getLogger()) {
 }
 
-ExecuteSQL::~ExecuteSQL() = default;
-
 void ExecuteSQL::initialize() {
   //! Set the supported properties
-  setSupportedProperties({ dbControllerService(), outputFormat(), s_sqlSelectQuery, s_maxRowsPerFlowFile});
+  setSupportedProperties({ DBControllerService, OutputFormat, SQLSelectQuery, MaxRowsPerFlowFile});
 
   //! Set the supported relationships
-  setSupportedRelationships({ s_success });
+  setSupportedRelationships({ Success });
 }
 
-void ExecuteSQL::processOnSchedule(core::ProcessContext &context) {
-  initOutputFormat(context);
+void ExecuteSQL::processOnSchedule(core::ProcessContext& context) {
+  context.getProperty(OutputFormat.getName(), output_format_);
 
-  context.getProperty(s_sqlSelectQuery.getName(), sqlSelectQuery_);
-  context.getProperty(s_maxRowsPerFlowFile.getName(), max_rows_);
+  max_rows_ = [&] {
+    uint64_t max_rows;
+    context.getProperty(MaxRowsPerFlowFile.getName(), max_rows);
+    return gsl::narrow<size_t>(max_rows);
+  }();
 }
 
-void ExecuteSQL::processOnTrigger(core::ProcessSession& session) {
-  auto statement = connection_->prepareStatement(sqlSelectQuery_);
+void ExecuteSQL::processOnTrigger(core::ProcessContext& context, core::ProcessSession& session) {
+  auto input_flow_file = session.get();
 
-  auto rowset = statement->execute();
-
-  int count = 0;
-  size_t rowCount = 0;
-  sql::JSONSQLWriter sqlWriter(isJSONPretty());
-  sql::SQLRowsetProcessor sqlRowsetProcessor(rowset, { &sqlWriter });
+  std::string query;
+  if (!context.getProperty(SQLSelectQuery, query, input_flow_file)) {
+    if (!input_flow_file) {
+      throw Exception(PROCESSOR_EXCEPTION,
+                      "No incoming FlowFile and the \"SQL select query\" processor property is not specified");
+    }
+    logger_->log_debug("Using the contents of the flow file as the SQL statement");
+    auto buffer = std::make_shared<io::BufferStream>();
+    InputStreamPipe content_reader{buffer};
+    session.read(input_flow_file, &content_reader);
+    query = std::string{reinterpret_cast<const char *>(buffer->getBuffer()), buffer->size()};
+  }
+  if (query.empty()) {
+    throw Exception(PROCESSOR_EXCEPTION, "Empty SQL statement");
+  }
+
+  auto row_set = connection_->prepareStatement(query)->execute(collectArguments(input_flow_file));
+
+  sql::JSONSQLWriter sqlWriter{output_format_ == OutputType::JSONPretty};

Review comment:
       Very minor, but I would call this `json_writer`, as it writes JSON, not SQL.

##########
File path: extensions/sql/processors/FlowFileSource.h
##########
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <vector>
+#include <memory>
+
+#include "core/Property.h"
+#include "utils/Enum.h"
+#include "data/SQLRowsetProcessor.h"
+#include "ProcessSession.h"
+#include "data/JSONSQLWriter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class FlowFileSource {
+ public:
+  static const std::string FRAGMENT_IDENTIFIER;
+  static const std::string FRAGMENT_COUNT;
+  static const std::string FRAGMENT_INDEX;
+
+  static const core::Property OutputFormat;
+  static const core::Property MaxRowsPerFlowFile;
+
+  SMART_ENUM(OutputType,
+    (JSON, "JSON"),
+    (JSONPretty, "JSON-Pretty")
+  )
+
+ protected:
+  class FlowFileGenerator : public sql::SQLRowSubscriber {
+   public:
+    FlowFileGenerator(core::ProcessSession& session, sql::JSONSQLWriter& json_writer)
+      : session_(session),
+        json_writer_(json_writer) {}
+
+    void beginProcessBatch() override {
+      current_batch_size_ = 0;
+    }
+    void endProcessBatch(Progress progress) override;
+    void beginProcessRow() override {}
+    void endProcessRow() override {
+      ++current_batch_size_;
+    }
+    void processColumnNames(const std::vector<std::string>& /*names*/) override {}
+    void processColumn(const std::string& /*name*/, const std::string& /*value*/) override {}
+    void processColumn(const std::string& /*name*/, double /*value*/) override {}
+    void processColumn(const std::string& /*name*/, int /*value*/) override {}
+    void processColumn(const std::string& /*name*/, long long /*value*/) override {}
+    void processColumn(const std::string& /*name*/, unsigned long long /*value*/) override {}
+    void processColumn(const std::string& /*name*/, const char* /*value*/) override {}
+
+    std::shared_ptr<core::FlowFile> getLastFlowFile() const {
+      if (!flow_files_.empty()) {
+        return flow_files_.back();
+      }
+      return {};
+    }
+
+    std::vector<std::shared_ptr<core::FlowFile>>& getFlowFiles() {
+      return flow_files_;
+    }
+
+   private:
+    core::ProcessSession& session_;
+    sql::JSONSQLWriter& json_writer_;
+    const utils::Identifier batch_id_{utils::IdGenerator::getIdGenerator()->generate()};
+    size_t current_batch_size_{0};

Review comment:
       `current_batch_size_` is not read anywhere, so it can be removed.

##########
File path: extensions/sql/processors/ExecuteSQL.cpp
##########
@@ -51,72 +41,87 @@ namespace processors {
 
 const std::string ExecuteSQL::ProcessorName("ExecuteSQL");
 
-const core::Property ExecuteSQL::s_sqlSelectQuery(
-  core::PropertyBuilder::createProperty("SQL select query")->isRequired(true)->withDescription(
+const core::Property ExecuteSQL::SQLSelectQuery(
+  core::PropertyBuilder::createProperty("SQL select query")
+  ->withDescription(
     "The SQL select query to execute. The query can be empty, a constant value, or built from attributes using Expression Language. "
     "If this property is specified, it will be used regardless of the content of incoming flowfiles. "
     "If this property is empty, the content of the incoming flow file is expected to contain a valid SQL select query, to be issued by the processor to the database. "
-    "Note that Expression Language is not evaluated for flow file contents.")->supportsExpressionLanguage(true)->build());
+    "Note that Expression Language is not evaluated for flow file contents.")
+  ->supportsExpressionLanguage(true)->build());
 
-const core::Property ExecuteSQL::s_maxRowsPerFlowFile(
-  core::PropertyBuilder::createProperty("Max Rows Per Flow File")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
-    "The maximum number of result rows that will be included intoi a flow file. If zero then all will be placed into the flow file")->supportsExpressionLanguage(true)->build());
+const core::Relationship ExecuteSQL::Success("success", "Successfully created FlowFile from SQL query result set.");
 
-const core::Relationship ExecuteSQL::s_success("success", "Successfully created FlowFile from SQL query result set.");
-
-static const std::string ResultRowCount = "executesql.row.count";
+const std::string ExecuteSQL::RESULT_ROW_COUNT = "executesql.row.count";
+const std::string ExecuteSQL::INPUT_FLOW_FILE_UUID = "input.flowfile.uuid";
 
 ExecuteSQL::ExecuteSQL(const std::string& name, utils::Identifier uuid)
-  : SQLProcessor(name, uuid), max_rows_(0) {
+  : SQLProcessor(name, uuid, logging::LoggerFactory<ExecuteSQL>::getLogger()) {
 }
 
-ExecuteSQL::~ExecuteSQL() = default;
-
 void ExecuteSQL::initialize() {
   //! Set the supported properties
-  setSupportedProperties({ dbControllerService(), outputFormat(), s_sqlSelectQuery, s_maxRowsPerFlowFile});
+  setSupportedProperties({ DBControllerService, OutputFormat, SQLSelectQuery, MaxRowsPerFlowFile});
 
   //! Set the supported relationships
-  setSupportedRelationships({ s_success });
+  setSupportedRelationships({ Success });
 }
 
-void ExecuteSQL::processOnSchedule(core::ProcessContext &context) {
-  initOutputFormat(context);
+void ExecuteSQL::processOnSchedule(core::ProcessContext& context) {
+  context.getProperty(OutputFormat.getName(), output_format_);
 
-  context.getProperty(s_sqlSelectQuery.getName(), sqlSelectQuery_);
-  context.getProperty(s_maxRowsPerFlowFile.getName(), max_rows_);
+  max_rows_ = [&] {
+    uint64_t max_rows;
+    context.getProperty(MaxRowsPerFlowFile.getName(), max_rows);
+    return gsl::narrow<size_t>(max_rows);
+  }();
 }
 
-void ExecuteSQL::processOnTrigger(core::ProcessSession& session) {
-  auto statement = connection_->prepareStatement(sqlSelectQuery_);
+void ExecuteSQL::processOnTrigger(core::ProcessContext& context, core::ProcessSession& session) {
+  auto input_flow_file = session.get();
 
-  auto rowset = statement->execute();
-
-  int count = 0;
-  size_t rowCount = 0;
-  sql::JSONSQLWriter sqlWriter(isJSONPretty());
-  sql::SQLRowsetProcessor sqlRowsetProcessor(rowset, { &sqlWriter });
+  std::string query;
+  if (!context.getProperty(SQLSelectQuery, query, input_flow_file)) {
+    if (!input_flow_file) {
+      throw Exception(PROCESSOR_EXCEPTION,
+                      "No incoming FlowFile and the \"SQL select query\" processor property is not specified");
+    }
+    logger_->log_debug("Using the contents of the flow file as the SQL statement");
+    auto buffer = std::make_shared<io::BufferStream>();
+    InputStreamPipe content_reader{buffer};
+    session.read(input_flow_file, &content_reader);
+    query = std::string{reinterpret_cast<const char *>(buffer->getBuffer()), buffer->size()};
+  }
+  if (query.empty()) {
+    throw Exception(PROCESSOR_EXCEPTION, "Empty SQL statement");
+  }
+
+  auto row_set = connection_->prepareStatement(query)->execute(collectArguments(input_flow_file));
+
+  sql::JSONSQLWriter sqlWriter{output_format_ == OutputType::JSONPretty};
+  FlowFileGenerator flow_file_creator{session, sqlWriter};
+  sql::SQLRowsetProcessor sqlRowsetProcessor(row_set, {sqlWriter, flow_file_creator});
 
   // Process rowset.
-  do {
-    rowCount = sqlRowsetProcessor.process(max_rows_ == 0 ? std::numeric_limits<size_t>::max() : max_rows_);
-    count++;
-    if (rowCount == 0)
-      break;
-
-    const auto output = sqlWriter.toString();
-    if (!output.empty()) {
-      WriteCallback writer(output);
-      auto newflow = session.create();
-      newflow->addAttribute(ResultRowCount, std::to_string(rowCount));
-      session.write(newflow, &writer);
-      session.transfer(newflow, s_success);
+  while (size_t row_count = sqlRowsetProcessor.process(max_rows_)) {
+    auto new_file = flow_file_creator.getLastFlowFile();
+    new_file->addAttribute(RESULT_ROW_COUNT, std::to_string(row_count));

Review comment:
       It would be useful (at least for the peace of mind of someone reading this code) to check that `new_file` is not null.
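The sketch below shows one way to write such a check. It assumes that a non-zero `row_count` should always come with a flow file. `FakeFlowFile`, `getLastFlowFile` and `annotateLastFlowFile` are hypothetical stand-ins. `std::logic_error` replaces MiNiFi's own `Exception` type to keep the example self-contained.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <vector>

struct FakeFlowFile {
  std::map<std::string, std::string> attributes;
};

using FlowFiles = std::vector<std::shared_ptr<FakeFlowFile>>;

// Mirrors FlowFileGenerator::getLastFlowFile(): an empty pointer if nothing was created.
std::shared_ptr<FakeFlowFile> getLastFlowFile(const FlowFiles& flow_files) {
  if (flow_files.empty()) return nullptr;
  return flow_files.back();
}

void annotateLastFlowFile(const FlowFiles& flow_files, std::size_t row_count) {
  auto new_file = getLastFlowFile(flow_files);
  if (!new_file) {
    // Rows were processed but no flow file exists: fail loudly instead of
    // dereferencing a null pointer.
    throw std::logic_error("Rows were processed but no flow file was created");
  }
  new_file->attributes["executesql.row.count"] = std::to_string(row_count);
}
```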







[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r603274520



##########
File path: libminifi/test/sql-tests/ExecuteSQLTests.cpp
##########
@@ -0,0 +1,195 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+
+#include "SQLTestController.h"
+#include "processors/ExecuteSQL.h"
+#include "Utils.h"
+#include "FlowFileMatcher.h"
+
+TEST_CASE("ExecuteSQL works without incoming flow file", "[ExecuteSQL1]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty("SQL select query", "SELECT * FROM test_table ORDER BY int_col ASC");
+
+  controller.insertValues({{11, "one"}, {22, "two"}});
+
+  plan->run();
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+  std::string row_count;
+  flow_files[0]->getAttribute(processors::ExecuteSQL::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "2");
+
+  auto content = plan->getContent(flow_files[0]);
+  verifyJSON(content, R"(
+    [{
+      "int_col": 11,
+      "text_col": "one"
+    },{
+      "int_col": 22,
+      "text_col": "two"
+    }]
+  )");
+}
+
+TEST_CASE("ExecuteSQL uses statement in property", "[ExecuteSQL2]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty("SQL select query", "SELECT * FROM test_table WHERE int_col == ${int_col_value}");
+
+  controller.insertValues({{11, "one"}, {22, "two"}});
+
+  auto input_file = plan->addInput({{"int_col_value", "11"}});
+
+  plan->run();
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+  std::string row_count;
+  flow_files[0]->getAttribute(processors::ExecuteSQL::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "1");
+
+  auto content = plan->getContent(flow_files[0]);
+  verifyJSON(content, R"(
+    [{
+      "int_col": 11,
+      "text_col": "one"
+    }]
+  )");
+}
+
+TEST_CASE("ExecuteSQL uses statement in content", "[ExecuteSQL3]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+
+  controller.insertValues({{11, "one"}, {22, "two"}});
+
+  auto input_file = plan->addInput({}, "SELECT * FROM test_table ORDER BY int_col ASC");
+
+  plan->run();
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+  std::string row_count;
+  flow_files[0]->getAttribute(processors::ExecuteSQL::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "2");
+
+  auto content = plan->getContent(flow_files[0]);
+  verifyJSON(content, R"(
+    [{
+      "int_col": 11,
+      "text_col": "one"
+    },{
+      "int_col": 22,
+      "text_col": "two"
+    }]
+  )");
+}
+
+TEST_CASE("ExecuteSQL uses sql.args.N.value attributes", "[ExecuteSQL4]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+
+  controller.insertValues({{11, "apple"}, {11, "banana"}, {22, "banana"}});
+
+  auto input_file = plan->addInput({
+    {"sql.args.1.value", "11"},
+    {"sql.args.2.value", "banana"}
+  }, "SELECT * FROM test_table WHERE int_col == ? AND text_col == ?");
+
+  plan->run();
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+  std::string row_count;
+  flow_files[0]->getAttribute(processors::ExecuteSQL::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "1");
+
+  auto content = plan->getContent(flow_files[0]);
+  verifyJSON(content, R"(
+    [{
+      "int_col": 11,
+      "text_col": "banana"
+    }]
+  )");
+}
+
+TEST_CASE("ExecuteSQL honors Max Rows Per Flow File", "[ExecuteSQL5]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty(processors::ExecuteSQL::MaxRowsPerFlowFile.getName(), "2");
+  sql_proc->setProperty(processors::ExecuteSQL::SQLSelectQuery.getName(), "SELECT text_col FROM test_table ORDER BY int_col ASC");
+
+  controller.insertValues({
+    {101, "apple"},
+    {102, "banana"},
+    {103, "pear"},
+    {104, "strawberry"},
+    {105, "pineapple"}
+  });
+
+  auto input_file = plan->addInput();
+
+  plan->run();
+
+  auto content_verifier = [&] (const std::shared_ptr<core::FlowFile>& actual, const std::string& expected) {
+    verifyJSON(plan->getContent(actual), expected);
+  };
+
+  FlowFileMatcher matcher{content_verifier, {
+      processors::ExecuteSQL::RESULT_ROW_COUNT,
+      processors::ExecuteSQL::FRAGMENT_COUNT,
+      processors::ExecuteSQL::FRAGMENT_INDEX,
+      processors::ExecuteSQL::FRAGMENT_IDENTIFIER
+  }};
+
+  utils::optional<std::string> fragment_id;
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 3);
+  matcher.verify(flow_files[0],
+    {"2", "3", "0", var("frag_id")},
+    R"([{"text_col": "apple"}, {"text_col": "banana"}])");
+  matcher.verify(flow_files[1],
+    {"2", "3", "1", var("frag_id")},
+    R"([{"text_col": "pear"}, {"text_col": "strawberry"}])");
+  matcher.verify(flow_files[2],
+    {"1", "3", "2", var("frag_id")},

Review comment:
       Thanks!
   
   I still think a `REQUIRE(fragment_id)` would be useful (both here and in QueryDatabaseTableTests) to verify that the first flow file had the `FRAGMENT_IDENTIFIER` attribute set, since calling `operator*` on a `nullopt` is undefined behavior.
   
   Alternatively, we could use `fragment_id.value()`, which throws when called on a `nullopt`.
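The sketch below illustrates the difference, using C++17 `std::optional` as a stand-in for `utils::optional`. Whether MiNiFi's alias behaves identically is an assumption, although the comment above states that its `value()` also throws. `getAttribute` and `fragmentIdOf` are hypothetical helpers.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>

using Attributes = std::map<std::string, std::string>;

// Hypothetical lookup: std::nullopt when the attribute is missing.
std::optional<std::string> getAttribute(const Attributes& attributes, const std::string& key) {
  const auto it = attributes.find(key);
  if (it == attributes.end()) {
    return std::nullopt;
  }
  return it->second;
}

// Checked access: value() throws std::bad_optional_access on an empty optional,
// whereas *fragment_id on an empty optional is undefined behavior.
std::string fragmentIdOf(const Attributes& attributes) {
  return getAttribute(attributes, "fragment.identifier").value();
}
```

In a Catch2 test, `REQUIRE(fragment_id)` before dereferencing reports the missing attribute as an ordinary assertion failure rather than an exception.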







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r603243373



##########
File path: libminifi/test/sql-tests/PutSQLTests.cpp
##########
@@ -0,0 +1,78 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+
+#include "../TestBase.h"
+#include "SQLTestController.h"
+
+#include "processors/PutSQL.h"
+#include "processors/GenerateFlowFile.h"
+#include "processors/UpdateAttribute.h"
+#include "processors/LogAttribute.h"
+#include "processors/GetFile.h"
+
+TEST_CASE("Test Creation of PutSQL", "[PutSQLCreate]") {  // NOLINT

Review comment:
       Nope, removed them.







[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r601685853



##########
File path: extensions/sql/processors/QueryDatabaseTable.cpp
##########
@@ -75,361 +72,233 @@ const core::Property QueryDatabaseTable::s_maxValueColumnNames(
     "If no columns are provided, all rows from the table will be considered, which could have a performance impact. "
     "NOTE: It is important to use consistent max-value column names for a given table for incremental fetch to work properly. "
     "NOTE: Because of a limitation of database access library 'soci', which doesn't support milliseconds in it's 'dt_date', "
-    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")->
-    supportsExpressionLanguage(true)->build());
+    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")
+  ->supportsExpressionLanguage(true)->build());
 
-const core::Property QueryDatabaseTable::s_whereClause(
-  core::PropertyBuilder::createProperty("db-fetch-where-clause")->isRequired(false)->withDescription(
-    "A custom clause to be added in the WHERE condition when building SQL queries.")->supportsExpressionLanguage(true)->build());
+const core::Property QueryDatabaseTable::WhereClause(
+  core::PropertyBuilder::createProperty("Where Clause")
+  ->isRequired(false)
+  ->withDescription("A custom clause to be added in the WHERE condition when building SQL queries.")
+  ->supportsExpressionLanguage(true)->build());
 
-const core::Property QueryDatabaseTable::s_sqlQuery(
-  core::PropertyBuilder::createProperty("db-fetch-sql-query")->isRequired(false)->withDescription(
-    "A custom SQL query used to retrieve data. Instead of building a SQL query from other properties, this query will be wrapped as a sub-query. "
-    "Query must have no ORDER BY statement.")->supportsExpressionLanguage(true)->build());
+const std::string QueryDatabaseTable::InitialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
 
-const core::Property QueryDatabaseTable::s_maxRowsPerFlowFile(
-  core::PropertyBuilder::createProperty("qdbt-max-rows")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
-    "The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large result sets into multiple FlowFiles. "
-    "If the value specified is zero, then all rows are returned in a single FlowFile.")->supportsExpressionLanguage(true)->build());
+const core::Relationship QueryDatabaseTable::Success("success", "Successfully created FlowFile from SQL query result set.");
 
-const core::Property QueryDatabaseTable::s_stateDirectory(
-  core::PropertyBuilder::createProperty("State Directory")->isRequired(false)->withDefaultValue("QDTState")->withDescription("DEPRECATED. Only use it for state migration from the state file, supplying the legacy state directory.")->build());
+const std::string QueryDatabaseTable::RESULT_TABLE_NAME = "tablename";
+const std::string QueryDatabaseTable::RESULT_ROW_COUNT = "querydbtable.row.count";
 
-const std::string QueryDatabaseTable::s_initialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
-
-const core::Relationship QueryDatabaseTable::s_success("success", "Successfully created FlowFile from SQL query result set.");
-
-static const std::string ResultTableName = "tablename";
-static const std::string ResultRowCount = "querydbtable.row.count";
-
-static const std::string TABLENAME_KEY = "tablename";
-static const std::string MAXVALUE_KEY_PREFIX = "maxvalue.";
-
-// State
-class LegacyState {
- public:
-  LegacyState(const std::string& tableName, const std::string& stateDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
-    :tableName_(tableName), logger_(logger) {
-
-    filePath_ = utils::file::FileUtils::concat_path(
-            utils::file::FileUtils::concat_path(
-              utils::file::FileUtils::concat_path(stateDir, "uuid"), uuid), "State.txt");
-
-    if (!getStateFromFile())
-      return;
-
-    ok_ = true;
-  }
-
-  explicit operator bool() const {
-    return ok_;
-  }
-
-  const std::unordered_map<std::string, std::string>& getStateMap() const {
-    return mapState_;
-  }
-
-  bool moveStateFileToMigrated() {
-    if (!ok_) {
-      return false;
-    }
-    return rename(filePath_.c_str(), (filePath_ + "-migrated").c_str()) == 0;
-  }
-
- private:
-  static const std::string separator_;
-
-   bool getStateFromFile() {
-     std::string state;
-
-     std::ifstream file(filePath_);
-     if (!file) {
-       return false;
-     }
-
-     std::stringstream ss;
-     ss << file.rdbuf();
-
-     state = ss.str();
-
-     file.close();
-
-     std::vector<std::string> listColumnNameValue;
-
-     size_t pos = state.find(separator_, 0);
-     if (pos == std::string::npos) {
-       logger_->log_error("Invalid data in '%s' file.", filePath_.c_str());
-       mapState_.clear();
-       return false;
-     }
-
-     auto tableName = state.substr(0, pos);
-     if (tableName != tableName_) {
-       logger_->log_warn("tableName is changed - now: '%s', in State.txt: '%s'.", tableName_.c_str(), tableName.c_str());
-       mapState_.clear();
-
-       return false;
-     }
-
-     pos += separator_.size();
-
-     while (true) {
-       auto newPos = state.find(separator_, pos);
-       if (newPos == std::string::npos)
-         break;
-
-       const std::string& columnNameValue = state.substr(pos, newPos - pos);
-       listColumnNameValue.emplace_back(columnNameValue);
-
-       pos = newPos + separator_.size();
-     }
-
-     for (const auto& columnNameValue : listColumnNameValue) {
-       const auto posEQ = columnNameValue.find('=');
-       if (posEQ == std::string::npos) {
-         logger_->log_error("Invalid data in '%s' file.", filePath_.c_str());
-         mapState_.clear();
-         return false;
-       }
-
-       const auto& name = columnNameValue.substr(0, posEQ);
-       const auto& value = columnNameValue.substr(posEQ + 1);
-
-       mapState_.insert({ name, value });
-     }
-
-     return true;
-   }
-
- private:
-   std::unordered_map<std::string, std::string> mapState_;
-   std::string filePath_;
-   std::string tableName_;
-   std::shared_ptr<logging::Logger> logger_;
-   bool ok_{};
-};
-
-const std::string LegacyState::separator_ = "@!qdt!@";
+const std::string QueryDatabaseTable::TABLENAME_KEY = "tablename";
+const std::string QueryDatabaseTable::MAXVALUE_KEY_PREFIX = "maxvalue.";
 
 // QueryDatabaseTable
 QueryDatabaseTable::QueryDatabaseTable(const std::string& name, utils::Identifier uuid)
-  : SQLProcessor(name, uuid) {
+  : SQLProcessor(name, uuid, logging::LoggerFactory<QueryDatabaseTable>::getLogger()) {
 }
 
-QueryDatabaseTable::~QueryDatabaseTable() = default;
-
 void QueryDatabaseTable::initialize() {
   //! Set the supported properties
-  setSupportedProperties({ dbControllerService(), outputFormat(), s_tableName, s_columnNames, s_maxValueColumnNames, s_whereClause, s_sqlQuery, s_maxRowsPerFlowFile, s_stateDirectory});
+  setSupportedProperties({
+    DBControllerService, OutputFormat, TableName, ColumnNames,
+    MaxValueColumnNames, WhereClause, MaxRowsPerFlowFile});
 
   //! Set the supported relationships
-  setSupportedRelationships({ s_success });
+  setSupportedRelationships({ Success });
 }
 
-void QueryDatabaseTable::processOnSchedule(core::ProcessContext &context) {
-  initOutputFormat(context);
+void QueryDatabaseTable::processOnSchedule(core::ProcessContext& context) {
+  context.getProperty(OutputFormat.getName(), output_format_);
+  max_rows_ = [&] {
+    uint64_t max_rows;
+    context.getProperty(MaxRowsPerFlowFile.getName(), max_rows);
+    return gsl::narrow<size_t>(max_rows);
+  }();
 
-  context.getProperty(s_tableName.getName(), tableName_);
-  context.getProperty(s_columnNames.getName(), columnNames_);
+  state_manager_ = context.getStateManager();
+  if (state_manager_ == nullptr) {
+    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+  }
+
+  context.getProperty(TableName.getName(), table_name_);
+  context.getProperty(WhereClause.getName(), extra_where_clause_);
+  max_value_columns_ = [&] {
+    std::string max_value_columns_str;
+    context.getProperty(MaxValueColumnNames.getName(), max_value_columns_str);
+    return utils::inputStringToList(max_value_columns_str);
+  }();
+  return_columns_ = [&] {
+    std::string return_columns_str;
+    context.getProperty(ColumnNames.getName(), return_columns_str);
+    return utils::inputStringToList(return_columns_str);
+  }();
+  queried_columns_ = utils::StringUtils::join(", ", return_columns_);
+  if (!queried_columns_.empty() && !max_value_columns_.empty()) {
+    // columns will be explicitly enumerated, we need to add the max value columns
+    queried_columns_ = queried_columns_ + ", " + utils::StringUtils::join(", ", max_value_columns_);
+  }
 
-  context.getProperty(s_maxValueColumnNames.getName(), maxValueColumnNames_);
-  listMaxValueColumnName_ = utils::inputStringToList(maxValueColumnNames_);
+  initializeMaxValues(context);
+}
 
-  context.getProperty(s_whereClause.getName(), whereClause_);
-  context.getProperty(s_sqlQuery.getName(), sqlQuery_);
-  context.getProperty(s_maxRowsPerFlowFile.getName(), maxRowsPerFlowFile_);
+void QueryDatabaseTable::processOnTrigger(core::ProcessContext& /*context*/, core::ProcessSession& session) {
+  const auto& selectQuery = buildSelectQuery();
 
-  mapState_.clear();
+  logger_->log_info("QueryDatabaseTable: selectQuery: '%s'", selectQuery.c_str());
 
-  state_manager_ = context.getStateManager();
-  if (state_manager_ == nullptr) {
-    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+  auto statement = connection_->prepareStatement(selectQuery);
+
+  auto rowset = statement->execute();
+
+  std::unordered_map<std::string, std::string> new_max_values = max_values_;
+  sql::MaxCollector maxCollector{selectQuery, new_max_values};
+  auto column_filter = [&] (const std::string& column_name) {
+    return return_columns_.empty()
+      || std::find(return_columns_.begin(), return_columns_.end(), column_name) != return_columns_.end();
+  };
+  sql::JSONSQLWriter sqlWriter{output_format_ == OutputType::JSONPretty, column_filter};

Review comment:
       this could be `json_writer`, too
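The hunk above queries more columns than it writes out. When an explicit column list is configured, the max-value columns are appended to `queried_columns_` so that their maxima can be collected, while `column_filter` keeps them out of the JSON output unless they were requested. Below is a minimal, self-contained sketch of that logic. The hypothetical `joinColumns` stands in for `utils::StringUtils::join(", ", ...)`, and the function names are illustrative only, not the processor's API:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for utils::StringUtils::join(", ", items).
std::string joinColumns(const std::vector<std::string>& items) {
  std::string result;
  for (std::size_t i = 0; i < items.size(); ++i) {
    if (i > 0) result += ", ";
    result += items[i];
  }
  return result;
}

// Mirrors the queried_columns_ logic in processOnSchedule: when columns are
// explicitly enumerated, the max-value columns are appended so their maxima
// can still be tracked. An empty result means "no explicit column list".
std::string buildQueriedColumns(const std::vector<std::string>& return_columns,
                                const std::vector<std::string>& max_value_columns) {
  std::string queried = joinColumns(return_columns);
  if (!queried.empty() && !max_value_columns.empty()) {
    queried += ", " + joinColumns(max_value_columns);
  }
  return queried;
}

// Mirrors column_filter: with no configured list every column is written,
// otherwise only the requested ones reach the writer's output.
bool isReturnedColumn(const std::vector<std::string>& return_columns, const std::string& column) {
  return return_columns.empty()
      || std::find(return_columns.begin(), return_columns.end(), column) != return_columns.end();
}
```

An empty result from `buildQueriedColumns` corresponds to the case where no column list is configured. The query presumably selects all columns in that case, but that part is not shown in this hunk.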

##########
File path: extensions/sql/processors/QueryDatabaseTable.cpp
##########
@@ -75,361 +72,233 @@ const core::Property QueryDatabaseTable::s_maxValueColumnNames(
     "If no columns are provided, all rows from the table will be considered, which could have a performance impact. "
     "NOTE: It is important to use consistent max-value column names for a given table for incremental fetch to work properly. "
     "NOTE: Because of a limitation of database access library 'soci', which doesn't support milliseconds in it's 'dt_date', "
-    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")->
-    supportsExpressionLanguage(true)->build());
+    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")
+  ->supportsExpressionLanguage(true)->build());
 
-const core::Property QueryDatabaseTable::s_whereClause(
-  core::PropertyBuilder::createProperty("db-fetch-where-clause")->isRequired(false)->withDescription(
-    "A custom clause to be added in the WHERE condition when building SQL queries.")->supportsExpressionLanguage(true)->build());
+const core::Property QueryDatabaseTable::WhereClause(
+  core::PropertyBuilder::createProperty("Where Clause")
+  ->isRequired(false)
+  ->withDescription("A custom clause to be added in the WHERE condition when building SQL queries.")
+  ->supportsExpressionLanguage(true)->build());
 
-const core::Property QueryDatabaseTable::s_sqlQuery(
-  core::PropertyBuilder::createProperty("db-fetch-sql-query")->isRequired(false)->withDescription(
-    "A custom SQL query used to retrieve data. Instead of building a SQL query from other properties, this query will be wrapped as a sub-query. "
-    "Query must have no ORDER BY statement.")->supportsExpressionLanguage(true)->build());
+const std::string QueryDatabaseTable::InitialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
 
-const core::Property QueryDatabaseTable::s_maxRowsPerFlowFile(
-  core::PropertyBuilder::createProperty("qdbt-max-rows")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
-    "The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large result sets into multiple FlowFiles. "
-    "If the value specified is zero, then all rows are returned in a single FlowFile.")->supportsExpressionLanguage(true)->build());
+const core::Relationship QueryDatabaseTable::Success("success", "Successfully created FlowFile from SQL query result set.");
 
-const core::Property QueryDatabaseTable::s_stateDirectory(
-  core::PropertyBuilder::createProperty("State Directory")->isRequired(false)->withDefaultValue("QDTState")->withDescription("DEPRECATED. Only use it for state migration from the state file, supplying the legacy state directory.")->build());
+const std::string QueryDatabaseTable::RESULT_TABLE_NAME = "tablename";
+const std::string QueryDatabaseTable::RESULT_ROW_COUNT = "querydbtable.row.count";
 
-const std::string QueryDatabaseTable::s_initialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
-
-const core::Relationship QueryDatabaseTable::s_success("success", "Successfully created FlowFile from SQL query result set.");
-
-static const std::string ResultTableName = "tablename";
-static const std::string ResultRowCount = "querydbtable.row.count";
-
-static const std::string TABLENAME_KEY = "tablename";
-static const std::string MAXVALUE_KEY_PREFIX = "maxvalue.";
-
-// State
-class LegacyState {
- public:
-  LegacyState(const std::string& tableName, const std::string& stateDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
-    :tableName_(tableName), logger_(logger) {
-
-    filePath_ = utils::file::FileUtils::concat_path(
-            utils::file::FileUtils::concat_path(
-              utils::file::FileUtils::concat_path(stateDir, "uuid"), uuid), "State.txt");
-
-    if (!getStateFromFile())
-      return;
-
-    ok_ = true;
-  }
-
-  explicit operator bool() const {
-    return ok_;
-  }
-
-  const std::unordered_map<std::string, std::string>& getStateMap() const {
-    return mapState_;
-  }
-
-  bool moveStateFileToMigrated() {
-    if (!ok_) {
-      return false;
-    }
-    return rename(filePath_.c_str(), (filePath_ + "-migrated").c_str()) == 0;
-  }
-
- private:
-  static const std::string separator_;
-
-   bool getStateFromFile() {
-     std::string state;
-
-     std::ifstream file(filePath_);
-     if (!file) {
-       return false;
-     }
-
-     std::stringstream ss;
-     ss << file.rdbuf();
-
-     state = ss.str();
-
-     file.close();
-
-     std::vector<std::string> listColumnNameValue;
-
-     size_t pos = state.find(separator_, 0);
-     if (pos == std::string::npos) {
-       logger_->log_error("Invalid data in '%s' file.", filePath_.c_str());
-       mapState_.clear();
-       return false;
-     }
-
-     auto tableName = state.substr(0, pos);
-     if (tableName != tableName_) {
-       logger_->log_warn("tableName is changed - now: '%s', in State.txt: '%s'.", tableName_.c_str(), tableName.c_str());
-       mapState_.clear();
-
-       return false;
-     }
-
-     pos += separator_.size();
-
-     while (true) {
-       auto newPos = state.find(separator_, pos);
-       if (newPos == std::string::npos)
-         break;
-
-       const std::string& columnNameValue = state.substr(pos, newPos - pos);
-       listColumnNameValue.emplace_back(columnNameValue);
-
-       pos = newPos + separator_.size();
-     }
-
-     for (const auto& columnNameValue : listColumnNameValue) {
-       const auto posEQ = columnNameValue.find('=');
-       if (posEQ == std::string::npos) {
-         logger_->log_error("Invalid data in '%s' file.", filePath_.c_str());
-         mapState_.clear();
-         return false;
-       }
-
-       const auto& name = columnNameValue.substr(0, posEQ);
-       const auto& value = columnNameValue.substr(posEQ + 1);
-
-       mapState_.insert({ name, value });
-     }
-
-     return true;
-   }
-
- private:
-   std::unordered_map<std::string, std::string> mapState_;
-   std::string filePath_;
-   std::string tableName_;
-   std::shared_ptr<logging::Logger> logger_;
-   bool ok_{};
-};
-
-const std::string LegacyState::separator_ = "@!qdt!@";
+const std::string QueryDatabaseTable::TABLENAME_KEY = "tablename";
+const std::string QueryDatabaseTable::MAXVALUE_KEY_PREFIX = "maxvalue.";
 
 // QueryDatabaseTable
 QueryDatabaseTable::QueryDatabaseTable(const std::string& name, utils::Identifier uuid)
-  : SQLProcessor(name, uuid) {
+  : SQLProcessor(name, uuid, logging::LoggerFactory<QueryDatabaseTable>::getLogger()) {
 }
 
-QueryDatabaseTable::~QueryDatabaseTable() = default;
-
 void QueryDatabaseTable::initialize() {
   //! Set the supported properties
-  setSupportedProperties({ dbControllerService(), outputFormat(), s_tableName, s_columnNames, s_maxValueColumnNames, s_whereClause, s_sqlQuery, s_maxRowsPerFlowFile, s_stateDirectory});
+  setSupportedProperties({
+    DBControllerService, OutputFormat, TableName, ColumnNames,
+    MaxValueColumnNames, WhereClause, MaxRowsPerFlowFile});
 
   //! Set the supported relationships
-  setSupportedRelationships({ s_success });
+  setSupportedRelationships({ Success });
 }
 
-void QueryDatabaseTable::processOnSchedule(core::ProcessContext &context) {
-  initOutputFormat(context);
+void QueryDatabaseTable::processOnSchedule(core::ProcessContext& context) {
+  context.getProperty(OutputFormat.getName(), output_format_);
+  max_rows_ = [&] {
+    uint64_t max_rows;
+    context.getProperty(MaxRowsPerFlowFile.getName(), max_rows);
+    return gsl::narrow<size_t>(max_rows);
+  }();
 
-  context.getProperty(s_tableName.getName(), tableName_);
-  context.getProperty(s_columnNames.getName(), columnNames_);
+  state_manager_ = context.getStateManager();
+  if (state_manager_ == nullptr) {
+    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+  }
+
+  context.getProperty(TableName.getName(), table_name_);
+  context.getProperty(WhereClause.getName(), extra_where_clause_);
+  max_value_columns_ = [&] {
+    std::string max_value_columns_str;
+    context.getProperty(MaxValueColumnNames.getName(), max_value_columns_str);
+    return utils::inputStringToList(max_value_columns_str);
+  }();
+  return_columns_ = [&] {
+    std::string return_columns_str;
+    context.getProperty(ColumnNames.getName(), return_columns_str);
+    return utils::inputStringToList(return_columns_str);
+  }();
+  queried_columns_ = utils::StringUtils::join(", ", return_columns_);
+  if (!queried_columns_.empty() && !max_value_columns_.empty()) {
+    // columns will be explicitly enumerated, we need to add the max value columns
+    queried_columns_ = queried_columns_ + ", " + utils::StringUtils::join(", ", max_value_columns_);
+  }
 
-  context.getProperty(s_maxValueColumnNames.getName(), maxValueColumnNames_);
-  listMaxValueColumnName_ = utils::inputStringToList(maxValueColumnNames_);
+  initializeMaxValues(context);
+}
 
-  context.getProperty(s_whereClause.getName(), whereClause_);
-  context.getProperty(s_sqlQuery.getName(), sqlQuery_);
-  context.getProperty(s_maxRowsPerFlowFile.getName(), maxRowsPerFlowFile_);
+void QueryDatabaseTable::processOnTrigger(core::ProcessContext& /*context*/, core::ProcessSession& session) {
+  const auto& selectQuery = buildSelectQuery();
 
-  mapState_.clear();
+  logger_->log_info("QueryDatabaseTable: selectQuery: '%s'", selectQuery.c_str());
 
-  state_manager_ = context.getStateManager();
-  if (state_manager_ == nullptr) {
-    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+  auto statement = connection_->prepareStatement(selectQuery);
+
+  auto rowset = statement->execute();
+
+  std::unordered_map<std::string, std::string> new_max_values = max_values_;
+  sql::MaxCollector maxCollector{selectQuery, new_max_values};
+  auto column_filter = [&] (const std::string& column_name) {
+    return return_columns_.empty()
+      || std::find(return_columns_.begin(), return_columns_.end(), column_name) != return_columns_.end();
+  };
+  sql::JSONSQLWriter sqlWriter{output_format_ == OutputType::JSONPretty, column_filter};
+  FlowFileGenerator flow_file_creator{session, sqlWriter};
+  sql::SQLRowsetProcessor sqlRowsetProcessor(rowset, {sqlWriter, maxCollector, flow_file_creator});
+
+  while (size_t row_count = sqlRowsetProcessor.process(max_rows_)) {
+    auto new_file = flow_file_creator.getLastFlowFile();
+    new_file->addAttribute(RESULT_ROW_COUNT, std::to_string(row_count));

Review comment:
       a `gsl_Expects(new_file)` would be useful here, too
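The loop in `processOnTrigger` emits one flow file per non-empty batch and stops when `process(max_rows_)` returns 0. The sketch below models that loop to show where the suggested precondition check would sit. It uses hypothetical stand-ins: `FakeRowsetProcessor`, `FakeFlowFile`, and an `EXPECTS` macro in place of gsl-lite's `gsl_Expects`. Based on the old `qdbt-max-rows` description, it also assumes that a limit of 0 means all remaining rows go into a single batch.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical stand-in for gsl-lite's gsl_Expects (assert-based here).
#define EXPECTS(cond) assert(cond)

struct FakeFlowFile {  // hypothetical, stands in for core::FlowFile
  std::size_t row_count = 0;
};

// Hypothetical stand-in for SQLRowsetProcessor + FlowFileGenerator:
// consumes up to max_rows rows (all remaining rows if max_rows == 0),
// creates one flow file per non-empty batch, and returns the batch size.
class FakeRowsetProcessor {
 public:
  explicit FakeRowsetProcessor(std::size_t total_rows) : remaining_(total_rows) {}

  std::size_t process(std::size_t max_rows) {
    const std::size_t batch = (max_rows == 0) ? remaining_ : std::min(max_rows, remaining_);
    remaining_ -= batch;
    if (batch > 0) {
      last_flow_file_ = std::make_shared<FakeFlowFile>();
    }
    return batch;
  }

  std::shared_ptr<FakeFlowFile> getLastFlowFile() const { return last_flow_file_; }

 private:
  std::size_t remaining_;
  std::shared_ptr<FakeFlowFile> last_flow_file_;
};

// Same shape as the loop in processOnTrigger, with the precondition added.
std::vector<std::size_t> runBatches(std::size_t total_rows, std::size_t max_rows) {
  FakeRowsetProcessor processor{total_rows};
  std::vector<std::size_t> batch_sizes;
  while (std::size_t row_count = processor.process(max_rows)) {
    auto new_file = processor.getLastFlowFile();
    EXPECTS(new_file != nullptr);  // the check suggested in the review
    new_file->row_count = row_count;
    batch_sizes.push_back(row_count);
  }
  return batch_sizes;
}
```

Because `EXPECTS` is built on `assert`, it is compiled out under `NDEBUG`. How the real `gsl_Expects` reacts to a violation depends on how gsl-lite is configured.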

##########
File path: extensions/sql/processors/SQLProcessor.cpp
##########
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SQLProcessor.h"
+
+#include <vector>
+#include <memory>
+
+#include "core/FlowFile.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "Exception.h"
+
+#include <soci/error.h>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const core::Property SQLProcessor::DBControllerService(
+    core::PropertyBuilder::createProperty("DB Controller Service")
+    ->isRequired(true)
+    ->withDescription("Database Controller Service.")
+    ->supportsExpressionLanguage(true)->build());
+
+void SQLProcessor::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {
+  std::string controllerService;
+  context->getProperty(DBControllerService.getName(), controllerService);
+
+  db_service_ = std::dynamic_pointer_cast<sql::controllers::DatabaseService>(context->getControllerService(controllerService));
+  if (!db_service_) {
+    throw minifi::Exception(PROCESSOR_EXCEPTION, "'DB Controller Service' must be defined");

Review comment:
       `DBControllerService.getName()` would be better
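The suggestion is to build the error text from the property object rather than repeating the literal name, so the message cannot drift if the property is ever renamed. The sketch below reduces the lookup-and-check step to a standalone example. It assumes simplified stand-ins for `core::Property` and the controller service types, and it throws `std::runtime_error` where the processor throws `minifi::Exception`:

```cpp
#include <cassert>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical minimal stand-ins; only what the pattern needs is modeled.
class Property {
 public:
  explicit Property(std::string name) : name_(std::move(name)) {}
  const std::string& getName() const { return name_; }

 private:
  std::string name_;
};

struct ControllerService { virtual ~ControllerService() = default; };
struct DatabaseService : ControllerService {};
struct OtherService : ControllerService {};

const Property DBControllerService{"DB Controller Service"};

// Downcast the looked-up service; if it is missing or of the wrong type,
// report the problem using the property's own name.
std::shared_ptr<DatabaseService> requireDatabaseService(const std::shared_ptr<ControllerService>& service) {
  auto db_service = std::dynamic_pointer_cast<DatabaseService>(service);
  if (!db_service) {
    throw std::runtime_error("'" + DBControllerService.getName() + "' must be defined");
  }
  return db_service;
}
```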







[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r603386255



##########
File path: .github/workflows/ci.yml
##########
@@ -132,12 +142,13 @@ jobs:
         run: |
           sudo apt-add-repository -y "ppa:ubuntu-toolchain-r/test"
           sudo apt update
-          sudo apt install -y gcc-4.8 g++-4.8 bison flex uuid-dev openssl libcurl4-openssl-dev ccache libpython3-dev liblua5.1-0-dev libssh2-1-dev
+          sudo apt install -y gcc-4.8 g++-4.8 bison flex uuid-dev openssl libcurl4-openssl-dev ccache libpython3-dev liblua5.1-0-dev libssh2-1-dev libsqliteodbc
+          sudo ln -s /usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so /usr/lib/x86_64-linux-gnu/libsqlite3odbc.so

Review comment:
       Even if the `libsqliteodbc` package is broken, we want `bootstrap.sh` followed by `make` to produce a working binary without any extra steps like symlinking.  So I would say we either need to figure out how to get `soci` to work without the symlink, or put the `ln -s` command in (a script called from) bootstrap.sh.







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r603929244



##########
File path: .github/workflows/ci.yml
##########
@@ -132,12 +142,13 @@ jobs:
         run: |
           sudo apt-add-repository -y "ppa:ubuntu-toolchain-r/test"
           sudo apt update
-          sudo apt install -y gcc-4.8 g++-4.8 bison flex uuid-dev openssl libcurl4-openssl-dev ccache libpython3-dev liblua5.1-0-dev libssh2-1-dev
+          sudo apt install -y gcc-4.8 g++-4.8 bison flex uuid-dev openssl libcurl4-openssl-dev ccache libpython3-dev liblua5.1-0-dev libssh2-1-dev libsqliteodbc
+          sudo ln -s /usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so /usr/lib/x86_64-linux-gnu/libsqlite3odbc.so

Review comment:
       created a ticket to track this issue: https://issues.apache.org/jira/browse/MINIFICPP-1534







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r599396392



##########
File path: extensions/sql/data/MaxCollector.h
##########
@@ -77,79 +74,66 @@ class MaxCollector: public SQLRowSubscriber {
   void processColumn(const std::string& name, const char* value) override {}
 
   template <typename T>
-  struct MaxValue {
-    void updateMaxValue(const std::string& name, const T& value) {
-      const auto it = mapColumnNameValue_.find(name);
-      if (it == mapColumnNameValue_.end()) {
-        mapColumnNameValue_.insert({ name, value });
+  class MaxValue {
+   public:
+    void updateMaxValue(const std::string& column, const T& value) {
+      const auto it = column_maxima.find(column);
+      if (it == column_maxima.end()) {
+        column_maxima.insert({ column, value });

Review comment:
       done
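`MaxValue` keeps a per-column maximum across the rows of a result set. The hunk only shows the insert-on-first-sight branch. The update branch below, which replaces the stored value when a larger one arrives, is therefore an assumption about the rest of the function. This is a standalone sketch, not the code in `MaxCollector.h`:

```cpp
#include <cassert>
#include <string>
#include <unordered_map>

// Simplified sketch: tracks the largest value seen so far for each column.
template <typename T>
class MaxValue {
 public:
  void updateMaxValue(const std::string& column, const T& value) {
    const auto it = column_maxima.find(column);
    if (it == column_maxima.end()) {
      // First value seen for this column (the branch shown in the diff).
      column_maxima.insert({ column, value });
    } else if (it->second < value) {
      // Assumed: keep the larger of the stored and the new value.
      it->second = value;
    }
  }

  const std::unordered_map<std::string, T>& maxima() const { return column_maxima; }

 private:
  std::unordered_map<std::string, T> column_maxima;
};
```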

##########
File path: extensions/sql/processors/FlowFileSource.h
##########
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <vector>
+#include <memory>
+
+#include "core/Property.h"
+#include "utils/Enum.h"
+#include "data/SQLRowsetProcessor.h"
+#include "ProcessSession.h"
+#include "data/JSONSQLWriter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class FlowFileSource {
+ public:
+  static const std::string FRAGMENT_IDENTIFIER;
+  static const std::string FRAGMENT_COUNT;
+  static const std::string FRAGMENT_INDEX;
+
+  static const core::Property OutputFormat;
+  static const core::Property MaxRowsPerFlowFile;
+
+  SMART_ENUM(OutputType,
+    (JSON, "JSON"),
+    (JSONPretty, "JSON-Pretty")
+  )
+
+ protected:
+  class FlowFileGenerator : public sql::SQLRowSubscriber {
+   public:
+    FlowFileGenerator(core::ProcessSession& session, sql::JSONSQLWriter& json_writer)
+      : session_(session),
+        json_writer_(json_writer) {}
+
+    void beginProcessBatch() override {
+      current_batch_size_ = 0;
+    }
+    void endProcessBatch(Progress progress) override;
+    void beginProcessRow() override {}
+    void endProcessRow() override {
+      ++current_batch_size_;
+    }
+    void processColumnNames(const std::vector<std::string>& names) override {}
+    void processColumn(const std::string& name, const std::string& value) override {}
+    void processColumn(const std::string& name, double value) override {}
+    void processColumn(const std::string& name, int value) override {}
+    void processColumn(const std::string& name, long long value) override {}
+    void processColumn(const std::string& name, unsigned long long value) override {}
+    void processColumn(const std::string& name, const char* value) override {}
+
+    std::shared_ptr<core::FlowFile> getLastFlowFile() {

Review comment:
       done
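`FlowFileGenerator` tracks the size of the current batch by resetting a counter in `beginProcessBatch()` and incrementing it in `endProcessRow()`. The sketch below reduces that counting pattern to a standalone example. It uses a hypothetical three-method `RowSubscriber` interface; the real `sql::SQLRowSubscriber` has more callbacks and passes a `Progress` to `endProcessBatch`. Instead of writing flow files, it records the batch sizes, and skipping empty batches is an assumption made for illustration:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical reduced subscriber interface.
class RowSubscriber {
 public:
  virtual ~RowSubscriber() = default;
  virtual void beginProcessBatch() = 0;
  virtual void endProcessRow() = 0;
  virtual void endProcessBatch() = 0;
};

// Same counting scheme as FlowFileGenerator: reset on batch start,
// increment once per finished row, act on the count when the batch ends.
class BatchSizeRecorder : public RowSubscriber {
 public:
  void beginProcessBatch() override { current_batch_size_ = 0; }
  void endProcessRow() override { ++current_batch_size_; }
  void endProcessBatch() override {
    if (current_batch_size_ > 0) {
      batch_sizes_.push_back(current_batch_size_);
    }
  }
  const std::vector<std::size_t>& batchSizes() const { return batch_sizes_; }

 private:
  std::size_t current_batch_size_ = 0;
  std::vector<std::size_t> batch_sizes_;
};
```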

##########
File path: extensions/sql/processors/QueryDatabaseTable.cpp
##########
@@ -75,361 +72,232 @@ const core::Property QueryDatabaseTable::s_maxValueColumnNames(
     "If no columns are provided, all rows from the table will be considered, which could have a performance impact. "
     "NOTE: It is important to use consistent max-value column names for a given table for incremental fetch to work properly. "
     "NOTE: Because of a limitation of database access library 'soci', which doesn't support milliseconds in it's 'dt_date', "
-    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")->
-    supportsExpressionLanguage(true)->build());
+    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")
+  ->supportsExpressionLanguage(true)->build());
 
-const core::Property QueryDatabaseTable::s_whereClause(
-  core::PropertyBuilder::createProperty("db-fetch-where-clause")->isRequired(false)->withDescription(
-    "A custom clause to be added in the WHERE condition when building SQL queries.")->supportsExpressionLanguage(true)->build());
+const core::Property QueryDatabaseTable::WhereClause(
+  core::PropertyBuilder::createProperty("Where Clause")
+  ->isRequired(false)
+  ->withDescription("A custom clause to be added in the WHERE condition when building SQL queries.")
+  ->supportsExpressionLanguage(true)->build());
 
-const core::Property QueryDatabaseTable::s_sqlQuery(
-  core::PropertyBuilder::createProperty("db-fetch-sql-query")->isRequired(false)->withDescription(
-    "A custom SQL query used to retrieve data. Instead of building a SQL query from other properties, this query will be wrapped as a sub-query. "
-    "Query must have no ORDER BY statement.")->supportsExpressionLanguage(true)->build());
+const std::string QueryDatabaseTable::InitialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
 
-const core::Property QueryDatabaseTable::s_maxRowsPerFlowFile(
-  core::PropertyBuilder::createProperty("qdbt-max-rows")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
-    "The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large result sets into multiple FlowFiles. "
-    "If the value specified is zero, then all rows are returned in a single FlowFile.")->supportsExpressionLanguage(true)->build());
+const core::Relationship QueryDatabaseTable::Success("success", "Successfully created FlowFile from SQL query result set.");
 
-const core::Property QueryDatabaseTable::s_stateDirectory(
-  core::PropertyBuilder::createProperty("State Directory")->isRequired(false)->withDefaultValue("QDTState")->withDescription("DEPRECATED. Only use it for state migration from the state file, supplying the legacy state directory.")->build());
+const std::string QueryDatabaseTable::RESULT_TABLE_NAME = "tablename";
+const std::string QueryDatabaseTable::RESULT_ROW_COUNT = "querydbtable.row.count";
 
-const std::string QueryDatabaseTable::s_initialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
+const std::string QueryDatabaseTable::TABLENAME_KEY = "tablename";
+const std::string QueryDatabaseTable::MAXVALUE_KEY_PREFIX = "maxvalue.";
 
-const core::Relationship QueryDatabaseTable::s_success("success", "Successfully created FlowFile from SQL query result set.");
-
-static const std::string ResultTableName = "tablename";
-static const std::string ResultRowCount = "querydbtable.row.count";
-
-static const std::string TABLENAME_KEY = "tablename";
-static const std::string MAXVALUE_KEY_PREFIX = "maxvalue.";
-
-// State
-class LegacyState {
- public:
-  LegacyState(const std::string& tableName, const std::string& stateDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
-    :tableName_(tableName), logger_(logger) {
-
-    filePath_ = utils::file::FileUtils::concat_path(
-            utils::file::FileUtils::concat_path(
-              utils::file::FileUtils::concat_path(stateDir, "uuid"), uuid), "State.txt");
+// QueryDatabaseTable
+QueryDatabaseTable::QueryDatabaseTable(const std::string& name, utils::Identifier uuid)
+  : SQLProcessor(name, uuid, logging::LoggerFactory<QueryDatabaseTable>::getLogger()) {
+}
 
-    if (!getStateFromFile())
-      return;
+void QueryDatabaseTable::initialize() {
+  //! Set the supported properties
+  setSupportedProperties({
+    DBControllerService, OutputFormat, TableName, ColumnNames,
+    MaxValueColumnNames, WhereClause, MaxRowsPerFlowFile});
 
-    ok_ = true;
-  }
+  //! Set the supported relationships
+  setSupportedRelationships({ Success });
+}
 
-  explicit operator bool() const {
-    return ok_;
-  }
+void QueryDatabaseTable::processOnSchedule(core::ProcessContext& context) {
+  context.getProperty(OutputFormat.getName(), output_format_);
+  max_rows_ = [&] {
+    uint64_t max_rows;
+    context.getProperty(MaxRowsPerFlowFile.getName(), max_rows);
+    return gsl::narrow<size_t>(max_rows);
+  }();
 
-  const std::unordered_map<std::string, std::string>& getStateMap() const {
-    return mapState_;
+  state_manager_ = context.getStateManager();
+  if (state_manager_ == nullptr) {
+    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
   }
 
-  bool moveStateFileToMigrated() {
-    if (!ok_) {
-      return false;
-    }
-    return rename(filePath_.c_str(), (filePath_ + "-migrated").c_str()) == 0;
+  context.getProperty(TableName.getName(), table_name_);
+  context.getProperty(WhereClause.getName(), extra_where_clause_);
+  max_value_columns_ = [&] {
+    std::string max_value_columns_str;
+    context.getProperty(MaxValueColumnNames.getName(), max_value_columns_str);
+    return utils::inputStringToList(max_value_columns_str);
+  }();
+  return_columns_ = [&] {
+    std::string return_columns_str;
+    context.getProperty(ColumnNames.getName(), return_columns_str);
+    return utils::inputStringToList(return_columns_str);
+  }();
+  queried_columns_ = utils::StringUtils::join(", ", return_columns_);
+  if (!queried_columns_.empty() && !max_value_columns_.empty()) {
+    // columns will be explicitly enumerated, we need to add the max value columns
+    queried_columns_ = queried_columns_ + ", " + utils::StringUtils::join(", ", max_value_columns_);
   }
 
- private:
-  static const std::string separator_;
-
-   bool getStateFromFile() {
-     std::string state;
-
-     std::ifstream file(filePath_);
-     if (!file) {
-       return false;
-     }
-
-     std::stringstream ss;
-     ss << file.rdbuf();
-
-     state = ss.str();
-
-     file.close();
-
-     std::vector<std::string> listColumnNameValue;
-
-     size_t pos = state.find(separator_, 0);
-     if (pos == std::string::npos) {
-       logger_->log_error("Invalid data in '%s' file.", filePath_.c_str());
-       mapState_.clear();
-       return false;
-     }
-
-     auto tableName = state.substr(0, pos);
-     if (tableName != tableName_) {
-       logger_->log_warn("tableName is changed - now: '%s', in State.txt: '%s'.", tableName_.c_str(), tableName.c_str());
-       mapState_.clear();
-
-       return false;
-     }
-
-     pos += separator_.size();
-
-     while (true) {
-       auto newPos = state.find(separator_, pos);
-       if (newPos == std::string::npos)
-         break;
-
-       const std::string& columnNameValue = state.substr(pos, newPos - pos);
-       listColumnNameValue.emplace_back(columnNameValue);
-
-       pos = newPos + separator_.size();
-     }
-
-     for (const auto& columnNameValue : listColumnNameValue) {
-       const auto posEQ = columnNameValue.find('=');
-       if (posEQ == std::string::npos) {
-         logger_->log_error("Invalid data in '%s' file.", filePath_.c_str());
-         mapState_.clear();
-         return false;
-       }
-
-       const auto& name = columnNameValue.substr(0, posEQ);
-       const auto& value = columnNameValue.substr(posEQ + 1);
-
-       mapState_.insert({ name, value });
-     }
-
-     return true;
-   }
-
- private:
-   std::unordered_map<std::string, std::string> mapState_;
-   std::shared_ptr<logging::Logger> logger_;
-   std::string filePath_;
-   std::string tableName_;
-   bool ok_{};
-};
-
-const std::string LegacyState::separator_ = "@!qdt!@";
-
-// QueryDatabaseTable
-QueryDatabaseTable::QueryDatabaseTable(const std::string& name, utils::Identifier uuid)
-  : SQLProcessor(name, uuid) {
+  initializeMaxValues(context);
 }
 
-QueryDatabaseTable::~QueryDatabaseTable() = default;
-
-void QueryDatabaseTable::initialize() {
-  //! Set the supported properties
-  setSupportedProperties({ dbControllerService(), outputFormat(), s_tableName, s_columnNames, s_maxValueColumnNames, s_whereClause, s_sqlQuery, s_maxRowsPerFlowFile, s_stateDirectory});
+void QueryDatabaseTable::processOnTrigger(core::ProcessContext& context, core::ProcessSession& session) {
+  const auto& selectQuery = buildSelectQuery();
 
-  //! Set the supported relationships
-  setSupportedRelationships({ s_success });
-}
+  logger_->log_info("QueryDatabaseTable: selectQuery: '%s'", selectQuery.c_str());
 
-void QueryDatabaseTable::processOnSchedule(core::ProcessContext &context) {
-  initOutputFormat(context);
+  auto statement = connection_->prepareStatement(selectQuery);
 
-  context.getProperty(s_tableName.getName(), tableName_);
-  context.getProperty(s_columnNames.getName(), columnNames_);
+  auto rowset = statement->execute();
 
-  context.getProperty(s_maxValueColumnNames.getName(), maxValueColumnNames_);
-  listMaxValueColumnName_ = utils::inputStringToList(maxValueColumnNames_);
+  std::unordered_map<std::string, std::string> new_max_values = max_values_;
+  sql::MaxCollector maxCollector{selectQuery, new_max_values};
+  auto column_filter = [&] (const std::string& column_name) {
+    return return_columns_.empty()
+      || std::find(return_columns_.begin(), return_columns_.end(), column_name) != return_columns_.end();
+  };
+  sql::JSONSQLWriter sqlWriter{output_format_ == OutputType::JSONPretty, column_filter};
+  FlowFileGenerator flow_file_creator{session, sqlWriter};
+  sql::SQLRowsetProcessor sqlRowsetProcessor(rowset, {sqlWriter, maxCollector, flow_file_creator});
+
+  while (size_t row_count = sqlRowsetProcessor.process(max_rows_)) {
+    auto new_file = flow_file_creator.getLastFlowFile();
+    new_file->addAttribute(RESULT_ROW_COUNT, std::to_string(row_count));
+    new_file->addAttribute(RESULT_TABLE_NAME, table_name_);
+  }
 
-  context.getProperty(s_whereClause.getName(), whereClause_);
-  context.getProperty(s_sqlQuery.getName(), sqlQuery_);
-  context.getProperty(s_maxRowsPerFlowFile.getName(), maxRowsPerFlowFile_);
+  // the updated max_values and the total number of flow_files is available from here
+  for (auto& new_file : flow_file_creator.getFlowFiles()) {
+    session.transfer(new_file, Success);
+    for (const auto& max_column : max_value_columns_) {
+      new_file->addAttribute("maxvalue." + max_column, new_max_values[max_column]);
+    }
+  }
 
-  mapState_.clear();
+  if (new_max_values != max_values_) {
+    try {
+      session.commit();
+    } catch (std::exception& e) {
+      throw;
+    }

Review comment:
       done
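The hunk above calls `buildSelectQuery()`, but its body is not part of this quote. As a rough illustration only, here is a self-contained sketch of how an incremental-fetch query might be assembled from the values gathered in `processOnSchedule` (`table_name_`, `queried_columns_`, `max_value_columns_`, `extra_where_clause_`) and the stored `max_values_`. The function `buildIncrementalSelectQuery`, the `join` helper, and the strict `>` comparison against quoted literals are assumptions, not the PR's code.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical helper: joins items with a separator.
std::string join(const std::string& sep, const std::vector<std::string>& items) {
  std::string result;
  bool first = true;
  for (const auto& item : items) {
    if (!first) result += sep;
    result += item;
    first = false;
  }
  return result;
}

// Hypothetical sketch of an incremental-fetch SELECT builder (not the PR's buildSelectQuery).
// queried_columns: comma-separated column list; empty means "all columns".
// max_values: last seen maximum per max-value column, as restored from processor state.
// NOTE: values are inlined as quoted literals purely for illustration.
std::string buildIncrementalSelectQuery(const std::string& table_name,
                                        const std::string& queried_columns,
                                        const std::vector<std::string>& max_value_columns,
                                        const std::unordered_map<std::string, std::string>& max_values,
                                        const std::string& extra_where_clause) {
  std::string query = "SELECT " + (queried_columns.empty() ? std::string("*") : queried_columns)
      + " FROM " + table_name;

  std::vector<std::string> conditions;
  for (const auto& column : max_value_columns) {
    auto it = max_values.find(column);
    if (it != max_values.end()) {
      // only fetch rows strictly newer than the stored maximum
      conditions.push_back(column + " > '" + it->second + "'");
    }
  }
  if (!extra_where_clause.empty()) {
    // user-supplied clause is parenthesized so its own ORs cannot leak out
    conditions.push_back("(" + extra_where_clause + ")");
  }
  if (!conditions.empty()) {
    query += " WHERE " + join(" AND ", conditions);
  }
  return query;
}
```

Inlining the stored values keeps the sketch short; binding them as statement parameters would avoid quoting and injection problems.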

##########
File path: extensions/sql/processors/QueryDatabaseTable.cpp
##########
@@ -75,361 +72,232 @@ const core::Property QueryDatabaseTable::s_maxValueColumnNames(
     "If no columns are provided, all rows from the table will be considered, which could have a performance impact. "
     "NOTE: It is important to use consistent max-value column names for a given table for incremental fetch to work properly. "
     "NOTE: Because of a limitation of database access library 'soci', which doesn't support milliseconds in it's 'dt_date', "
-    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")->
-    supportsExpressionLanguage(true)->build());
+    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")
+  ->supportsExpressionLanguage(true)->build());
 
-const core::Property QueryDatabaseTable::s_whereClause(
-  core::PropertyBuilder::createProperty("db-fetch-where-clause")->isRequired(false)->withDescription(
-    "A custom clause to be added in the WHERE condition when building SQL queries.")->supportsExpressionLanguage(true)->build());
+const core::Property QueryDatabaseTable::WhereClause(
+  core::PropertyBuilder::createProperty("Where Clause")
+  ->isRequired(false)
+  ->withDescription("A custom clause to be added in the WHERE condition when building SQL queries.")
+  ->supportsExpressionLanguage(true)->build());
 
-const core::Property QueryDatabaseTable::s_sqlQuery(
-  core::PropertyBuilder::createProperty("db-fetch-sql-query")->isRequired(false)->withDescription(
-    "A custom SQL query used to retrieve data. Instead of building a SQL query from other properties, this query will be wrapped as a sub-query. "
-    "Query must have no ORDER BY statement.")->supportsExpressionLanguage(true)->build());
+const std::string QueryDatabaseTable::InitialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
 
-const core::Property QueryDatabaseTable::s_maxRowsPerFlowFile(
-  core::PropertyBuilder::createProperty("qdbt-max-rows")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
-    "The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large result sets into multiple FlowFiles. "
-    "If the value specified is zero, then all rows are returned in a single FlowFile.")->supportsExpressionLanguage(true)->build());
+const core::Relationship QueryDatabaseTable::Success("success", "Successfully created FlowFile from SQL query result set.");
 
-const core::Property QueryDatabaseTable::s_stateDirectory(
-  core::PropertyBuilder::createProperty("State Directory")->isRequired(false)->withDefaultValue("QDTState")->withDescription("DEPRECATED. Only use it for state migration from the state file, supplying the legacy state directory.")->build());
+const std::string QueryDatabaseTable::RESULT_TABLE_NAME = "tablename";
+const std::string QueryDatabaseTable::RESULT_ROW_COUNT = "querydbtable.row.count";
 
-const std::string QueryDatabaseTable::s_initialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
+const std::string QueryDatabaseTable::TABLENAME_KEY = "tablename";
+const std::string QueryDatabaseTable::MAXVALUE_KEY_PREFIX = "maxvalue.";
 
-const core::Relationship QueryDatabaseTable::s_success("success", "Successfully created FlowFile from SQL query result set.");
-
-static const std::string ResultTableName = "tablename";
-static const std::string ResultRowCount = "querydbtable.row.count";
-
-static const std::string TABLENAME_KEY = "tablename";
-static const std::string MAXVALUE_KEY_PREFIX = "maxvalue.";
-
-// State
-class LegacyState {
- public:
-  LegacyState(const std::string& tableName, const std::string& stateDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
-    :tableName_(tableName), logger_(logger) {
-
-    filePath_ = utils::file::FileUtils::concat_path(
-            utils::file::FileUtils::concat_path(
-              utils::file::FileUtils::concat_path(stateDir, "uuid"), uuid), "State.txt");
+// QueryDatabaseTable
+QueryDatabaseTable::QueryDatabaseTable(const std::string& name, utils::Identifier uuid)
+  : SQLProcessor(name, uuid, logging::LoggerFactory<QueryDatabaseTable>::getLogger()) {
+}
 
-    if (!getStateFromFile())
-      return;
+void QueryDatabaseTable::initialize() {
+  //! Set the supported properties
+  setSupportedProperties({
+    DBControllerService, OutputFormat, TableName, ColumnNames,
+    MaxValueColumnNames, WhereClause, MaxRowsPerFlowFile});
 
-    ok_ = true;
-  }
+  //! Set the supported relationships
+  setSupportedRelationships({ Success });
+}
 
-  explicit operator bool() const {
-    return ok_;
-  }
+void QueryDatabaseTable::processOnSchedule(core::ProcessContext& context) {
+  context.getProperty(OutputFormat.getName(), output_format_);
+  max_rows_ = [&] {
+    uint64_t max_rows;
+    context.getProperty(MaxRowsPerFlowFile.getName(), max_rows);
+    return gsl::narrow<size_t>(max_rows);
+  }();
 
-  const std::unordered_map<std::string, std::string>& getStateMap() const {
-    return mapState_;
+  state_manager_ = context.getStateManager();
+  if (state_manager_ == nullptr) {
+    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
   }
 
-  bool moveStateFileToMigrated() {
-    if (!ok_) {
-      return false;
-    }
-    return rename(filePath_.c_str(), (filePath_ + "-migrated").c_str()) == 0;
+  context.getProperty(TableName.getName(), table_name_);
+  context.getProperty(WhereClause.getName(), extra_where_clause_);
+  max_value_columns_ = [&] {
+    std::string max_value_columns_str;
+    context.getProperty(MaxValueColumnNames.getName(), max_value_columns_str);
+    return utils::inputStringToList(max_value_columns_str);
+  }();
+  return_columns_ = [&] {
+    std::string return_columns_str;
+    context.getProperty(ColumnNames.getName(), return_columns_str);
+    return utils::inputStringToList(return_columns_str);
+  }();
+  queried_columns_ = utils::StringUtils::join(", ", return_columns_);
+  if (!queried_columns_.empty() && !max_value_columns_.empty()) {
+    // columns will be explicitly enumerated, we need to add the max value columns
+    queried_columns_ = queried_columns_ + ", " + utils::StringUtils::join(", ", max_value_columns_);
   }
 
- private:
-  static const std::string separator_;
-
-   bool getStateFromFile() {
-     std::string state;
-
-     std::ifstream file(filePath_);
-     if (!file) {
-       return false;
-     }
-
-     std::stringstream ss;
-     ss << file.rdbuf();
-
-     state = ss.str();
-
-     file.close();
-
-     std::vector<std::string> listColumnNameValue;
-
-     size_t pos = state.find(separator_, 0);
-     if (pos == std::string::npos) {
-       logger_->log_error("Invalid data in '%s' file.", filePath_.c_str());
-       mapState_.clear();
-       return false;
-     }
-
-     auto tableName = state.substr(0, pos);
-     if (tableName != tableName_) {
-       logger_->log_warn("tableName is changed - now: '%s', in State.txt: '%s'.", tableName_.c_str(), tableName.c_str());
-       mapState_.clear();
-
-       return false;
-     }
-
-     pos += separator_.size();
-
-     while (true) {
-       auto newPos = state.find(separator_, pos);
-       if (newPos == std::string::npos)
-         break;
-
-       const std::string& columnNameValue = state.substr(pos, newPos - pos);
-       listColumnNameValue.emplace_back(columnNameValue);
-
-       pos = newPos + separator_.size();
-     }
-
-     for (const auto& columnNameValue : listColumnNameValue) {
-       const auto posEQ = columnNameValue.find('=');
-       if (posEQ == std::string::npos) {
-         logger_->log_error("Invalid data in '%s' file.", filePath_.c_str());
-         mapState_.clear();
-         return false;
-       }
-
-       const auto& name = columnNameValue.substr(0, posEQ);
-       const auto& value = columnNameValue.substr(posEQ + 1);
-
-       mapState_.insert({ name, value });
-     }
-
-     return true;
-   }
-
- private:
-   std::unordered_map<std::string, std::string> mapState_;
-   std::shared_ptr<logging::Logger> logger_;
-   std::string filePath_;
-   std::string tableName_;
-   bool ok_{};
-};
-
-const std::string LegacyState::separator_ = "@!qdt!@";
-
-// QueryDatabaseTable
-QueryDatabaseTable::QueryDatabaseTable(const std::string& name, utils::Identifier uuid)
-  : SQLProcessor(name, uuid) {
+  initializeMaxValues(context);
 }
 
-QueryDatabaseTable::~QueryDatabaseTable() = default;
-
-void QueryDatabaseTable::initialize() {
-  //! Set the supported properties
-  setSupportedProperties({ dbControllerService(), outputFormat(), s_tableName, s_columnNames, s_maxValueColumnNames, s_whereClause, s_sqlQuery, s_maxRowsPerFlowFile, s_stateDirectory});
+void QueryDatabaseTable::processOnTrigger(core::ProcessContext& context, core::ProcessSession& session) {
+  const auto& selectQuery = buildSelectQuery();
 
-  //! Set the supported relationships
-  setSupportedRelationships({ s_success });
-}
+  logger_->log_info("QueryDatabaseTable: selectQuery: '%s'", selectQuery.c_str());
 
-void QueryDatabaseTable::processOnSchedule(core::ProcessContext &context) {
-  initOutputFormat(context);
+  auto statement = connection_->prepareStatement(selectQuery);
 
-  context.getProperty(s_tableName.getName(), tableName_);
-  context.getProperty(s_columnNames.getName(), columnNames_);
+  auto rowset = statement->execute();
 
-  context.getProperty(s_maxValueColumnNames.getName(), maxValueColumnNames_);
-  listMaxValueColumnName_ = utils::inputStringToList(maxValueColumnNames_);
+  std::unordered_map<std::string, std::string> new_max_values = max_values_;
+  sql::MaxCollector maxCollector{selectQuery, new_max_values};
+  auto column_filter = [&] (const std::string& column_name) {
+    return return_columns_.empty()
+      || std::find(return_columns_.begin(), return_columns_.end(), column_name) != return_columns_.end();
+  };
+  sql::JSONSQLWriter sqlWriter{output_format_ == OutputType::JSONPretty, column_filter};
+  FlowFileGenerator flow_file_creator{session, sqlWriter};
+  sql::SQLRowsetProcessor sqlRowsetProcessor(rowset, {sqlWriter, maxCollector, flow_file_creator});
+
+  while (size_t row_count = sqlRowsetProcessor.process(max_rows_)) {
+    auto new_file = flow_file_creator.getLastFlowFile();
+    new_file->addAttribute(RESULT_ROW_COUNT, std::to_string(row_count));
+    new_file->addAttribute(RESULT_TABLE_NAME, table_name_);
+  }
 
-  context.getProperty(s_whereClause.getName(), whereClause_);
-  context.getProperty(s_sqlQuery.getName(), sqlQuery_);
-  context.getProperty(s_maxRowsPerFlowFile.getName(), maxRowsPerFlowFile_);
+  // the updated max_values and the total number of flow_files is available from here
+  for (auto& new_file : flow_file_creator.getFlowFiles()) {
+    session.transfer(new_file, Success);
+    for (const auto& max_column : max_value_columns_) {
+      new_file->addAttribute("maxvalue." + max_column, new_max_values[max_column]);
+    }
+  }
 
-  mapState_.clear();
+  if (new_max_values != max_values_) {
+    try {
+      session.commit();
+    } catch (std::exception& e) {
+      throw;
+    }
 
-  state_manager_ = context.getStateManager();
-  if (state_manager_ == nullptr) {
-    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+    max_values_ = new_max_values;
+    saveState();
   }
+}
 
-  std::unordered_map<std::string, std::string> state_map;
-  if (state_manager_->get(state_map)) {
-    if (state_map[TABLENAME_KEY] != tableName_) {
-      state_manager_->clear();
-    } else {
-      for (auto&& elem : state_map) {
-        if (elem.first.find(MAXVALUE_KEY_PREFIX) == 0) {
-          mapState_.emplace(elem.first.substr(MAXVALUE_KEY_PREFIX.length()), std::move(elem.second));
+void QueryDatabaseTable::initializeMaxValues(core::ProcessContext &context) {
+  max_values_.clear();
+  std::unordered_map<std::string, std::string> new_state;
+  if (!state_manager_->get(new_state)) {
+    logger_->log_info("Found no stored state");
+  } else {
+    const bool should_reset_state = [&] {

Review comment:
       done
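The removed code shows the persisted state layout: a `tablename` entry plus one `maxvalue.<column>` entry per max-value column, with the old state discarded when the configured table name no longer matches. Here is a minimal sketch of that round trip, assuming a plain `std::unordered_map<std::string, std::string>` as the state map. `encodeState` and `decodeState` are hypothetical names, not functions from the PR.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>

using StateMap = std::unordered_map<std::string, std::string>;

// Key names as declared in the diff (TABLENAME_KEY, MAXVALUE_KEY_PREFIX).
const std::string TABLENAME_KEY = "tablename";
const std::string MAXVALUE_KEY_PREFIX = "maxvalue.";

// Hypothetical sketch: flatten the current max values into a state map.
StateMap encodeState(const std::string& table_name, const StateMap& max_values) {
  StateMap state;
  state[TABLENAME_KEY] = table_name;
  for (const auto& entry : max_values) {
    state[MAXVALUE_KEY_PREFIX + entry.first] = entry.second;
  }
  return state;
}

// Hypothetical sketch: recover max values from stored state. A missing or
// different table name yields an empty result, mirroring the removed code,
// which called state_manager_->clear() in that case.
StateMap decodeState(const std::string& table_name, const StateMap& state) {
  StateMap max_values;
  auto it = state.find(TABLENAME_KEY);
  if (it == state.end() || it->second != table_name) {
    return max_values;  // stale or missing state: start from scratch
  }
  for (const auto& entry : state) {
    // keep only "maxvalue.<column>" entries, stripping the prefix
    if (entry.first.compare(0, MAXVALUE_KEY_PREFIX.size(), MAXVALUE_KEY_PREFIX) == 0) {
      max_values.emplace(entry.first.substr(MAXVALUE_KEY_PREFIX.size()), entry.second);
    }
  }
  return max_values;
}
```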




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r603243515



##########
File path: libminifi/test/sql-tests/ExecuteSQLTests.cpp
##########
@@ -0,0 +1,195 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+
+#include "SQLTestController.h"
+#include "processors/ExecuteSQL.h"
+#include "Utils.h"
+#include "FlowFileMatcher.h"
+
+TEST_CASE("ExecuteSQL works without incoming flow file", "[ExecuteSQL1]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty("SQL select query", "SELECT * FROM test_table ORDER BY int_col ASC");
+
+  controller.insertValues({{11, "one"}, {22, "two"}});
+
+  plan->run();
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+  std::string row_count;
+  flow_files[0]->getAttribute(processors::ExecuteSQL::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "2");
+
+  auto content = plan->getContent(flow_files[0]);
+  verifyJSON(content, R"(
+    [{
+      "int_col": 11,
+      "text_col": "one"
+    },{
+      "int_col": 22,
+      "text_col": "two"
+    }]
+  )");
+}
+
+TEST_CASE("ExecuteSQL uses statement in property", "[ExecuteSQL2]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty("SQL select query", "SELECT * FROM test_table WHERE int_col == ${int_col_value}");
+
+  controller.insertValues({{11, "one"}, {22, "two"}});
+
+  auto input_file = plan->addInput({{"int_col_value", "11"}});
+
+  plan->run();
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+  std::string row_count;
+  flow_files[0]->getAttribute(processors::ExecuteSQL::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "1");
+
+  auto content = plan->getContent(flow_files[0]);
+  verifyJSON(content, R"(
+    [{
+      "int_col": 11,
+      "text_col": "one"
+    }]
+  )");
+}
+
+TEST_CASE("ExecuteSQL uses statement in content", "[ExecuteSQL3]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+
+  controller.insertValues({{11, "one"}, {22, "two"}});
+
+  auto input_file = plan->addInput({}, "SELECT * FROM test_table ORDER BY int_col ASC");
+
+  plan->run();
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+  std::string row_count;
+  flow_files[0]->getAttribute(processors::ExecuteSQL::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "2");
+
+  auto content = plan->getContent(flow_files[0]);
+  verifyJSON(content, R"(
+    [{
+      "int_col": 11,
+      "text_col": "one"
+    },{
+      "int_col": 22,
+      "text_col": "two"
+    }]
+  )");
+}
+
+TEST_CASE("ExecuteSQL uses sql.args.N.value attributes", "[ExecuteSQL4]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+
+  controller.insertValues({{11, "apple"}, {11, "banana"}, {22, "banana"}});
+
+  auto input_file = plan->addInput({
+    {"sql.args.1.value", "11"},
+    {"sql.args.2.value", "banana"}
+  }, "SELECT * FROM test_table WHERE int_col == ? AND text_col == ?");
+
+  plan->run();
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+  std::string row_count;
+  flow_files[0]->getAttribute(processors::ExecuteSQL::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "1");
+
+  auto content = plan->getContent(flow_files[0]);
+  verifyJSON(content, R"(
+    [{
+      "int_col": 11,
+      "text_col": "banana"
+    }]
+  )");
+}
+
+TEST_CASE("ExecuteSQL honors Max Rows Per Flow File", "[ExecuteSQL5]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty(processors::ExecuteSQL::MaxRowsPerFlowFile.getName(), "2");
+  sql_proc->setProperty(processors::ExecuteSQL::SQLSelectQuery.getName(), "SELECT text_col FROM test_table ORDER BY int_col ASC");
+
+  controller.insertValues({
+    {101, "apple"},
+    {102, "banana"},
+    {103, "pear"},
+    {104, "strawberry"},
+    {105, "pineapple"}
+  });
+
+  auto input_file = plan->addInput();
+
+  plan->run();
+
+  auto content_verifier = [&] (const std::shared_ptr<core::FlowFile>& actual, const std::string& expected) {
+    verifyJSON(plan->getContent(actual), expected);
+  };
+
+  FlowFileMatcher matcher{content_verifier, {
+      processors::ExecuteSQL::RESULT_ROW_COUNT,
+      processors::ExecuteSQL::FRAGMENT_COUNT,
+      processors::ExecuteSQL::FRAGMENT_INDEX,
+      processors::ExecuteSQL::FRAGMENT_IDENTIFIER
+  }};
+
+  utils::optional<std::string> fragment_id;
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 3);
+  matcher.verify(flow_files[0],
+    {"2", "3", "0", var("frag_id")},
+    R"([{"text_col": "apple"}, {"text_col": "banana"}])");
+  matcher.verify(flow_files[1],
+    {"2", "3", "1", var("frag_id")},
+    R"([{"text_col": "pear"}, {"text_col": "strawberry"}])");
+  matcher.verify(flow_files[2],
+    {"1", "3", "2", var("frag_id")},

Review comment:
       done
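The `Max Rows Per Flow File` test above expects 5 rows with a limit of 2 to produce three FlowFiles holding 2, 2 and 1 rows, with `FRAGMENT_COUNT` 3 and `FRAGMENT_INDEX` 0 to 2. Here is a hypothetical helper that reproduces that arithmetic. The name `fragmentRowCounts` is illustrative. Treating a limit of 0 as "everything in one FlowFile" follows the description of the old `qdbt-max-rows` property and is assumed to carry over.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical sketch: number of rows in each FlowFile for a given
// "Max Rows Per Flow File" value. Element i corresponds to fragment index i,
// and the vector's size corresponds to the fragment count.
// A limit of 0 means "no limit"; an empty result produces no FlowFiles here.
std::vector<std::size_t> fragmentRowCounts(std::size_t total_rows, std::size_t max_rows) {
  std::vector<std::size_t> counts;
  if (max_rows == 0) {
    if (total_rows > 0) counts.push_back(total_rows);
    return counts;
  }
  for (std::size_t remaining = total_rows; remaining > 0;) {
    std::size_t batch = remaining < max_rows ? remaining : max_rows;
    counts.push_back(batch);
    remaining -= batch;
  }
  return counts;
}
```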







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#issuecomment-780378765


   Investigating the increased run times on macOS in CI.





[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r603909790



##########
File path: .github/workflows/ci.yml
##########
@@ -132,12 +142,13 @@ jobs:
         run: |
           sudo apt-add-repository -y "ppa:ubuntu-toolchain-r/test"
           sudo apt update
-          sudo apt install -y gcc-4.8 g++-4.8 bison flex uuid-dev openssl libcurl4-openssl-dev ccache libpython3-dev liblua5.1-0-dev libssh2-1-dev
+          sudo apt install -y gcc-4.8 g++-4.8 bison flex uuid-dev openssl libcurl4-openssl-dev ccache libpython3-dev liblua5.1-0-dev libssh2-1-dev libsqliteodbc
+          sudo ln -s /usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so /usr/lib/x86_64-linux-gnu/libsqlite3odbc.so

Review comment:
       Without the symlink, my agent starts up, but I get this error in PutSQL::onTrigger:
   ```
   [org::apache::nifi::minifi::processors::PutSQL] [error] SQLProcessor: 'Error connecting to database: [iODBC][Driver Manager]libsqlite3odbc.so: cannot open shared object file: No such file or directory (SQL state 00000)'
   ```
   (or the `onTrigger` of one of the other SQL processors)







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r603242138



##########
File path: libminifi/test/sql-tests/SQLTestPlan.h
##########
@@ -0,0 +1,98 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "../TestBase.h"
+
+class SQLTestPlan {
+ public:
+  SQLTestPlan(TestController& controller, const std::string& connection_str, const std::string& sql_processor, std::initializer_list<core::Relationship> output_rels) {
+    plan_ = controller.createPlan();
+    processor_ = plan_->addProcessor(sql_processor, sql_processor);
+    plan_->setProperty(processor_, "DB Controller Service", "ODBCService");
+    input_ = plan_->addConnection({}, {"success", "d"}, processor_);
+    for (const auto& output_rel : output_rels) {
+      outputs_[output_rel] = plan_->addConnection(processor_, output_rel, {});
+    }
+
+    // initialize database service
+    auto service = plan_->addController("ODBCService", "ODBCService");
+    plan_->setProperty(service, minifi::sql::controllers::DatabaseService::ConnectionString.getName(), connection_str);
+  }
+
+  std::string getContent(const std::shared_ptr<core::FlowFile>& flow_file) {
+    return plan_->getContent(flow_file);
+  }
+
+  std::shared_ptr<core::FlowFile> addInput(std::initializer_list<std::pair<std::string, std::string>> attributes = {}, const utils::optional<std::string>& content = {}) {
+    auto flow_file = std::make_shared<minifi::FlowFileRecord>();
+    for (const auto& attr : attributes) {
+      flow_file->setAttribute(attr.first, attr.second);
+    }
+    if (content) {
+      auto claim = std::make_shared<minifi::ResourceClaim>(plan_->getContentRepo());
+      auto content_stream = plan_->getContentRepo()->write(*claim);
+      int ret = content_stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(content->c_str())), content->length());
+      REQUIRE(ret == content->length());
+      flow_file->setOffset(0);
+      flow_file->setSize(content->length());
+      flow_file->setResourceClaim(claim);
+    }
+    input_->put(flow_file);
+    return flow_file;
+  }
+
+  std::shared_ptr<core::Processor> getSQLProcessor() {
+    return processor_;
+  }
+
+  void run(bool reschedule = false) {
+    if (reschedule) {
+      plan_->reset(reschedule);
+    }
+    plan_->runProcessor(0);  // run the one and only sql processor
+  }
+
+  std::map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> getAllOutputs() {

Review comment:
       removed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
adam-markovics commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r599662804



##########
File path: extensions/sql/data/DatabaseConnectors.h
##########
@@ -45,8 +45,12 @@ class Statement {
 
   virtual ~Statement() = default;
 
-  soci::rowset<soci::row> execute() {
-    return session_.prepare << query_;
+  soci::rowset<soci::row> execute(const std::vector<std::string>& args = {}) {
+    auto stmt = session_.prepare << query_;
+    for (auto& arg : args) {
+      stmt, soci::use(arg);

Review comment:
       Is this comma operator necessary here?
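For context: soci attaches `use()` and `into()` elements to a statement through an overloaded comma operator. Its documented form is `sql << "select ...", into(x), use(y);`. Assuming `stmt` here holds soci's prepare temporary, the comma is what actually registers each binding. Without it, `soci::use(arg)` would only build a temporary that is immediately discarded. The sketch below mimics that mechanism with hypothetical `PreparedStatement` and `use` stand-ins, not the soci API, to show why `stmt, use(arg);` is not a no-op.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for soci's use() element: remembers which variable to bind.
struct UseBinding {
  const std::string* value;
};

inline UseBinding use(const std::string& value) { return UseBinding{&value}; }

// Hypothetical stand-in for soci's prepare temporary (not the real soci API).
class PreparedStatement {
 public:
  explicit PreparedStatement(std::string query) : query_(std::move(query)) {}

  // Overloaded comma: `stmt, use(x)` appends a binding and returns the statement,
  // which is what makes chains like `stmt, use(a), use(b)` possible.
  PreparedStatement& operator,(const UseBinding& binding) {
    bindings_.push_back(binding.value);
    return *this;
  }

  std::size_t bindingCount() const { return bindings_.size(); }
  const std::string& query() const { return query_; }

 private:
  std::string query_;
  std::vector<const std::string*> bindings_;
};

// Mirrors the loop in the diff: each comma expression registers one argument.
inline std::size_t bindAll(PreparedStatement& stmt, const std::vector<std::string>& args) {
  for (const auto& arg : args) {
    stmt, use(arg);  // without the comma, use(arg) would be a discarded temporary
  }
  return stmt.bindingCount();
}
```

The bindings only store addresses, so, as with soci, the bound strings have to outlive the statement's execution.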

##########
File path: extensions/sql/data/MaxCollector.h
##########
@@ -31,26 +33,21 @@ namespace minifi {
 namespace sql {
 
 class MaxCollector: public SQLRowSubscriber {
-  void beginProcessRow() override {}
-
-  void endProcessRow() override {
-    if (columnsVerified_) {
-      return;
+  void beginProcessBatch() override {}
+  void endProcessBatch(Progress progress) override {
+    if (progress == Progress::DONE) {
+      updateMapState();
     }
-
-    if (countColumns_ != mapState_.size())
-      throw minifi::Exception(PROCESSOR_EXCEPTION, "MaxCollector: Column(s) '" + maxValueColumnNames_ + "' are not found in the columns of '" + selectQuery_ + "' result.");
-
-    columnsVerified_ = true;
   }
+  void beginProcessRow() override {}
+  void endProcessRow() override {}
 
-  void processColumnName(const std::string& name) override {
-    if (columnsVerified_) {
-      return;
-    }
-
-    if (mapState_.count(name)) {
-      countColumns_++;
+  void processColumnNames(const std::vector<std::string>& names) override {
+    for (auto& expected : state_) {

Review comment:
       I suggest `const auto&`.
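The reworked `MaxCollector` calls `updateMapState()` only once `endProcessBatch()` reports `Progress::DONE`. A partially processed rowset therefore never overwrites the stored maxima. Below is a minimal sketch of that commit-on-done pattern. It uses hypothetical `MaxTracker` and `Progress` types rather than the PR's classes, and its read-only loop uses `const auto&` as suggested.

```cpp
#include <map>
#include <string>
#include <utility>

// Hypothetical simplification of SQLRowSubscriber::Progress from the PR.
enum class Progress { CONTINUE, DONE };

// Hypothetical subscriber that tracks per-column maxima (not the PR's MaxCollector).
class MaxTracker {
 public:
  explicit MaxTracker(std::map<std::string, long> committed)
      : committed_(std::move(committed)), pending_(committed_) {}

  // Only columns that are already tracked are updated; other columns are ignored.
  void processRow(const std::map<std::string, long>& row) {
    for (const auto& column : row) {  // read-only loop, hence const auto&
      auto it = pending_.find(column.first);
      if (it != pending_.end() && column.second > it->second) {
        it->second = column.second;
      }
    }
  }

  // State is committed only when the rowset is exhausted, as in the reworked endProcessBatch().
  void endProcessBatch(Progress progress) {
    if (progress == Progress::DONE) {
      committed_ = pending_;
    }
  }

  const std::map<std::string, long>& committed() const { return committed_; }

 private:
  std::map<std::string, long> committed_;  // last fully processed maxima
  std::map<std::string, long> pending_;    // maxima seen in the current run
};
```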

##########
File path: extensions/sql/data/SQLRowsetProcessor.cpp
##########
@@ -20,51 +20,61 @@
 
 #include "Exception.h"
 #include "Utils.h"
+#include "utils/StringUtils.h"
 
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace sql {
 
-SQLRowsetProcessor::SQLRowsetProcessor(const soci::rowset<soci::row>& rowset, const std::vector<SQLRowSubscriber*>& rowSubscribers)
-  : rowset_(rowset), rowSubscribers_(rowSubscribers) {
+SQLRowsetProcessor::SQLRowsetProcessor(const soci::rowset<soci::row>& rowset, std::vector<std::reference_wrapper<SQLRowSubscriber>> row_subscribers)
+  : rowset_(rowset), row_subscribers_(std::move(row_subscribers)) {
   iter_ = rowset_.begin();
 }
 
 size_t SQLRowsetProcessor::process(size_t max) {
   size_t count = 0;
 
+  for (const auto& subscriber : row_subscribers_) {
+    subscriber.get().beginProcessBatch();
+  }
+
   for (; iter_ != rowset_.end(); ) {
     addRow(*iter_, count);
     iter_++;
     count++;
-    totalCount_++;
     if (max > 0 && count >= max) {
       break;
     }
   }
 
+  for (const auto& subscriber : row_subscribers_) {
+    subscriber.get().endProcessBatch(count == 0 ? SQLRowSubscriber::Progress::DONE : SQLRowSubscriber::Progress::CONTINUE);
+  }
+
   return count;
 }
 
 void SQLRowsetProcessor::addRow(const soci::row& row, size_t rowCount) {
-  for (const auto& pRowSubscriber : rowSubscribers_) {
-    pRowSubscriber->beginProcessRow();
+  for (const auto& subscriber : row_subscribers_) {
+    subscriber.get().beginProcessRow();
   }
 
   if (rowCount == 0) {
+    std::vector<std::string> column_names;

Review comment:
       You could add a `reserve()` call here, since you know the number of elements to be pushed, but it's not necessary.
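Below is a minimal sketch of the batching contract in `SQLRowsetProcessor::process()`, with the suggested `reserve()` applied. `Row` and `BatchingRowProcessor` are hypothetical stand-ins for `soci::row` and the PR's class, and callbacks replace the subscriber list. As in the diff, column names are reported at the first row of each batch. Completion is signalled only by a batch that yields zero rows, so a caller has to call `process()` one extra time after the last non-empty batch before `DONE` is seen.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for soci::row: (column name, value) pairs.
struct Row {
  std::vector<std::pair<std::string, std::string>> columns;
};

// Hypothetical sketch of the batching contract; callbacks replace the subscriber list.
class BatchingRowProcessor {
 public:
  using ColumnNamesCallback = std::function<void(const std::vector<std::string>&)>;
  using BatchEndCallback = std::function<void(bool done)>;

  BatchingRowProcessor(std::vector<Row> rows, ColumnNamesCallback on_column_names, BatchEndCallback on_batch_end)
      : rows_(std::move(rows)), on_column_names_(std::move(on_column_names)), on_batch_end_(std::move(on_batch_end)) {}

  // Processes up to `max` rows (0 means no limit) and returns how many were processed.
  std::size_t process(std::size_t max) {
    std::size_t count = 0;
    while (next_ < rows_.size()) {
      if (count == 0) {
        reportColumnNames(rows_[next_]);  // first row of every batch, as in addRow()
      }
      ++next_;
      ++count;
      if (max > 0 && count >= max) {
        break;
      }
    }
    on_batch_end_(count == 0);  // only an empty batch signals DONE
    return count;
  }

 private:
  void reportColumnNames(const Row& row) {
    std::vector<std::string> column_names;
    column_names.reserve(row.columns.size());  // the element count is known up front
    for (const auto& column : row.columns) {
      column_names.push_back(column.first);
    }
    on_column_names_(column_names);
  }

  std::vector<Row> rows_;
  std::size_t next_ = 0;
  ColumnNamesCallback on_column_names_;
  BatchEndCallback on_batch_end_;
};
```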







[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r603909790



##########
File path: .github/workflows/ci.yml
##########
@@ -132,12 +142,13 @@ jobs:
         run: |
           sudo apt-add-repository -y "ppa:ubuntu-toolchain-r/test"
           sudo apt update
-          sudo apt install -y gcc-4.8 g++-4.8 bison flex uuid-dev openssl libcurl4-openssl-dev ccache libpython3-dev liblua5.1-0-dev libssh2-1-dev
+          sudo apt install -y gcc-4.8 g++-4.8 bison flex uuid-dev openssl libcurl4-openssl-dev ccache libpython3-dev liblua5.1-0-dev libssh2-1-dev libsqliteodbc
+          sudo ln -s /usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so /usr/lib/x86_64-linux-gnu/libsqlite3odbc.so

Review comment:
       Without the symlink, my agent starts up, but I get this error in PutSQL::onTrigger:
   ```
   [org::apache::nifi::minifi::processors::PutSQL] [error] SQLProcessor: 'Error connecting to database: [iODBC][Driver Manager]libsqlite3odbc.so: cannot open shared object file: No such file or directory (SQL state 00000)'
   ```







[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r603881995



##########
File path: .github/workflows/ci.yml
##########
@@ -132,12 +142,13 @@ jobs:
         run: |
           sudo apt-add-repository -y "ppa:ubuntu-toolchain-r/test"
           sudo apt update
-          sudo apt install -y gcc-4.8 g++-4.8 bison flex uuid-dev openssl libcurl4-openssl-dev ccache libpython3-dev liblua5.1-0-dev libssh2-1-dev
+          sudo apt install -y gcc-4.8 g++-4.8 bison flex uuid-dev openssl libcurl4-openssl-dev ccache libpython3-dev liblua5.1-0-dev libssh2-1-dev libsqliteodbc
+          sudo ln -s /usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so /usr/lib/x86_64-linux-gnu/libsqlite3odbc.so

Review comment:
       The binary does not work on my Ubuntu 18.04 without the symlink.  With the symlink, it works fine.







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r603896154



##########
File path: .github/workflows/ci.yml
##########
@@ -132,12 +142,13 @@ jobs:
         run: |
           sudo apt-add-repository -y "ppa:ubuntu-toolchain-r/test"
           sudo apt update
-          sudo apt install -y gcc-4.8 g++-4.8 bison flex uuid-dev openssl libcurl4-openssl-dev ccache libpython3-dev liblua5.1-0-dev libssh2-1-dev
+          sudo apt install -y gcc-4.8 g++-4.8 bison flex uuid-dev openssl libcurl4-openssl-dev ccache libpython3-dev liblua5.1-0-dev libssh2-1-dev libsqliteodbc
+          sudo ln -s /usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so /usr/lib/x86_64-linux-gnu/libsqlite3odbc.so

Review comment:
       If I uninstall sqliteodbc on macOS, the agent starts up just fine, but of course the SQL processors won't work:
   ```
   [org::apache::nifi::minifi::processors::ExecuteSQL] [error] SQLProcessor: 'Error connecting to database: [iODBC][Driver Manager]dlopen(/usr/local/lib/libsqlite3odbc.dylib, 6): image not found (SQL state 00000)'
   ```







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r603243747



##########
File path: libminifi/test/sql-tests/ExecuteSQLTests.cpp
##########
@@ -0,0 +1,195 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+
+#include "SQLTestController.h"
+#include "processors/ExecuteSQL.h"
+#include "Utils.h"
+#include "FlowFileMatcher.h"
+
+TEST_CASE("ExecuteSQL works without incoming flow file", "[ExecuteSQL1]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty("SQL select query", "SELECT * FROM test_table ORDER BY int_col ASC");
+
+  controller.insertValues({{11, "one"}, {22, "two"}});
+
+  plan->run();
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+  std::string row_count;
+  flow_files[0]->getAttribute(processors::ExecuteSQL::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "2");
+
+  auto content = plan->getContent(flow_files[0]);
+  verifyJSON(content, R"(
+    [{
+      "int_col": 11,
+      "text_col": "one"
+    },{
+      "int_col": 22,
+      "text_col": "two"
+    }]
+  )");
+}
+
+TEST_CASE("ExecuteSQL uses statement in property", "[ExecuteSQL2]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty("SQL select query", "SELECT * FROM test_table WHERE int_col == ${int_col_value}");

Review comment:
       done







[GitHub] [nifi-minifi-cpp] adamdebreceni removed a comment on pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
adamdebreceni removed a comment on pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#issuecomment-780378765


   Investigating increased runtimes on macOS in the CI.





[GitHub] [nifi-minifi-cpp] adamdebreceni edited a comment on pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
adamdebreceni edited a comment on pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#issuecomment-780378765


   Investigating increased runtimes on macOS in the CI.





[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r603242347



##########
File path: extensions/sql/processors/QueryDatabaseTable.cpp
##########
@@ -75,361 +72,234 @@ const core::Property QueryDatabaseTable::s_maxValueColumnNames(
     "If no columns are provided, all rows from the table will be considered, which could have a performance impact. "
     "NOTE: It is important to use consistent max-value column names for a given table for incremental fetch to work properly. "
     "NOTE: Because of a limitation of database access library 'soci', which doesn't support milliseconds in it's 'dt_date', "
-    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")->
-    supportsExpressionLanguage(true)->build());
+    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")
+  ->supportsExpressionLanguage(true)->build());
 
-const core::Property QueryDatabaseTable::s_whereClause(
-  core::PropertyBuilder::createProperty("db-fetch-where-clause")->isRequired(false)->withDescription(
-    "A custom clause to be added in the WHERE condition when building SQL queries.")->supportsExpressionLanguage(true)->build());
+const core::Property QueryDatabaseTable::WhereClause(
+  core::PropertyBuilder::createProperty("Where Clause")
+  ->isRequired(false)
+  ->withDescription("A custom clause to be added in the WHERE condition when building SQL queries.")
+  ->supportsExpressionLanguage(true)->build());
 
-const core::Property QueryDatabaseTable::s_sqlQuery(
-  core::PropertyBuilder::createProperty("db-fetch-sql-query")->isRequired(false)->withDescription(
-    "A custom SQL query used to retrieve data. Instead of building a SQL query from other properties, this query will be wrapped as a sub-query. "
-    "Query must have no ORDER BY statement.")->supportsExpressionLanguage(true)->build());
+const std::string QueryDatabaseTable::InitialMaxValueDynamicPropertyPrefix("initial.maxvalue.");

Review comment:
       added

##########
File path: libminifi/test/sql-tests/QueryDatabaseTableTests.cpp
##########
@@ -0,0 +1,248 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+
+#include "../TestBase.h"
+#include "SQLTestController.h"
+#include "Utils.h"
+#include "FlowFileMatcher.h"
+
+TEST_CASE("QueryDatabaseTable queries the table and returns specified columns", "[QueryDatabaseTable1]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("QueryDatabaseTable", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty(processors::QueryDatabaseTable::TableName.getName(), "test_table");
+  sql_proc->setProperty(processors::QueryDatabaseTable::MaxValueColumnNames.getName(), "int_col");
+  sql_proc->setProperty(processors::QueryDatabaseTable::ColumnNames.getName(), "text_col");
+
+  controller.insertValues({
+    {101, "one"},
+    {102, "two"},
+    {103, "three"}
+  });
+
+  plan->run();
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+
+  std::string row_count;
+  flow_files[0]->getAttribute(processors::QueryDatabaseTable::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "3");
+  auto content = plan->getContent(flow_files[0]);
+  verifyJSON(content, R"(
+    [{"text_col": "one"}, {"text_col": "two"}, {"text_col": "three"}]
+  )", true);
+}
+
+TEST_CASE("QueryDatabaseTable requerying the table returns only new rows", "[QueryDatabaseTable2]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("QueryDatabaseTable", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty(processors::QueryDatabaseTable::TableName.getName(), "test_table");
+  sql_proc->setProperty(processors::QueryDatabaseTable::MaxValueColumnNames.getName(), "int_col");
+  sql_proc->setProperty(processors::QueryDatabaseTable::ColumnNames.getName(), "text_col");
+
+  controller.insertValues({
+    {101, "one"},
+    {102, "two"},
+    {103, "three"}
+  });
+
+  plan->run();
+
+  auto first_flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(first_flow_files.size() == 1);
+
+  controller.insertValues({
+    {104, "four"},
+    {105, "five"}
+  });
+
+  SECTION("Without schedule") {plan->run();}
+  SECTION("With schedule") {plan->run(true);}

Review comment:
       done
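The requery test relies on the max-value column for incremental fetch: after rows 101 to 103 are read, only 104 and 105 should come back. Below is a rough sketch of that idea using hypothetical helpers, not the processor's actual implementation. The starting maximum comes from stored state if present, otherwise from an `initial.maxvalue.<column>` dynamic property (the prefix added in this PR). The sketch assumes stored state takes precedence, as with NiFi's `initial.maxvalue.*` properties. The value is concatenated unquoted, which only suits numeric columns; real code would bind it as a parameter.

```cpp
#include <cstddef>
#include <map>
#include <optional>
#include <string>
#include <vector>

// Hypothetical helper: the maximum to resume from. Stored state wins; otherwise an
// "initial.maxvalue.<column>" dynamic property seeds the very first query.
std::optional<std::string> startingMaxValue(const std::map<std::string, std::string>& stored_state,
                                            const std::map<std::string, std::string>& dynamic_properties,
                                            const std::string& column) {
  const auto stored = stored_state.find(column);
  if (stored != stored_state.end()) {
    return stored->second;
  }
  const auto initial = dynamic_properties.find("initial.maxvalue." + column);
  if (initial != dynamic_properties.end()) {
    return initial->second;
  }
  return std::nullopt;
}

// Hypothetical query builder: only rows above the last seen maximum are selected.
// The value is concatenated unquoted, so this only suits numeric max-value columns.
std::string buildIncrementalQuery(const std::string& table, const std::string& columns,
                                  const std::string& max_value_column,
                                  const std::optional<std::string>& last_max,
                                  const std::string& where_clause = "") {
  std::string query = "SELECT " + (columns.empty() ? std::string("*") : columns) + " FROM " + table;
  std::vector<std::string> conditions;
  if (last_max) {
    conditions.push_back(max_value_column + " > " + *last_max);
  }
  if (!where_clause.empty()) {
    conditions.push_back("(" + where_clause + ")");  // custom clause, ANDed with the max-value filter
  }
  for (std::size_t i = 0; i < conditions.size(); ++i) {
    query += (i == 0 ? " WHERE " : " AND ") + conditions[i];
  }
  return query;
}
```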







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r603243747



##########
File path: libminifi/test/sql-tests/ExecuteSQLTests.cpp
##########
@@ -0,0 +1,195 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+
+#include "SQLTestController.h"
+#include "processors/ExecuteSQL.h"
+#include "Utils.h"
+#include "FlowFileMatcher.h"
+
+TEST_CASE("ExecuteSQL works without incoming flow file", "[ExecuteSQL1]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty("SQL select query", "SELECT * FROM test_table ORDER BY int_col ASC");
+
+  controller.insertValues({{11, "one"}, {22, "two"}});
+
+  plan->run();
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+  std::string row_count;
+  flow_files[0]->getAttribute(processors::ExecuteSQL::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "2");
+
+  auto content = plan->getContent(flow_files[0]);
+  verifyJSON(content, R"(
+    [{
+      "int_col": 11,
+      "text_col": "one"
+    },{
+      "int_col": 22,
+      "text_col": "two"
+    }]
+  )");
+}
+
+TEST_CASE("ExecuteSQL uses statement in property", "[ExecuteSQL2]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty("SQL select query", "SELECT * FROM test_table WHERE int_col == ${int_col_value}");

Review comment:
       done, changed it to `=`
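For reference, SQLite accepts both `==` and `=` for equality, but `=` is the standard SQL operator and is portable across databases. The `${int_col_value}` placeholder is presumably filled from the incoming flow file's attributes by Expression Language before the query runs. The helper below is a hypothetical, greatly simplified stand-in, not MiNiFi's Expression Language implementation. It only shows the effect of that substitution on this test's query. Values are inserted verbatim without escaping, which is one reason bound parameters are generally preferable to textual substitution.

```cpp
#include <cstddef>
#include <map>
#include <string>

// Hypothetical, heavily simplified stand-in for attribute substitution (not MiNiFi's
// Expression Language): each ${name} is replaced by the attribute value, or by an empty
// string when the attribute is missing. Values are inserted verbatim, without escaping.
std::string substituteAttributes(const std::string& text,
                                 const std::map<std::string, std::string>& attributes) {
  std::string result;
  std::size_t pos = 0;
  while (true) {
    const std::size_t start = text.find("${", pos);
    if (start == std::string::npos) {
      break;
    }
    const std::size_t end = text.find('}', start + 2);
    if (end == std::string::npos) {
      break;  // unterminated placeholder: copy the rest unchanged
    }
    result.append(text, pos, start - pos);
    const auto it = attributes.find(text.substr(start + 2, end - start - 2));
    if (it != attributes.end()) {
      result += it->second;
    }
    pos = end + 1;
  }
  result.append(text, pos, std::string::npos);
  return result;
}
```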







[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1004: MINIFICPP-1450 - Revive SQL processors

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r603920488



##########
File path: .github/workflows/ci.yml
##########
@@ -132,12 +142,13 @@ jobs:
         run: |
           sudo apt-add-repository -y "ppa:ubuntu-toolchain-r/test"
           sudo apt update
-          sudo apt install -y gcc-4.8 g++-4.8 bison flex uuid-dev openssl libcurl4-openssl-dev ccache libpython3-dev liblua5.1-0-dev libssh2-1-dev
+          sudo apt install -y gcc-4.8 g++-4.8 bison flex uuid-dev openssl libcurl4-openssl-dev ccache libpython3-dev liblua5.1-0-dev libssh2-1-dev libsqliteodbc
+          sudo ln -s /usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so /usr/lib/x86_64-linux-gnu/libsqlite3odbc.so

Review comment:
       But it does work with the connection string `Driver=/usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so;Database=/tmp/my_database.db`, so I guess that's OK.
   
   It would be nice if the SQL processors could use the driver and database definitions in `odbcinst.ini` and `odbc.ini`, respectively, but that can be an improvement we'll add later.
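A bare `libsqlite3odbc.so` presumably depends on the dynamic loader's default search path, which does not include `/usr/lib/x86_64-linux-gnu/odbc/`. An absolute `Driver=` path worked. A pre-flight check could therefore surface the problem before connecting. Below is a hypothetical sketch, not part of the PR, assuming C++17. It pulls the `Driver` value out of a connection string and checks that it is an absolute path to an existing file. It assumes simple `key=value;` pairs and does not handle brace-quoted values.

```cpp
#include <algorithm>
#include <cctype>
#include <cstddef>
#include <filesystem>
#include <optional>
#include <string>
#include <system_error>

// Hypothetical helper: returns the value of the Driver keyword (matched case-insensitively)
// from a simple "key=value;key=value" ODBC connection string. Brace-quoted values are not handled.
std::optional<std::string> driverFromConnectionString(const std::string& conn) {
  std::size_t pos = 0;
  while (pos < conn.size()) {
    std::size_t end = conn.find(';', pos);
    if (end == std::string::npos) {
      end = conn.size();
    }
    const std::string part = conn.substr(pos, end - pos);
    const std::size_t eq = part.find('=');
    if (eq != std::string::npos) {
      std::string key = part.substr(0, eq);
      std::transform(key.begin(), key.end(), key.begin(),
                     [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
      if (key == "driver") {
        return part.substr(eq + 1);
      }
    }
    pos = end + 1;
  }
  return std::nullopt;
}

// Hypothetical pre-flight check: true only for an absolute driver path that exists on disk.
// A bare library name is resolved by the driver manager's loader, which this cannot predict.
bool driverPathExists(const std::string& conn) {
  const auto driver = driverFromConnectionString(conn);
  if (!driver) {
    return false;
  }
  const std::filesystem::path path(*driver);
  std::error_code ec;
  return path.is_absolute() && std::filesystem::exists(path, ec);
}
```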



