Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/02/11 08:17:26 UTC

[GitHub] [nifi-minifi-cpp] am-c-p-p opened a new pull request #732: MINIFICPP-1013

am-c-p-p opened a new pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732
 
 
   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [ ] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] am-c-p-p commented on issue #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on issue #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#issuecomment-584527004
 
 
   This PR replaces https://github.com/apache/nifi-minifi-cpp/pull/656


[GitHub] [nifi-minifi-cpp] bakaid commented on issue #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
bakaid commented on issue #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#issuecomment-584589501
 
 
   @am-c-p-p This branch is missing my changes from https://github.com/bakaid/nifi-minifi-cpp/commit/15ea8e068e05975ca438eea5c3d28b775dd8e392, which make sure that the SQL and SQLite extensions can't be enabled at the same time.


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r377584537
 
 

 ##########
 File path: extensions/sql/data/MaxCollector.h
 ##########
 @@ -0,0 +1,172 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <unordered_map>
+#include <tuple>
+
+#include "SQLRowSubscriber.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+
+class MaxCollector: public SQLRowSubscriber {
+  void beginProcessRow() override {}
+
+  void endProcessRow() override {
+    if (columnsVerified_) {
+      return;
+    }
+
+    if (countColumns_ != mapState_.size())
+      throw minifi::Exception(PROCESSOR_EXCEPTION, "MaxCollector: Column(s) '" + maxValueColumnNames_ + "' are not found in the columns of '" + selectQuery_ + "' result.");
+
+    columnsVerified_ = true;
+  }
+
+  void processColumnName(const std::string& name) override {
+    if (columnsVerified_) {
+      return;
+    }
+
+    if (mapState_.count(name)) {
+      countColumns_++;
+    }
+  }
+
+  void processColumn(const std::string& name, const std::string& value)  override {
+    updateMaxValue(name, '\'' + value + '\'');
+  }
+
+  void processColumn(const std::string& name, double value) override {
+    updateMaxValue(name, value);
+  }
+
+  void processColumn(const std::string& name, int value) override {
+    updateMaxValue(name, value);
+  }
+
+  void processColumn(const std::string& name, long long value) override {
+    updateMaxValue(name, value);
+  }
+
+  void processColumn(const std::string& name, unsigned long long value) override {
+    updateMaxValue(name, value);
+  }
+
+  void processColumn(const std::string& name, const char* value) override {}
+
+  template <typename T>
+  struct MaxValue {
+    void updateMaxValue(const std::string& name, const T& value) {
+      const auto it = mapColumnNameValue_.find(name);
+      if (it == mapColumnNameValue_.end()) {
+        mapColumnNameValue_.insert({ name, value });
+      } else {
+        if (value > it->second) {
+          it->second = value;
+        }
+      }
+    }
+
+    std::unordered_map<std::string, T> mapColumnNameValue_;
+  };
+
+  template <typename T, typename Tuple, int Index, bool>
+  struct TupleIndexByType {
+    constexpr static int index() {
+      using tupleElType = typename std::decay<decltype(std::get<Index + 1>(Tuple()))>::type;
+
+      return TupleIndexByType<T, Tuple, Index + 1, std::is_same<tupleElType, MaxValue<T>>::value>::index();
+    }
+  };
+
+  template <typename T, typename Tuple, int Index>
+  struct TupleIndexByType<T, Tuple, Index, true> {
+    constexpr static int index() {
+      return Index;
+    }
+  };
+
+  template <typename Tuple, int Index>
+  struct UpdateMapState {
+    UpdateMapState(const Tuple& tpl, std::unordered_map<std::string, std::string>& mapState) {
+      for (auto& el : mapState) {
+        const auto& maxVal = std::get<Index>(tpl);
+
+        const auto it = maxVal.mapColumnNameValue_.find(el.first);
+        if (it != maxVal.mapColumnNameValue_.end()) {
+          std::stringstream ss;
+          ss << it->second;
+          el.second = ss.str();
+        }
+      }
+
+      UpdateMapState<Tuple, Index - 1>(tpl, mapState);
+    }
+  };
+
+  template <typename Tuple>
+  struct UpdateMapState<Tuple, -1> {
+    UpdateMapState(const Tuple&, std::unordered_map<std::string, std::string>&) {}
+  };
+
+  template <typename ...Ts>
+  struct MaxValues : public std::tuple<MaxValue<Ts>...> {
+    constexpr static size_t size = sizeof...(Ts);
+  };
+
+ public:
+  MaxCollector(const std::string& selectQuery, const std::string& maxValueColumnNames, std::unordered_map<std::string, std::string>& mapState)
+    :selectQuery_(selectQuery), maxValueColumnNames_(maxValueColumnNames), mapState_(mapState) {
+  }
+
+  template <typename T>
+  void updateMaxValue(const std::string& columnName, const T& value) {
+    if (mapState_.count(columnName)) {
+      constexpr auto index = TupleIndexByType<T, decltype(maxValues_), -1, false>::index();
+      std::get<index>(maxValues_).updateMaxValue(columnName, value);
+    }
+  }
+
+  bool updateMapState() {
+    auto mapState = mapState_;
+    UpdateMapState<decltype(maxValues_), decltype(maxValues_)::size - 1>(maxValues_, mapState_);
+
+    return mapState != mapState_;
 
 Review comment:
   We copy the whole map here just to see whether anything changed, yet the code in `MaxValue` is made far more verbose to avoid additional lookups/writes. The two choices contradict each other, and that feels wrong.
   
   Can we either optimize properly for performance if it matters or reduce verbosity if it doesn't?
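
One way to avoid the full-map copy is to record whether any entry changed while the new values are written. The following is a minimal sketch of that idea, not the PR's code: the helper names `assignIfChanged` and `applyMaxima` are hypothetical, and it assumes the collected maxima are already available as strings.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>

// Hypothetical helper (not part of the PR): overwrites state[key] only when the
// key exists and the value differs, and reports whether it wrote anything.
inline bool assignIfChanged(std::unordered_map<std::string, std::string>& state,
                            const std::string& key, const std::string& newValue) {
  auto it = state.find(key);
  if (it == state.end() || it->second == newValue) {
    return false;  // unknown key or identical value: nothing to do
  }
  it->second = newValue;
  return true;
}

// Applies every collected maximum and returns true if at least one entry changed.
// No copy of `state` is needed to detect the change.
inline bool applyMaxima(std::unordered_map<std::string, std::string>& state,
                        const std::unordered_map<std::string, std::string>& maxima) {
  bool changed = false;
  for (const auto& kv : maxima) {
    changed = assignIfChanged(state, kv.first, kv.second) || changed;
  }
  return changed;
}

// Usage example.
inline void exampleChangeTracking() {
  std::unordered_map<std::string, std::string> state{{"id", "1"}};
  const std::unordered_map<std::string, std::string> maxima{{"id", "7"}};
  assert(applyMaxima(state, maxima));   // "1" -> "7" is reported as a change
  assert(!applyMaxima(state, maxima));  // a second pass changes nothing
}
```

The cost is one comparison per updated key instead of a copy and a comparison of the whole map.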


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r379060348
 
 

 ##########
 File path: extensions/sql/data/WriteCallback.h
 ##########
 @@ -0,0 +1,49 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+
+#include "FlowFileRecord.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+class WriteCallback : public OutputStreamCallback {
+public:
+  WriteCallback(const std::string& data)
 
 Review comment:
   The signature could be `WriteCallback(std::string data)`, and the caller could then optimize with `WriteCallback writer(std::move(output));`. That works in this particular case, but it won't if `output` is used later in the code.
   
   `data` is not changed, which is why `WriteCallback(const std::string& data)` takes a const reference. We need to cast away const only because `WriteCallback` uses `DataStream::writeData(uint8_t *value, int size);` (which should take `const uint8_t* value`, BTW), but that is an internal detail of `WriteCallback`, and its callers should not need to know about it.
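
The trade-off between the two signatures can be sketched as follows. `PayloadHolder` is a hypothetical stand-in for `WriteCallback` that only stores the payload; it assumes the class owns its data, which may not match the PR.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Hypothetical stand-in for WriteCallback; it only stores the payload.
class PayloadHolder {
 public:
  // Taking the string by value lets the caller choose between a copy and a move.
  explicit PayloadHolder(std::string data) : data_(std::move(data)) {}

  const std::string& data() const { return data_; }

 private:
  std::string data_;
};

// Usage example.
inline void examplePassByValue() {
  std::string output = "{\"rows\": 3}";

  PayloadHolder copied(output);  // copies; `output` stays usable afterwards
  assert(copied.data() == output);

  // Moves; `output` is left in a valid but unspecified state and
  // should not be read again without reassigning it first.
  PayloadHolder moved(std::move(output));
  assert(moved.data() == copied.data());
}
```

A `const std::string&` parameter never leaves the caller's string in a moved-from state, which is the concern raised above about `output` being used later.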
   


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378129066
 
 

 ##########
 File path: extensions/sql/data/JSONSQLWriter.h
 ##########
 @@ -0,0 +1,65 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "rapidjson/document.h"
+
+#include "SQLWriter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+
+class JSONSQLWriter: public SQLWriter {
+ public:
+  JSONSQLWriter(bool pretty);
 
 Review comment:
   Fixed.


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378797269
 
 

 ##########
 File path: extensions/sql/data/WriteCallback.h
 ##########
 @@ -0,0 +1,49 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+
+#include "FlowFileRecord.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+class WriteCallback : public OutputStreamCallback {
+public:
+  WriteCallback(const std::string& data)
 
 Review comment:
   Single parameter constructors should be marked as `explicit` or have a code comment explaining why implicit conversion is desired from the parameter type.
   
   See also my reply to the thread about `const_cast`. If we mutate through the pointer/object, then it shouldn't be `const`. The caller should be responsible for creating a mutable buffer, potentially by casting away `const` from their `str.data()` or any other way.
   
   I also prefer the old signature (`char*` + length) because it only requires mutable access from the caller, whereas `std::string` requires ownership as well, since `std::string` is an owning container.
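
The effect of `explicit` on a single-parameter constructor can be checked at compile time. This is a small illustration with hypothetical types (`ImplicitWriter`, `ExplicitWriter`), not the PR's classes:

```cpp
#include <string>
#include <type_traits>

// Hypothetical types for illustration only.
struct ImplicitWriter {
  ImplicitWriter(const std::string&) {}  // allows std::string -> ImplicitWriter conversions
};

struct ExplicitWriter {
  explicit ExplicitWriter(const std::string&) {}  // direct construction only
};

// A std::string silently converts to ImplicitWriter, e.g. when passed to a
// function that expects an ImplicitWriter.
static_assert(std::is_convertible<std::string, ImplicitWriter>::value,
              "implicit conversion is allowed");

// With `explicit`, the same conversion is rejected by the compiler...
static_assert(!std::is_convertible<std::string, ExplicitWriter>::value,
              "implicit conversion is blocked");

// ...while writing ExplicitWriter(str) by hand still works.
static_assert(std::is_constructible<ExplicitWriter, std::string>::value,
              "direct construction still works");
```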


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378784466
 
 

 ##########
 File path: extensions/sql/data/Utils.cpp
 ##########
 @@ -0,0 +1,61 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Utils.h"
+
+#include <algorithm>
+#include  <cctype>
+#include  <regex>
+#include  <sstream>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+std::string toLower(const std::string& str) {
+  std::string ret;
+
+  // (int(*)(int))std::tolower - to avoid compilation error 'no matching overloaded function found'. 
+  // It is described in https://stackoverflow.com/questions/5539249/why-cant-transforms-begin-s-end-s-begin-tolower-be-complied-successfu.
+  std::transform(str.begin(), str.end(), std::back_inserter(ret), (int(*)(int))std::tolower);
 
 Review comment:
   ok, feel free to mark this thread as resolved
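
For reference, the function-pointer cast in the quoted code can also be avoided with a lambda. This is only a sketch of an alternative, not a change requested in this thread; `toLowerSketch` is a hypothetical name. Taking the character as `unsigned char` also avoids passing negative values to `std::tolower`, which is undefined behavior.

```cpp
#include <algorithm>
#include <cassert>
#include <cctype>
#include <string>

// Hypothetical alternative to the quoted toLower(): the lambda selects the
// <cctype> overload of std::tolower without a function-pointer cast.
inline std::string toLowerSketch(std::string str) {
  std::transform(str.begin(), str.end(), str.begin(),
                 [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
  return str;
}

// Usage example.
inline void exampleToLower() {
  assert(toLowerSketch("SELECT Id FROM T") == "select id from t");
}
```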


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r377585714
 
 

 ##########
 File path: extensions/sql/data/SQLRowsetProcessor.h
 ##########
 @@ -0,0 +1,62 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include <soci/soci.h>
+
+#include "SQLRowSubscriber.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+
+class SQLRowsetProcessor
+{
+ public:
+  SQLRowsetProcessor(const soci::rowset<soci::row>& rowset, const std::vector<SQLRowSubscriber*>& rowSubscribers);
+
+  size_t process(size_t max = 0);
 
 Review comment:
   The meaning of `max == 0` needs to be documented.
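
A doc comment along these lines would cover it. The sketch below uses a hypothetical `RowBatcher` class over an in-memory vector and assumes, purely for illustration, that `0` means "no limit"; the semantics actually intended in the PR are not shown in this thread.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical processor over an in-memory list of rows.
class RowBatcher {
 public:
  explicit RowBatcher(std::vector<int> rows) : rows_(std::move(rows)) {}

  /**
   * Consumes up to `max` of the remaining rows.
   * @param max upper bound on the rows consumed by this call;
   *            0 means "no limit" (assumed convention for this sketch).
   * @return the number of rows consumed, 0 once all rows are exhausted.
   */
  std::size_t process(std::size_t max = 0) {
    const std::size_t remaining = rows_.size() - next_;
    const std::size_t n = (max == 0 || max > remaining) ? remaining : max;
    next_ += n;
    return n;
  }

 private:
  std::vector<int> rows_;
  std::size_t next_ = 0;
};

// Usage example.
inline void exampleRowBatcher() {
  RowBatcher batcher(std::vector<int>{1, 2, 3, 4, 5});
  assert(batcher.process(2) == 2);  // bounded call
  assert(batcher.process() == 3);   // default 0: consume everything left
  assert(batcher.process(1) == 0);  // nothing remains
}
```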


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r379072540
 
 

 ##########
 File path: extensions/sql/data/WriteCallback.h
 ##########
 @@ -0,0 +1,49 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+
+#include "FlowFileRecord.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+class WriteCallback : public OutputStreamCallback {
+public:
+  WriteCallback(const std::string& data)
 
 Review comment:
   I've misunderstood the purpose of the class. Up until now I thought that `stream->write` writes into `data_`, therefore modifies it. I only now realized that it copies _from_ `data_` _to_ the stream, not the other way around, and that the incorrect interface here is `BaseStream::write`, not `std::string::data`.
   
   Sorry for my misunderstanding, feel free to close this.
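
To illustrate the conclusion: when the stream's `write` takes a non-const pointer but only reads from it, the cast can be confined to one place behind a const-correct signature. `LegacySink` and `writeAll` are hypothetical names for this sketch; `LegacySink::write` merely imitates the shape of the interface discussed here.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical legacy sink: write() takes a non-const pointer although it only reads.
struct LegacySink {
  std::vector<uint8_t> bytes;

  int write(uint8_t* value, int size) {
    bytes.insert(bytes.end(), value, value + size);
    return size;
  }
};

// Const-correct wrapper. The const_cast lives in exactly one place and is safe
// only because LegacySink::write never modifies the buffer it is given.
inline int writeAll(LegacySink& sink, const std::string& data) {
  auto* raw = const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(data.data()));
  return sink.write(raw, static_cast<int>(data.size()));
}

// Usage example.
inline void exampleWriteAll() {
  LegacySink sink;
  const std::string payload = "abc";
  assert(writeAll(sink, payload) == 3);
  assert(sink.bytes.size() == 3);
  assert(payload == "abc");  // the source string is untouched
}
```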


[GitHub] [nifi-minifi-cpp] bakaid commented on issue #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
bakaid commented on issue #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#issuecomment-586281562
 
 
   @am-c-p-p  please either fix linter errors or don't enable the linter for the extension.


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r379351701
 
 

 ##########
 File path: extensions/sql/processors/ExecuteSQL.cpp
 ##########
 @@ -0,0 +1,123 @@
+/**
+ * @file ExecuteSQL.cpp
+ * ExecuteSQL class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ExecuteSQL.h"
+
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <memory>
+#include <codecvt>
+
+#include <soci/soci.h>
+
+#include "io/DataStream.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "Exception.h"
+#include "utils/OsUtils.h"
+#include "data/DatabaseConnectors.h"
+#include "data/JSONSQLWriter.h"
+#include "data/SQLRowsetProcessor.h"
+#include "data/WriteCallback.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const std::string ExecuteSQL::ProcessorName("ExecuteSQL");
+
+const core::Property ExecuteSQL::s_sqlSelectQuery(
+  core::PropertyBuilder::createProperty("SQL select query")->isRequired(true)->withDescription(
+    "The SQL select query to execute. The query can be empty, a constant value, or built from attributes using Expression Language. "
+    "If this property is specified, it will be used regardless of the content of incoming flowfiles. "
+    "If this property is empty, the content of the incoming flow file is expected to contain a valid SQL select query, to be issued by the processor to the database. "
+    "Note that Expression Language is not evaluated for flow file contents.")->supportsExpressionLanguage(true)->build());
+
+const core::Property ExecuteSQL::s_maxRowsPerFlowFile(
+	core::PropertyBuilder::createProperty("Max Rows Per Flow File")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
+		"The maximum number of result rows that will be included intoi a flow file. If zero then all will be placed into the flow file")->supportsExpressionLanguage(true)->build());
+
+const core::Relationship ExecuteSQL::s_success("success", "Successfully created FlowFile from SQL query result set.");
+
+static const std::string ResultRowCount = "executesql.row.count";
+
+ExecuteSQL::ExecuteSQL(const std::string& name, utils::Identifier uuid)
+  : SQLProcessor(name, uuid), max_rows_(0) {
+}
+
+ExecuteSQL::~ExecuteSQL() {
+}
+
+void ExecuteSQL::initialize() {
+  //! Set the supported properties
+  setSupportedProperties( { dbControllerService(), outputFormat(), s_sqlSelectQuery, s_maxRowsPerFlowFile});
+
+  //! Set the supported relationships
+  setSupportedRelationships( { s_success });
+}
+
+void ExecuteSQL::processOnSchedule(const core::ProcessContext &context) {
+  initOutputFormat(context);
+
+  context.getProperty(s_sqlSelectQuery.getName(), sqlSelectQuery_);
+  context.getProperty(s_maxRowsPerFlowFile.getName(), max_rows_);
+}
+
+void ExecuteSQL::processOnTrigger(core::ProcessSession &session) {
+  auto statement = connection_->prepareStatement(sqlSelectQuery_);
+
+  auto rowset = statement->execute();
+
+  int count = 0;
+  size_t rowCount = 0;
+  sql::JSONSQLWriter sqlWriter(isJSONPretty());
+  sql::SQLRowsetProcessor sqlRowsetProcessor(rowset, { &sqlWriter });
+
+  // Process rowset.
+  do {
+    rowCount = sqlRowsetProcessor.process(max_rows_ == 0 ? std::numeric_limits<size_t>::max() : max_rows_);
+    count++;
+    if (rowCount == 0)
+      break;
+
+    const auto& output = sqlWriter.toString();
 
 Review comment:
   Thanks!


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on issue #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on issue #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#issuecomment-586126177
 
 
   All comments are addressed; this can be merged into apache.


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378740016
 
 

 ##########
 File path: extensions/sql/data/WriteCallback.h
 ##########
 @@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "FlowFileRecord.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+class WriteCallback : public OutputStreamCallback {
+public:
+  WriteCallback(const char *data, uint64_t size)
+    : _data(const_cast<char*>(data)),
 
 Review comment:
   `WriteCallback` is called like `WriteCallback(out.data(), ...);`, where `out` is a string, so the type of `out.data()` is `const char*`.
   With `WriteCallback(char*, ...)`, the call `WriteCallback(out.data(), ...)` won't compile because `const char*` cannot be converted to `char*`.
   
   But I'll refactor to `WriteCallback(const std::string&)`.
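
The conversion rule behind this can be verified at compile time with standard type traits. This is a small self-contained check, not code from the PR:

```cpp
#include <string>
#include <type_traits>
#include <utility>

// Dropping const is never an implicit conversion, so a `const char*` argument
// cannot bind to a `char*` parameter.
static_assert(!std::is_convertible<const char*, char*>::value,
              "const char* -> char* requires an explicit cast");

// Adding const is always fine.
static_assert(std::is_convertible<char*, const char*>::value,
              "char* -> const char* is implicit");

// data() on a const std::string yields `const char*`.
static_assert(std::is_same<decltype(std::declval<const std::string&>().data()),
                           const char*>::value,
              "const std::string::data() returns const char*");
```

Before C++17, `data()` on a non-const `std::string` also returns `const char*`; C++17 added a non-const overload that returns `char*`.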


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378741857
 
 

 ##########
 File path: win_build_vs.bat
 ##########
 @@ -77,4 +81,4 @@ goto :eof
 
 :usage
 @echo "Usage: %0 <build_dir> options"
-exit /B 1
+exit /B 1
 
 Review comment:
   Fixed.


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378188642
 
 

 ##########
 File path: extensions/sql/services/ODBCConnector.h
 ##########
 @@ -0,0 +1,127 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once 
+
+#include "core/logging/LoggerConfiguration.h"
+#include "core/controller/ControllerService.h"
+
+#include "DatabaseService.h"
+#include "core/Resource.h"
+#include "data/DatabaseConnectors.h"
+#include <memory>
+#include <unordered_map>
+
+#include <soci/soci.h>
+#include <soci/odbc/soci-odbc.h>
+
+#include <iostream>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+namespace controllers {
+
+class ODBCConnection : public sql::Connection {
+ public:
+  explicit ODBCConnection(const std::string& connectionString)
+    : connection_string_(connectionString) {
+      session_ = std::make_unique<soci::session>(getSessionParameters());
+  }
+
+  virtual ~ODBCConnection() {
+  }
+
+  bool connected(std::string& exception) const override {
+    try {
+      exception.clear();
+      // According to https://stackoverflow.com/questions/3668506/efficient-sql-test-query-or-validation-query-that-will-work-across-all-or-most by Rob Hruska, 
+      // 'select 1' works for: H2, MySQL, Microsoft SQL Server, PostgreSQL, SQLite. For Orcale 'SELECT 1 FROM DUAL' works.
 
 Review comment:
   Fixed.


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378130935
 
 

 ##########
 File path: extensions/sql/services/ODBCConnector.h
 ##########
 @@ -0,0 +1,127 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once 
+
+#include "core/logging/LoggerConfiguration.h"
+#include "core/controller/ControllerService.h"
+
+#include "DatabaseService.h"
+#include "core/Resource.h"
+#include "data/DatabaseConnectors.h"
+#include <memory>
+#include <unordered_map>
+
+#include <soci/soci.h>
+#include <soci/odbc/soci-odbc.h>
+
+#include <iostream>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+namespace controllers {
+
+class ODBCConnection : public sql::Connection {
+ public:
+  explicit ODBCConnection(const std::string& connectionString)
+    : connection_string_(connectionString) {
+      session_ = std::make_unique<soci::session>(getSessionParameters());
+  }
+
+  virtual ~ODBCConnection() {
+  }
+
+  bool connected(std::string& exception) const override {
+    try {
+      exception.clear();
+      // According to https://stackoverflow.com/questions/3668506/efficient-sql-test-query-or-validation-query-that-will-work-across-all-or-most by Rob Hruska, 
+      // 'select 1' works for: H2, MySQL, Microsoft SQL Server, PostgreSQL, SQLite. For Orcale 'SELECT 1 FROM DUAL' works.
+      prepareStatement("select 1")->execute();
+      return true;
+    } catch (std::exception& e) {
+      exception = e.what();
+      return false;
+    }
+  }
+
+  std::unique_ptr<sql::Statement> prepareStatement(const std::string& query) const override {
+    return std::make_unique<sql::Statement>(session_, query);
+  }
+
+  std::unique_ptr<Session> getSession() const override {
+    return std::make_unique<sql::Session>(session_);
+  }
+
+ private:
+   const soci::connection_parameters getSessionParameters() const {
 
 Review comment:
   Fixed.


[GitHub] [nifi-minifi-cpp] bakaid closed pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
bakaid closed pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732
 
 
   


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r377596564
 
 

 ##########
 File path: extensions/sql/services/DatabaseService.h
 ##########
 @@ -0,0 +1,116 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONTROLLERS_DATABASESERVICE_H_
+#define LIBMINIFI_INCLUDE_CONTROLLERS_DATABASESERVICE_H_
+
+#include "core/logging/LoggerConfiguration.h"
+#include "core/controller/ControllerService.h"
+#include "data/DatabaseConnectors.h"
+#include <memory>
+#include <unordered_map>
+
+#include <soci/soci.h>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+namespace controllers {
+
+/**
+ * Purpose and Justification: Controller services function as a layerable way to provide
+ * services to internal services. While a controller service is generally configured from the flow,
+ * we want to follow the open closed principle and provide Database services
+ */
+class DatabaseService : public core::controller::ControllerService {
+ public:
+
+  /**
+   * Constructors for the controller service.
+   */
+  explicit DatabaseService(const std::string &name, const std::string &id)
+      : ControllerService(name, id),
+        initialized_(false),
+        logger_(logging::LoggerFactory<DatabaseService>::getLogger()) {
+    initialize();
+  }
+
+  explicit DatabaseService(const std::string &name, utils::Identifier uuid = utils::Identifier())
+      : ControllerService(name, uuid),
+        initialized_(false),
+        logger_(logging::LoggerFactory<DatabaseService>::getLogger()) {
+    initialize();
+  }
+
+  explicit DatabaseService(const std::string &name, const std::shared_ptr<Configure> &configuration)
+      : ControllerService(name),
+        initialized_(false),
+        logger_(logging::LoggerFactory<DatabaseService>::getLogger()) {
+    setConfiguration(configuration);
+    initialize();
+  }
+
+  /**
+   * Parameters needed.
+   */
+  static core::Property ConnectionString;
+
+  virtual void initialize() override;
+
+  void yield() override {
+
+  }
+
+  bool isRunning() override {
+    return getState() == core::controller::ControllerServiceState::ENABLED;
+  }
+
+  bool isWorkAvailable() override {
+    return false;
+  }
+
+  virtual void onEnable() override;
+
+  virtual std::unique_ptr<sql::Connection> getConnection() const = 0;
+
+ protected:
+
+  void initializeProperties();
+
+  // initialization mutex.
+  std::recursive_mutex initialization_mutex_;
 
 Review comment:
   `recursive_mutex` is a smell. Please use `mutex` in new code and design your classes appropriately!
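
   One common way to follow this advice, sketched under assumptions: public methods take a plain `std::mutex` exactly once and delegate to private helpers that expect the lock to be held already, so no method ever re-locks. `ExampleService` and its members are hypothetical and are not the PR's `DatabaseService`.

```cpp
#include <cassert>
#include <mutex>
#include <string>

// Hypothetical service: every public method locks once, and the shared work
// lives in a private helper that assumes the lock is already held.
class ExampleService {
 public:
  void initialize() {
    std::lock_guard<std::mutex> lock(mutex_);
    initializeLocked();
  }

  void onEnable(const std::string& connection_string) {
    std::lock_guard<std::mutex> lock(mutex_);
    initializeLocked();  // no second lock, so a recursive mutex is not needed
    connection_string_ = connection_string;
  }

  bool initialized() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return initialized_;
  }

  std::string connectionString() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return connection_string_;
  }

 private:
  // Precondition: mutex_ is held by the caller.
  void initializeLocked() {
    if (initialized_) return;
    initialized_ = true;
  }

  mutable std::mutex mutex_;
  bool initialized_ = false;
  std::string connection_string_;
};
```

   The `...Locked` suffix is only a naming convention; the point is that the locking boundary is the public interface, which makes re-entrant locking unnecessary.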


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r377647387
 
 

 ##########
 File path: extensions/sql/processors/QueryDatabaseTable.cpp
 ##########
 @@ -0,0 +1,475 @@
+/**
+ * @file QueryDatabaseTable.cpp
+ * PutSQL class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "QueryDatabaseTable.h"
+
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <memory>
+#include <codecvt>
+#include <algorithm>
+#include <regex>
+
+#include <soci/soci.h>
+
+#include "io/DataStream.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "Exception.h"
+#include "utils/OsUtils.h"
+#include "data/DatabaseConnectors.h"
+#include "data/JSONSQLWriter.h"
+#include "data/SQLRowsetProcessor.h"
+#include "data/WriteCallback.h"
+#include "data/MaxCollector.h"
+#include "data/Utils.h"
+#include "utils/file/FileUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const std::string QueryDatabaseTable::ProcessorName("QueryDatabaseTable");
+
+const core::Property QueryDatabaseTable::s_tableName(
+  core::PropertyBuilder::createProperty("Table Name")->isRequired(true)->withDescription("The name of the database table to be queried.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_columnNames(
+  core::PropertyBuilder::createProperty("Columns to Return")->isRequired(false)->withDescription(
+    "A comma-separated list of column names to be used in the query. If your database requires special treatment of the names (quoting, e.g.), each name should include such treatment. "
+    "If no column names are supplied, all columns in the specified table will be returned. "
+    "NOTE: It is important to use consistent column names for a given table for incremental fetch to work properly.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_maxValueColumnNames(
+  core::PropertyBuilder::createProperty("Maximum-value Columns")->isRequired(false)->withDescription(
+    "A comma-separated list of column names. The processor will keep track of the maximum value for each column that has been returned since the processor started running. "
+    "Using multiple columns implies an order to the column list, and each column's values are expected to increase more slowly than the previous columns' values. "
+    "Thus, using multiple columns implies a hierarchical structure of columns, which is usually used for partitioning tables. "
+    "This processor can be used to retrieve only those rows that have been added/updated since the last retrieval. "
+    "Note that some ODBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these types should not be listed in this property, and will result in error(s) during processing. "
+    "If no columns are provided, all rows from the table will be considered, which could have a performance impact. "
+    "NOTE: It is important to use consistent max-value column names for a given table for incremental fetch to work properly. "
+    "NOTE: Because of a limitation of database access library 'soci', which doesn't support milliseconds in it's 'dt_date', "
+    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")->
+    supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_whereClause(
+  core::PropertyBuilder::createProperty("db-fetch-where-clause")->isRequired(false)->withDescription(
+    "A custom clause to be added in the WHERE condition when building SQL queries.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_sqlQuery(
+  core::PropertyBuilder::createProperty("db-fetch-sql-query")->isRequired(false)->withDescription(
+    "A custom SQL query used to retrieve data. Instead of building a SQL query from other properties, this query will be wrapped as a sub-query. "
+    "Query must have no ORDER BY statement.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_maxRowsPerFlowFile(
+  core::PropertyBuilder::createProperty("qdbt-max-rows")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
+    "The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large result sets into multiple FlowFiles. "
+    "If the value specified is zero, then all rows are returned in a single FlowFile.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_stateDirectory(
+  core::PropertyBuilder::createProperty("State Directory")->isRequired(false)->withDefaultValue("QDTState")->withDescription("Directory which contains processor state data.")->build());
+
+const std::string QueryDatabaseTable::s_initialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
+
+const core::Relationship QueryDatabaseTable::s_success("success", "Successfully created FlowFile from SQL query result set.");
+
+static const std::string ResultTableName = "tablename";
+static const std::string ResultRowCount = "querydbtable.row.count";
+
+// State
+class State {
+ public:
+  State(const std::string& tableName, const std::string& stateDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
+    :tableName_(tableName), logger_(logger) {
+    if (!createUUIDDir(stateDir, uuid, filePath_))
+      return;
+
+    filePath_ += "State.txt";
+
+    if (!getStateFromFile())
+      return;
+
+    ok_ = true;
+  }
+
+  ~State() {
+    if (file_.is_open()) {
+      file_.close();
+    }
 
 Review comment:
   This is redundant as `fstream`'s dtor already closes files.
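
   A small sketch of the behaviour referred to here: the file stream is never closed explicitly, yet the data is on disk once the stream object goes out of scope. The helper names and the file name used in the usage note are made up for illustration.

```cpp
#include <cassert>
#include <cstdio>
#include <fstream>
#include <string>

// Writes one line and returns without calling close(): the std::ofstream
// destructor flushes and closes the file when `file` goes out of scope.
void writeState(const std::string& path, const std::string& line) {
  std::ofstream file(path);
  file << line << '\n';
}

// Reads the first line back through a separate stream object.
std::string readFirstLine(const std::string& path) {
  std::ifstream file(path);
  std::string line;
  std::getline(file, line);
  return line;
}
```

   For instance, after `writeState("example_state.txt", "id=42")` a call to `readFirstLine("example_state.txt")` should yield `id=42`. An explicit `close()` is still useful when the caller wants to check for errors at that point, since a destructor cannot report them.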


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378213435
 
 

 ##########
 File path: win_build_vs.bat
 ##########
 @@ -77,4 +81,4 @@ goto :eof
 
 :usage
 @echo "Usage: %0 <build_dir> options"
-exit /B 1
+exit /B 1
 
 Review comment:
   No newline at the end of the file


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r377577407
 
 

 ##########
 File path: extensions/sql/data/JSONSQLWriter.h
 ##########
 @@ -0,0 +1,65 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "rapidjson/document.h"
+
+#include "SQLWriter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+
+class JSONSQLWriter: public SQLWriter {
+ public:
+  JSONSQLWriter(bool pretty);
 
 Review comment:
   Single parameter constructors should either be marked `explicit` or have a comment explaining why implicit conversion from the parameter type is intended.
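
   To illustrate why this matters for a `bool` parameter in particular, here is a sketch with two hypothetical stand-in types (not the PR's `JSONSQLWriter`). Without `explicit`, anything that converts to `bool`, including a pointer, silently becomes a writer.

```cpp
#include <cassert>
#include <type_traits>

// Hypothetical stand-ins for a writer with a single-parameter constructor.
struct ImplicitWriter {
  ImplicitWriter(bool p) : pretty(p) {}  // converting constructor
  bool pretty;
};

struct ExplicitWriter {
  explicit ExplicitWriter(bool p) : pretty(p) {}
  bool pretty;
};

// Accepts a writer, but because of the converting constructor it can also be
// called as isPretty(true) or even isPretty("some text").
bool isPretty(const ImplicitWriter& writer) { return writer.pretty; }

static_assert(std::is_convertible<bool, ImplicitWriter>::value,
              "a bool converts implicitly");
static_assert(std::is_convertible<const char*, ImplicitWriter>::value,
              "so does a pointer, via pointer-to-bool conversion");
static_assert(!std::is_convertible<bool, ExplicitWriter>::value,
              "explicit blocks the implicit conversion");
static_assert(std::is_constructible<ExplicitWriter, bool>::value,
              "direct construction still works");
```

   With the `explicit` version, callers have to write `ExplicitWriter(true)`, which keeps the intent visible at the call site.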


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r379316720
 
 

 ##########
 File path: extensions/sql/processors/ExecuteSQL.cpp
 ##########
 @@ -0,0 +1,123 @@
+/**
+ * @file ExecuteSQL.cpp
+ * ExecuteSQL class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ExecuteSQL.h"
+
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <memory>
+#include <codecvt>
+
+#include <soci/soci.h>
+
+#include "io/DataStream.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "Exception.h"
+#include "utils/OsUtils.h"
+#include "data/DatabaseConnectors.h"
+#include "data/JSONSQLWriter.h"
+#include "data/SQLRowsetProcessor.h"
+#include "data/WriteCallback.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const std::string ExecuteSQL::ProcessorName("ExecuteSQL");
+
+const core::Property ExecuteSQL::s_sqlSelectQuery(
+  core::PropertyBuilder::createProperty("SQL select query")->isRequired(true)->withDescription(
+    "The SQL select query to execute. The query can be empty, a constant value, or built from attributes using Expression Language. "
+    "If this property is specified, it will be used regardless of the content of incoming flowfiles. "
+    "If this property is empty, the content of the incoming flow file is expected to contain a valid SQL select query, to be issued by the processor to the database. "
+    "Note that Expression Language is not evaluated for flow file contents.")->supportsExpressionLanguage(true)->build());
+
+const core::Property ExecuteSQL::s_maxRowsPerFlowFile(
+	core::PropertyBuilder::createProperty("Max Rows Per Flow File")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
+		"The maximum number of result rows that will be included intoi a flow file. If zero then all will be placed into the flow file")->supportsExpressionLanguage(true)->build());
+
+const core::Relationship ExecuteSQL::s_success("success", "Successfully created FlowFile from SQL query result set.");
+
+static const std::string ResultRowCount = "executesql.row.count";
+
+ExecuteSQL::ExecuteSQL(const std::string& name, utils::Identifier uuid)
+  : SQLProcessor(name, uuid), max_rows_(0) {
+}
+
+ExecuteSQL::~ExecuteSQL() {
+}
+
+void ExecuteSQL::initialize() {
+  //! Set the supported properties
+  setSupportedProperties( { dbControllerService(), outputFormat(), s_sqlSelectQuery, s_maxRowsPerFlowFile});
+
+  //! Set the supported relationships
+  setSupportedRelationships( { s_success });
+}
+
+void ExecuteSQL::processOnSchedule(const core::ProcessContext &context) {
+  initOutputFormat(context);
+
+  context.getProperty(s_sqlSelectQuery.getName(), sqlSelectQuery_);
+  context.getProperty(s_maxRowsPerFlowFile.getName(), max_rows_);
+}
+
+void ExecuteSQL::processOnTrigger(core::ProcessSession &session) {
+  auto statement = connection_->prepareStatement(sqlSelectQuery_);
+
+  auto rowset = statement->execute();
+
+  int count = 0;
+  size_t rowCount = 0;
+  sql::JSONSQLWriter sqlWriter(isJSONPretty());
+  sql::SQLRowsetProcessor sqlRowsetProcessor(rowset, { &sqlWriter });
+
+  // Process rowset.
+  do {
+    rowCount = sqlRowsetProcessor.process(max_rows_ == 0 ? std::numeric_limits<size_t>::max() : max_rows_);
+    count++;
+    if (rowCount == 0)
+      break;
+
+    const auto& output = sqlWriter.toString();
 
 Review comment:
   `SQLWriter::toString` returns an `std::string`, not `const std::string&`. Assigning this temporary to a const& will work because of reference lifetime extension, but it has no benefits, and it is misleading.
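
   A short sketch of the two spellings being compared. `ExampleWriter` is a hypothetical stand-in whose `toString()` returns by value, as described above; both functions behave identically, but the second makes it obvious that `output` is a local object.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <type_traits>
#include <utility>

// Hypothetical stand-in for a writer whose toString() returns by value.
struct ExampleWriter {
  std::string toString() const { return "[{\"id\":1}]"; }
};

// The call expression is a prvalue std::string, not a reference.
static_assert(
    std::is_same<decltype(std::declval<const ExampleWriter&>().toString()),
                 std::string>::value,
    "toString() returns by value");

std::size_t viaConstRef(const ExampleWriter& writer) {
  // Binds a reference to the temporary; its lifetime is extended to the
  // end of this scope, so this is valid but reads like an alias.
  const auto& output = writer.toString();
  return output.size();
}

std::size_t viaValue(const ExampleWriter& writer) {
  // Same lifetime, and the declaration says plainly that this is a local.
  const auto output = writer.toString();
  return output.size();
}
```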


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r379072540
 
 

 ##########
 File path: extensions/sql/data/WriteCallback.h
 ##########
 @@ -0,0 +1,49 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+
+#include "FlowFileRecord.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+class WriteCallback : public OutputStreamCallback {
+public:
+  WriteCallback(const std::string& data)
 
 Review comment:
   I've misunderstood the purpose of the class. Up until now I thought that `stream->write` writes into `data_`, therefore modifies it. I only now realized that it copies _from_ `data_` _to_ the stream, not the other way around, and that the incorrect interface here is `BaseStream::write`, not `std::string::data`.
   
   Sorry for my misunderstanding.
   
   edit: It should still have an `explicit` ctor.


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r377599401
 
 

 ##########
 File path: win_build_vs.bat
 ##########
 @@ -22,10 +22,12 @@ if [%1]==[] goto usage
 set builddir=%1
 set skiptests=OFF
 set cmake_build_type=Release
+set build_type=Release
 
 Review comment:
   Now that we have `build_type`, what do we use `cmake_build_type` for? Was this introduced with the rebase? Because I remember @bakaid removing something like that not long ago.


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378188464
 
 

 ##########
 File path: extensions/sql/SQLLoader.h
 ##########
 @@ -0,0 +1,81 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSION_SQLLOADER_H
+#define EXTENSION_SQLLOADER_H
+
+#include "core/ClassLoader.h"
+#include "processors/ExecuteSQL.h"
+#include "processors/PutSQL.h"
+#include "processors/QueryDatabaseTable.h"
+#include "services/ODBCConnector.h"
+
+class SQLFactory : public core::ObjectFactory {
+ public:
+  SQLFactory() {
+
+  }
+
+  /**
+   * Gets the name of the object.
+   * @return class name of processor
+   */
+  virtual std::string getName() override {
+    return "SQLFactory";
+  }
+
+  virtual std::string getClassName() override{
+    return "SQLFactory";
+  }
+  /**
+   * Gets the class name for the object
+   * @return class name for the processor.
+   */
+  virtual std::vector<std::string> getClassNames() override{
+    std::vector<std::string> class_names = {"ExecuteSQL", "PutSQL", "QueryDatabaseTable", "ODBCService"};
+    return class_names;
+  }
+
+  template <typename T>
+  static std::unique_ptr<ObjectFactory> getObjectFactory() {
+    return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<T>());
 
 Review comment:
   Fixed.


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r377573687
 
 

 ##########
 File path: extensions/sql/SQLLoader.h
 ##########
 @@ -0,0 +1,81 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSION_SQLLOADER_H
+#define EXTENSION_SQLLOADER_H
+
+#include "core/ClassLoader.h"
+#include "processors/ExecuteSQL.h"
+#include "processors/PutSQL.h"
+#include "processors/QueryDatabaseTable.h"
+#include "services/ODBCConnector.h"
+
+class SQLFactory : public core::ObjectFactory {
+ public:
+  SQLFactory() {
+
+  }
 
 Review comment:
   Too much vertical space reduces code readability. I suggest `SQLFactory() = default;`.
   
   There are other places that use more vertical space than I prefer, but since this is a matter of taste, I'll leave those up to you. In this case, using 3 lines for something that is logically only 1 crossed my threshold for commenting.
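
For illustration, a minimal sketch of the defaulted-constructor form suggested above. `ObjectFactoryBase` and `SQLFactorySketch` are hypothetical stand-ins, not the real `core::ObjectFactory` or `SQLFactory`, whose full interfaces are not shown in this thread.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for core::ObjectFactory (assumption: the real base
// class is default-constructible and declares a virtual getName()).
class ObjectFactoryBase {
 public:
  virtual ~ObjectFactoryBase() = default;
  virtual std::string getName() = 0;
};

// Hypothetical stand-in for SQLFactory.
class SQLFactorySketch : public ObjectFactoryBase {
 public:
  // One line instead of an empty three-line constructor body.
  SQLFactorySketch() = default;

  std::string getName() override {
    return "SQLFactory";
  }
};
```

Since the constructor has nothing to initialize, the defaulted form should behave the same as the empty body for callers.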


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378603638
 
 

 ##########
 File path: extensions/sql/processors/QueryDatabaseTable.cpp
 ##########
 @@ -0,0 +1,475 @@
+/**
+ * @file QueryDatabaseTable.cpp
+ * PutSQL class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "QueryDatabaseTable.h"
+
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <memory>
+#include <codecvt>
+#include <algorithm>
+#include <regex>
+
+#include <soci/soci.h>
+
+#include "io/DataStream.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "Exception.h"
+#include "utils/OsUtils.h"
+#include "data/DatabaseConnectors.h"
+#include "data/JSONSQLWriter.h"
+#include "data/SQLRowsetProcessor.h"
+#include "data/WriteCallback.h"
+#include "data/MaxCollector.h"
+#include "data/Utils.h"
+#include "utils/file/FileUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const std::string QueryDatabaseTable::ProcessorName("QueryDatabaseTable");
+
+const core::Property QueryDatabaseTable::s_tableName(
+  core::PropertyBuilder::createProperty("Table Name")->isRequired(true)->withDescription("The name of the database table to be queried.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_columnNames(
+  core::PropertyBuilder::createProperty("Columns to Return")->isRequired(false)->withDescription(
+    "A comma-separated list of column names to be used in the query. If your database requires special treatment of the names (quoting, e.g.), each name should include such treatment. "
+    "If no column names are supplied, all columns in the specified table will be returned. "
+    "NOTE: It is important to use consistent column names for a given table for incremental fetch to work properly.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_maxValueColumnNames(
+  core::PropertyBuilder::createProperty("Maximum-value Columns")->isRequired(false)->withDescription(
+    "A comma-separated list of column names. The processor will keep track of the maximum value for each column that has been returned since the processor started running. "
+    "Using multiple columns implies an order to the column list, and each column's values are expected to increase more slowly than the previous columns' values. "
+    "Thus, using multiple columns implies a hierarchical structure of columns, which is usually used for partitioning tables. "
+    "This processor can be used to retrieve only those rows that have been added/updated since the last retrieval. "
+    "Note that some ODBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these types should not be listed in this property, and will result in error(s) during processing. "
+    "If no columns are provided, all rows from the table will be considered, which could have a performance impact. "
+    "NOTE: It is important to use consistent max-value column names for a given table for incremental fetch to work properly. "
+    "NOTE: Because of a limitation of database access library 'soci', which doesn't support milliseconds in it's 'dt_date', "
+    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")->
+    supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_whereClause(
+  core::PropertyBuilder::createProperty("db-fetch-where-clause")->isRequired(false)->withDescription(
+    "A custom clause to be added in the WHERE condition when building SQL queries.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_sqlQuery(
+  core::PropertyBuilder::createProperty("db-fetch-sql-query")->isRequired(false)->withDescription(
+    "A custom SQL query used to retrieve data. Instead of building a SQL query from other properties, this query will be wrapped as a sub-query. "
+    "Query must have no ORDER BY statement.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_maxRowsPerFlowFile(
+  core::PropertyBuilder::createProperty("qdbt-max-rows")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
+    "The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large result sets into multiple FlowFiles. "
+    "If the value specified is zero, then all rows are returned in a single FlowFile.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_stateDirectory(
+  core::PropertyBuilder::createProperty("State Directory")->isRequired(false)->withDefaultValue("QDTState")->withDescription("Directory which contains processor state data.")->build());
+
+const std::string QueryDatabaseTable::s_initialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
+
+const core::Relationship QueryDatabaseTable::s_success("success", "Successfully created FlowFile from SQL query result set.");
+
+static const std::string ResultTableName = "tablename";
+static const std::string ResultRowCount = "querydbtable.row.count";
+
+// State
+class State {
+ public:
+  State(const std::string& tableName, const std::string& stateDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
+    :tableName_(tableName), logger_(logger) {
+    if (!createUUIDDir(stateDir, uuid, filePath_))
+      return;
+
+    filePath_ += "State.txt";
+
+    if (!getStateFromFile())
+      return;
+
+    ok_ = true;
+  }
+
+  ~State() {
+    if (file_.is_open()) {
+      file_.close();
+    }
+  }
+
+  operator bool() const {
+    return ok_;
+  }
+
+  std::unordered_map<std::string, std::string> mapState() const {
+    return mapState_;
+  }
+
+  void writeStateToFile(const std::unordered_map<std::string, std::string>& mapState) {
+    file_.seekp(std::ios::beg);
+
+    file_ << tableName_ << separator();
+    auto dataSize = tableName_.size() + separator().size();
+
+    for (const auto& el : mapState) {
+      file_ << el.first << '=' << el.second << separator();
+      dataSize += el.first.size() + 1 + el.second.size() + separator().size();
+    }
+
+    // If dataSize_ > dataSize, then clear difference with ' '.
+    if (dataSize_ > dataSize) {
+      for (auto i = dataSize_ - dataSize; i > 0; i--) {
+        file_ << ' ';
+      }
+    }
 
 Review comment:
   The `if` cannot be removed because `dataSize_` and `dataSize` have unsigned types: if `dataSize_` < `dataSize`, the subtraction wraps around and `i` becomes a very large positive number.
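
A small sketch of the wraparound described above, assuming both sizes are `std::size_t` (the actual type of `dataSize_` is not shown in the quoted diff). The helper names `unguardedDifference` and `paddingNeeded` are hypothetical and exist only for this illustration.

```cpp
#include <cassert>
#include <cstddef>

// Unguarded subtraction: when previousSize < newSize the result wraps around
// modulo 2^N instead of becoming negative, so a loop counting down from it
// would run for a very long time.
std::size_t unguardedDifference(std::size_t previousSize, std::size_t newSize) {
  return previousSize - newSize;
}

// Guarded version: plays the role of the `if` in the quoted code and returns
// 0 when there is nothing to pad.
std::size_t paddingNeeded(std::size_t previousSize, std::size_t newSize) {
  return previousSize > newSize ? previousSize - newSize : 0;
}
```

With a helper like `paddingNeeded`, the padding loop could run without a surrounding `if`, because the guard lives in the size computation.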


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r377575687
 
 

 ##########
 File path: extensions/sql/data/DatabaseConnectors.h
 ##########
 @@ -0,0 +1,106 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef EXTENSIONS_SQL_SERVICES_DATABASECONNECTORS_H_
+#define EXTENSIONS_SQL_SERVICES_DATABASECONNECTORS_H_
+
+#include <memory>
+#include <iostream>
+#include <algorithm>
+#include <cctype>
 
 Review comment:
   I don't see anything from these 3 headers being used in this header. We are missing `<string>`, though.


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378742650
 
 

 ##########
 File path: extensions/sql/data/DatabaseConnectors.h
 ##########
 @@ -0,0 +1,106 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef EXTENSIONS_SQL_SERVICES_DATABASECONNECTORS_H_
+#define EXTENSIONS_SQL_SERVICES_DATABASECONNECTORS_H_
+
+#include <memory>
+#include <iostream>
+#include <algorithm>
+#include <cctype>
+
+#include <soci/soci.h>
+
+#include "Utils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+
+/**
+ * We do not intend to create an abstract facade here. We know that SOCI is the underlying
+ * SQL library. We only wish to abstract ODBC specific information
+ */
+
+class Statement {
+ public:
+
+  explicit Statement(const std::unique_ptr<soci::session>& session, const std::string &query)
 
 Review comment:
   Fixed.


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378788038
 
 

 ##########
 File path: extensions/sql/data/WriteCallback.h
 ##########
 @@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "FlowFileRecord.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+class WriteCallback : public OutputStreamCallback {
+public:
+  WriteCallback(const char *data, uint64_t size)
+    : _data(const_cast<char*>(data)),
 
 Review comment:
   `WriteCallback` should take a reference/pointer to mutable storage, because it mutates the storage. Please work around the missing `string::data` overload where the call occurs, so that we don't need to make `WriteCallback` lie in its interface.
   
   I suggest `const_cast` directly on the result of `.data()` or substitute `out.data()` with `&out[0]` (only if `out.size() > 0`).
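
A minimal sketch of the `&out[0]` workaround at the call site, assuming a pre-C++17 standard library where `std::string::data()` only has a `const` overload. `mutableData` and `fillWith` are hypothetical helpers for this illustration; `fillWith` merely stands in for a consumer that writes through its pointer.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Returns a pointer to the string's mutable storage, or nullptr when the
// string is empty, which matches the `out.size() > 0` condition above.
char* mutableData(std::string& out) {
  return out.empty() ? nullptr : &out[0];
}

// Hypothetical consumer that mutates the buffer it is given, so its
// interface honestly takes a pointer to non-const char.
void fillWith(char* data, std::size_t size, char value) {
  for (std::size_t i = 0; i < size; ++i) {
    data[i] = value;
  }
}
```

This keeps the cast-free, mutable pointer acquisition at the caller, so the callback's constructor would not need a `const_cast` of its own.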


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r377646615
 
 

 ##########
 File path: extensions/sql/processors/QueryDatabaseTable.cpp
 ##########
 @@ -0,0 +1,475 @@
+/**
+ * @file QueryDatabaseTable.cpp
+ * PutSQL class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "QueryDatabaseTable.h"
+
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <memory>
+#include <codecvt>
+#include <algorithm>
+#include <regex>
+
+#include <soci/soci.h>
+
+#include "io/DataStream.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "Exception.h"
+#include "utils/OsUtils.h"
+#include "data/DatabaseConnectors.h"
+#include "data/JSONSQLWriter.h"
+#include "data/SQLRowsetProcessor.h"
+#include "data/WriteCallback.h"
+#include "data/MaxCollector.h"
+#include "data/Utils.h"
+#include "utils/file/FileUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const std::string QueryDatabaseTable::ProcessorName("QueryDatabaseTable");
+
+const core::Property QueryDatabaseTable::s_tableName(
+  core::PropertyBuilder::createProperty("Table Name")->isRequired(true)->withDescription("The name of the database table to be queried.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_columnNames(
+  core::PropertyBuilder::createProperty("Columns to Return")->isRequired(false)->withDescription(
+    "A comma-separated list of column names to be used in the query. If your database requires special treatment of the names (quoting, e.g.), each name should include such treatment. "
+    "If no column names are supplied, all columns in the specified table will be returned. "
+    "NOTE: It is important to use consistent column names for a given table for incremental fetch to work properly.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_maxValueColumnNames(
+  core::PropertyBuilder::createProperty("Maximum-value Columns")->isRequired(false)->withDescription(
+    "A comma-separated list of column names. The processor will keep track of the maximum value for each column that has been returned since the processor started running. "
+    "Using multiple columns implies an order to the column list, and each column's values are expected to increase more slowly than the previous columns' values. "
+    "Thus, using multiple columns implies a hierarchical structure of columns, which is usually used for partitioning tables. "
+    "This processor can be used to retrieve only those rows that have been added/updated since the last retrieval. "
+    "Note that some ODBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these types should not be listed in this property, and will result in error(s) during processing. "
+    "If no columns are provided, all rows from the table will be considered, which could have a performance impact. "
+    "NOTE: It is important to use consistent max-value column names for a given table for incremental fetch to work properly. "
+    "NOTE: Because of a limitation of database access library 'soci', which doesn't support milliseconds in it's 'dt_date', "
+    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")->
+    supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_whereClause(
+  core::PropertyBuilder::createProperty("db-fetch-where-clause")->isRequired(false)->withDescription(
+    "A custom clause to be added in the WHERE condition when building SQL queries.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_sqlQuery(
+  core::PropertyBuilder::createProperty("db-fetch-sql-query")->isRequired(false)->withDescription(
+    "A custom SQL query used to retrieve data. Instead of building a SQL query from other properties, this query will be wrapped as a sub-query. "
+    "Query must have no ORDER BY statement.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_maxRowsPerFlowFile(
+  core::PropertyBuilder::createProperty("qdbt-max-rows")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
+    "The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large result sets into multiple FlowFiles. "
+    "If the value specified is zero, then all rows are returned in a single FlowFile.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_stateDirectory(
+  core::PropertyBuilder::createProperty("State Directory")->isRequired(false)->withDefaultValue("QDTState")->withDescription("Directory which contains processor state data.")->build());
+
+const std::string QueryDatabaseTable::s_initialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
+
+const core::Relationship QueryDatabaseTable::s_success("success", "Successfully created FlowFile from SQL query result set.");
+
+static const std::string ResultTableName = "tablename";
+static const std::string ResultRowCount = "querydbtable.row.count";
+
+// State
+class State {
+ public:
+  State(const std::string& tableName, const std::string& stateDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
+    :tableName_(tableName), logger_(logger) {
+    if (!createUUIDDir(stateDir, uuid, filePath_))
+      return;
+
+    filePath_ += "State.txt";
+
+    if (!getStateFromFile())
+      return;
+
+    ok_ = true;
+  }
+
+  ~State() {
+    if (file_.is_open()) {
+      file_.close();
+    }
+  }
+
+  operator bool() const {
+    return ok_;
+  }
+
+  std::unordered_map<std::string, std::string> mapState() const {
+    return mapState_;
+  }
+
+  void writeStateToFile(const std::unordered_map<std::string, std::string>& mapState) {
+    file_.seekp(std::ios::beg);
+
+    file_ << tableName_ << separator();
+    auto dataSize = tableName_.size() + separator().size();
+
+    for (const auto& el : mapState) {
+      file_ << el.first << '=' << el.second << separator();
+      dataSize += el.first.size() + 1 + el.second.size() + separator().size();
+    }
+
+    // If dataSize_ > dataSize, then clear difference with ' '.
+    if (dataSize_ > dataSize) {
+      for (auto i = dataSize_ - dataSize; i > 0; i--) {
+        file_ << ' ';
+      }
+    }
 
 Review comment:
   `if` is redundant, the `for` condition will never be true if `dataSize_ <= dataSize`.


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378203621
 
 

 ##########
 File path: extensions/sql/processors/QueryDatabaseTable.cpp
 ##########
 @@ -0,0 +1,475 @@
+/**
+ * @file QueryDatabaseTable.cpp
+ * PutSQL class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "QueryDatabaseTable.h"
+
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <memory>
+#include <codecvt>
+#include <algorithm>
+#include <regex>
+
+#include <soci/soci.h>
+
+#include "io/DataStream.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "Exception.h"
+#include "utils/OsUtils.h"
+#include "data/DatabaseConnectors.h"
+#include "data/JSONSQLWriter.h"
+#include "data/SQLRowsetProcessor.h"
+#include "data/WriteCallback.h"
+#include "data/MaxCollector.h"
+#include "data/Utils.h"
+#include "utils/file/FileUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const std::string QueryDatabaseTable::ProcessorName("QueryDatabaseTable");
+
+const core::Property QueryDatabaseTable::s_tableName(
+  core::PropertyBuilder::createProperty("Table Name")->isRequired(true)->withDescription("The name of the database table to be queried.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_columnNames(
+  core::PropertyBuilder::createProperty("Columns to Return")->isRequired(false)->withDescription(
+    "A comma-separated list of column names to be used in the query. If your database requires special treatment of the names (quoting, e.g.), each name should include such treatment. "
+    "If no column names are supplied, all columns in the specified table will be returned. "
+    "NOTE: It is important to use consistent column names for a given table for incremental fetch to work properly.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_maxValueColumnNames(
+  core::PropertyBuilder::createProperty("Maximum-value Columns")->isRequired(false)->withDescription(
+    "A comma-separated list of column names. The processor will keep track of the maximum value for each column that has been returned since the processor started running. "
+    "Using multiple columns implies an order to the column list, and each column's values are expected to increase more slowly than the previous columns' values. "
+    "Thus, using multiple columns implies a hierarchical structure of columns, which is usually used for partitioning tables. "
+    "This processor can be used to retrieve only those rows that have been added/updated since the last retrieval. "
+    "Note that some ODBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these types should not be listed in this property, and will result in error(s) during processing. "
+    "If no columns are provided, all rows from the table will be considered, which could have a performance impact. "
+    "NOTE: It is important to use consistent max-value column names for a given table for incremental fetch to work properly. "
+    "NOTE: Because of a limitation of database access library 'soci', which doesn't support milliseconds in it's 'dt_date', "
+    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")->
+    supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_whereClause(
+  core::PropertyBuilder::createProperty("db-fetch-where-clause")->isRequired(false)->withDescription(
+    "A custom clause to be added in the WHERE condition when building SQL queries.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_sqlQuery(
+  core::PropertyBuilder::createProperty("db-fetch-sql-query")->isRequired(false)->withDescription(
+    "A custom SQL query used to retrieve data. Instead of building a SQL query from other properties, this query will be wrapped as a sub-query. "
+    "Query must have no ORDER BY statement.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_maxRowsPerFlowFile(
+  core::PropertyBuilder::createProperty("qdbt-max-rows")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
+    "The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large result sets into multiple FlowFiles. "
+    "If the value specified is zero, then all rows are returned in a single FlowFile.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_stateDirectory(
+  core::PropertyBuilder::createProperty("State Directory")->isRequired(false)->withDefaultValue("QDTState")->withDescription("Directory which contains processor state data.")->build());
+
+const std::string QueryDatabaseTable::s_initialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
+
+const core::Relationship QueryDatabaseTable::s_success("success", "Successfully created FlowFile from SQL query result set.");
+
+static const std::string ResultTableName = "tablename";
+static const std::string ResultRowCount = "querydbtable.row.count";
+
+// State
+class State {
+ public:
+  State(const std::string& tableName, const std::string& stateDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
+    :tableName_(tableName), logger_(logger) {
+    if (!createUUIDDir(stateDir, uuid, filePath_))
+      return;
+
+    filePath_ += "State.txt";
+
+    if (!getStateFromFile())
+      return;
+
+    ok_ = true;
+  }
+
+  ~State() {
+    if (file_.is_open()) {
+      file_.close();
+    }
+  }
+
+  operator bool() const {
+    return ok_;
+  }
+
+  std::unordered_map<std::string, std::string> mapState() const {
+    return mapState_;
+  }
+
+  void writeStateToFile(const std::unordered_map<std::string, std::string>& mapState) {
+    file_.seekp(std::ios::beg);
+
+    file_ << tableName_ << separator();
+    auto dataSize = tableName_.size() + separator().size();
+
+    for (const auto& el : mapState) {
+      file_ << el.first << '=' << el.second << separator();
+      dataSize += el.first.size() + 1 + el.second.size() + separator().size();
+    }
+
+    // If dataSize_ > dataSize, then clear difference with ' '.
+    if (dataSize_ > dataSize) {
+      for (auto i = dataSize_ - dataSize; i > 0; i--) {
+        file_ << ' ';
+      }
+    }
 
 Review comment:
   Your previous reply could be a great code comment explaining why the loop is necessary, but if I understand correctly, unwrapping the `if` body and leaving only the `for` loop would have equivalent behavior with 2 fewer lines and 1 fewer indentation level (i.e. less complexity).
   
   Correct me if I'm wrong, but I suggest this in place of lines 146-151:
   ```
   // If a maxValueColumnName type is varchar then a new max value can be shorter than previous max value,
   // and due to ???, we need to pad the data in the state file
   for (auto i = dataSize_ - dataSize; i > 0; i--) {
     file_ << ' ';
   }
   ```
   
   P.S.: I don't insist on the code change if you prefer to keep the `if`, but I do insist on improving the explanatory comment.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r377587225
 
 

 ##########
 File path: extensions/sql/data/Utils.cpp
 ##########
 @@ -0,0 +1,61 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Utils.h"
+
+#include <algorithm>
+#include  <cctype>
+#include  <regex>
+#include  <sstream>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+std::string toLower(const std::string& str) {
+  std::string ret;
+
+  // (int(*)(int))std::tolower - to avoid compilation error 'no matching overloaded function found'. 
+  // It is described in https://stackoverflow.com/questions/5539249/why-cant-transforms-begin-s-end-s-begin-tolower-be-complied-successfu.
+  std::transform(str.begin(), str.end(), std::back_inserter(ret), (int(*)(int))std::tolower);
 
 Review comment:
   Taking `str` by value and using `transform` in-place would avoid additional allocations and make it possible for users of the function to move in their long strings if they only need the lowercase version.
   
   I think a lambda that selects the appropriate overload looks nicer than a function pointer cast, but that's subjective.
   
   I like the fact that you used STL to accomplish this. Generic algorithms are beautiful and underused IMO so I appreciate every problem solved in terms of existing generic algorithms.
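   
   The suggestion could look roughly like the sketch below. It is an illustration, not the code that was merged. The `unsigned char` lambda parameter is an addition that the review does not mention; it avoids passing negative `char` values to `std::tolower`.

```cpp
#include <algorithm>
#include <cassert>
#include <cctype>
#include <string>
#include <utility>

// Takes the string by value, lowercases it in place, and returns it.
// Callers that pass a temporary or std::move their string avoid a copy.
std::string toLower(std::string str) {
  // The lambda calls the one-argument <cctype> overload of std::tolower,
  // so no function pointer cast is needed to pick it.
  std::transform(str.begin(), str.end(), str.begin(),
                 [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
  return str;
}

void demoToLower() {
  std::string query = "SELECT * FROM T";
  assert(toLower(query) == "select * from t");  // copies; query is untouched
  assert(query == "SELECT * FROM T");
  assert(toLower(std::move(query)) == "select * from t");  // moves instead of copying
}
```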


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378788038
 
 

 ##########
 File path: extensions/sql/data/WriteCallback.h
 ##########
 @@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "FlowFileRecord.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+class WriteCallback : public OutputStreamCallback {
+public:
+  WriteCallback(const char *data, uint64_t size)
+    : _data(const_cast<char*>(data)),
 
 Review comment:
   In that case I'd prefer to have the `const_cast` near the problematic `.data()` call, so that it can be inferred from the context that the reason for casting away const is the missing non-`const` `string::data` overload. Alternatively, you can pass `&out[0]` if `out.size() > 0`.
   
   I'm asking for the workaround to be moved closer to the problem it works around so that we can avoid affecting the interface of unrelated code (`WriteCallback` in this case).
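   
   A minimal sketch of keeping the workaround at the call site. `WriteCallbackSketch` and `makeCallback` are hypothetical stand-ins, since the real `WriteCallback` derives from `OutputStreamCallback` and is only partly visible in this excerpt. The point is that the constructor takes `char*`, and the caller obtains a mutable pointer via `&out[0]` because `std::string::data()` has no non-`const` overload before C++17.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Stand-in for WriteCallback: it keeps a pointer to mutable storage,
// so its interface does not have to cast away const.
class WriteCallbackSketch {
 public:
  WriteCallbackSketch(char* data, std::uint64_t size) : data_(data), size_(size) {}
  char* data() const { return data_; }
  std::uint64_t size() const { return size_; }

 private:
  char* data_;
  std::uint64_t size_;
};

// Hypothetical call-site helper: the workaround lives next to the string
// whose data() overload is missing, not inside the callback class.
WriteCallbackSketch makeCallback(std::string& out) {
  // &out[0] is only taken for a non-empty string, as the review suggests.
  char* ptr = out.empty() ? nullptr : &out[0];
  return WriteCallbackSketch(ptr, out.size());
}

void demoCallback() {
  std::string out = "{\"id\":1}";
  WriteCallbackSketch cb = makeCallback(out);
  assert(cb.data() == &out[0]);
  assert(cb.size() == out.size());
}
```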


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r377630872
 
 

 ##########
 File path: extensions/sql/processors/SQLProcessor.h
 ##########
 @@ -0,0 +1,105 @@
+/**
+ * @file SQLProcessor.h
+ * SQLProcessor class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "core/Core.h"
+#include "FlowFileRecord.h"
+#include "concurrentqueue.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+template <typename T>
+class SQLProcessor: public core::Processor {
+ protected:
+  SQLProcessor(const std::string& name, utils::Identifier uuid)
+    : core::Processor(name, uuid), logger_(logging::LoggerFactory<T>::getLogger()) {
+  }
+
+  void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) override {
+    std::string controllerService;
+    context->getProperty(dbControllerService().getName(), controllerService);
+
+    dbService_ = std::dynamic_pointer_cast<sql::controllers::DatabaseService>(context->getControllerService(controllerService));
+    if (!dbService_)
+      throw minifi::Exception(PROCESSOR_EXCEPTION, "'DB Controller Service' must be defined");
+
+    static_cast<T*>(this)->processOnSchedule(context, sessionFactory);
+  }
+
+  void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override {
+    std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
+    if (!lock.owns_lock()) {
+      logger_->log_warn("'onTrigger' is called before previous 'onTrigger' call is finished.");
+      context->yield();
+      return;
+    }
+
+    try {
+      if (!connection_) {
+        connection_ = dbService_->getConnection();
+      }
+      static_cast<T*>(this)->processOnTrigger(context, session);
 
 Review comment:
   I don't think we need CRTP to avoid the overhead of a virtual function call, given that we abuse shared_ptr all over our codebase and nobody bats an eye.
   
   I suggest making `processOnTrigger` and `processOnSchedule` pure virtual in `SQLProcessor` and `final` in the implementations, and not using CRTP. This will also prevent accidental instantiation of `SQLProcessor`, which does not seem to be meant for direct use, so that's a positive in this case.
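   
   For illustration, the non-CRTP shape might look like the sketch below. The class names and the string return values are hypothetical simplifications; the real hooks take context and session arguments and return nothing.

```cpp
#include <cassert>
#include <string>
#include <type_traits>

// Simplified stand-in for SQLProcessor without a template parameter.
class SQLProcessorSketch {
 public:
  virtual ~SQLProcessorSketch() = default;

  // Shared logic stays in the base class and dispatches to the hook.
  std::string onTrigger() { return "connected;" + processOnTrigger(); }

 protected:
  // Pure virtual: the base class cannot be instantiated by accident.
  virtual std::string processOnTrigger() = 0;
};

class ExecuteSQLSketch final : public SQLProcessorSketch {
 protected:
  // final: no further overriding, which also helps devirtualization.
  std::string processOnTrigger() final { return "executed"; }
};

static_assert(std::is_abstract<SQLProcessorSketch>::value,
              "the base class must not be instantiable");

void demoVirtualDispatch() {
  ExecuteSQLSketch processor;
  SQLProcessorSketch& base = processor;
  assert(base.onTrigger() == "connected;executed");
}
```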


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r377606863
 
 

 ##########
 File path: extensions/sql/data/DatabaseConnectors.h
 ##########
 @@ -0,0 +1,106 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef EXTENSIONS_SQL_SERVICES_DATABASECONNECTORS_H_
+#define EXTENSIONS_SQL_SERVICES_DATABASECONNECTORS_H_
+
+#include <memory>
+#include <iostream>
+#include <algorithm>
+#include <cctype>
+
+#include <soci/soci.h>
+
+#include "Utils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+
+/**
+ * We do not intend to create an abstract facade here. We know that SOCI is the underlying
+ * SQL library. We only wish to abstract ODBC specific information
+ */
+
+class Statement {
+ public:
+
+  explicit Statement(const std::unique_ptr<soci::session>& session, const std::string &query)
 
 Review comment:
   If we need a reference to `soci::session`, then we should take a `soci::session&`, not a `const std::unique_ptr<soci::session>&`. We expect `session` to be non-null and there's no reason to enforce unique ownership on the caller; an observer reference/pointer should be enough.
   
   I didn't create separate review comments for every occurrence of this issue. The same applies to `shared_ptr` where shared ownership is not desired.
   Some occurrences I noticed:
   - `Session::Session`
   - `PutSQL::processOnSchedule`
   - `PutSQL::processOnTrigger`
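   
   A sketch of the observer-reference signature. `SessionSketch` and `StatementSketch` are hypothetical stand-ins so the example compiles without SOCI; they are not the real `soci::session` or `Statement` types.

```cpp
#include <cassert>
#include <memory>
#include <string>

// Stand-in for soci::session.
struct SessionSketch {
  int preparedCount = 0;
};

// Takes an observer reference: it cannot be null, and the caller may own
// the session through a unique_ptr, a shared_ptr, or on the stack.
class StatementSketch {
 public:
  StatementSketch(SessionSketch& session, const std::string& query)
      : session_(session), query_(query) {
    ++session_.preparedCount;
  }
  const std::string& query() const { return query_; }

 private:
  SessionSketch& session_;
  std::string query_;
};

void demoObserverReference() {
  std::unique_ptr<SessionSketch> owned(new SessionSketch());
  SessionSketch onStack;
  StatementSketch a(*owned, "SELECT 1");   // caller keeps unique ownership
  StatementSketch b(onStack, "SELECT 2");  // no smart pointer required
  assert(owned->preparedCount == 1);
  assert(onStack.preparedCount == 1);
  assert(a.query() == "SELECT 1");
  assert(b.query() == "SELECT 2");
}
```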


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378095180
 
 

 ##########
 File path: extensions/sql/processors/QueryDatabaseTable.cpp
 ##########
 @@ -0,0 +1,475 @@
+/**
+ * @file QueryDatabaseTable.cpp
+ * PutSQL class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "QueryDatabaseTable.h"
+
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <memory>
+#include <codecvt>
+#include <algorithm>
+#include <regex>
+
+#include <soci/soci.h>
+
+#include "io/DataStream.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "Exception.h"
+#include "utils/OsUtils.h"
+#include "data/DatabaseConnectors.h"
+#include "data/JSONSQLWriter.h"
+#include "data/SQLRowsetProcessor.h"
+#include "data/WriteCallback.h"
+#include "data/MaxCollector.h"
+#include "data/Utils.h"
+#include "utils/file/FileUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const std::string QueryDatabaseTable::ProcessorName("QueryDatabaseTable");
+
+const core::Property QueryDatabaseTable::s_tableName(
+  core::PropertyBuilder::createProperty("Table Name")->isRequired(true)->withDescription("The name of the database table to be queried.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_columnNames(
+  core::PropertyBuilder::createProperty("Columns to Return")->isRequired(false)->withDescription(
+    "A comma-separated list of column names to be used in the query. If your database requires special treatment of the names (quoting, e.g.), each name should include such treatment. "
+    "If no column names are supplied, all columns in the specified table will be returned. "
+    "NOTE: It is important to use consistent column names for a given table for incremental fetch to work properly.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_maxValueColumnNames(
+  core::PropertyBuilder::createProperty("Maximum-value Columns")->isRequired(false)->withDescription(
+    "A comma-separated list of column names. The processor will keep track of the maximum value for each column that has been returned since the processor started running. "
+    "Using multiple columns implies an order to the column list, and each column's values are expected to increase more slowly than the previous columns' values. "
+    "Thus, using multiple columns implies a hierarchical structure of columns, which is usually used for partitioning tables. "
+    "This processor can be used to retrieve only those rows that have been added/updated since the last retrieval. "
+    "Note that some ODBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these types should not be listed in this property, and will result in error(s) during processing. "
+    "If no columns are provided, all rows from the table will be considered, which could have a performance impact. "
+    "NOTE: It is important to use consistent max-value column names for a given table for incremental fetch to work properly. "
+    "NOTE: Because of a limitation of database access library 'soci', which doesn't support milliseconds in it's 'dt_date', "
+    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")->
+    supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_whereClause(
+  core::PropertyBuilder::createProperty("db-fetch-where-clause")->isRequired(false)->withDescription(
+    "A custom clause to be added in the WHERE condition when building SQL queries.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_sqlQuery(
+  core::PropertyBuilder::createProperty("db-fetch-sql-query")->isRequired(false)->withDescription(
+    "A custom SQL query used to retrieve data. Instead of building a SQL query from other properties, this query will be wrapped as a sub-query. "
+    "Query must have no ORDER BY statement.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_maxRowsPerFlowFile(
+  core::PropertyBuilder::createProperty("qdbt-max-rows")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
+    "The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large result sets into multiple FlowFiles. "
+    "If the value specified is zero, then all rows are returned in a single FlowFile.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_stateDirectory(
+  core::PropertyBuilder::createProperty("State Directory")->isRequired(false)->withDefaultValue("QDTState")->withDescription("Directory which contains processor state data.")->build());
+
+const std::string QueryDatabaseTable::s_initialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
+
+const core::Relationship QueryDatabaseTable::s_success("success", "Successfully created FlowFile from SQL query result set.");
+
+static const std::string ResultTableName = "tablename";
+static const std::string ResultRowCount = "querydbtable.row.count";
+
+// State
+class State {
+ public:
+  State(const std::string& tableName, const std::string& stateDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
+    :tableName_(tableName), logger_(logger) {
+    if (!createUUIDDir(stateDir, uuid, filePath_))
+      return;
+
+    filePath_ += "State.txt";
+
+    if (!getStateFromFile())
+      return;
+
+    ok_ = true;
+  }
+
+  ~State() {
+    if (file_.is_open()) {
+      file_.close();
+    }
+  }
+
+  operator bool() const {
+    return ok_;
+  }
+
+  std::unordered_map<std::string, std::string> mapState() const {
+    return mapState_;
+  }
+
+  void writeStateToFile(const std::unordered_map<std::string, std::string>& mapState) {
+    file_.seekp(std::ios::beg);
+
+    file_ << tableName_ << separator();
+    auto dataSize = tableName_.size() + separator().size();
+
+    for (const auto& el : mapState) {
+      file_ << el.first << '=' << el.second << separator();
+      dataSize += el.first.size() + 1 + el.second.size() + separator().size();
+    }
+
+    // If dataSize_ > dataSize, then clear difference with ' '.
+    if (dataSize_ > dataSize) {
+      for (auto i = dataSize_ - dataSize; i > 0; i--) {
+        file_ << ' ';
+      }
+    }
 
 Review comment:
   If the type of a `maxValueColumnName` column is `varchar`, then a new max value can be shorter than the previous max value.
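   
   To illustrate why a shorter value matters, here is a small simulation. It assumes, based on the `seekp` call in the quoted code, that the state file is overwritten from offset 0 without being truncated; the function names and the `max=...;` record layout are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <utility>

// Simulates writing newData at offset 0 of a file that is not truncated.
std::string overwriteInPlace(std::string fileContent, const std::string& newData) {
  if (newData.size() > fileContent.size()) {
    fileContent.resize(newData.size());
  }
  std::copy(newData.begin(), newData.end(), fileContent.begin());
  return fileContent;
}

// Same write, but pads with spaces up to the previous length first.
std::string overwriteWithPadding(std::string fileContent, const std::string& newData) {
  std::string padded = newData;
  if (fileContent.size() > newData.size()) {
    padded.append(fileContent.size() - newData.size(), ' ');
  }
  return overwriteInPlace(std::move(fileContent), padded);
}

void demoStalePadding() {
  // Without padding, the tail of the old, longer value survives.
  assert(overwriteInPlace("max=zzzzzz;", "max=b;") == "max=b;zzzz;");
  // With padding, the stale tail is replaced by spaces.
  assert(overwriteWithPadding("max=zzzzzz;", "max=b;") == "max=b;     ");
}
```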


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r377692105
 
 

 ##########
 File path: extensions/sql/processors/SQLProcessor.h
 ##########
 @@ -0,0 +1,105 @@
+/**
+ * @file SQLProcessor.h
+ * SQLProcessor class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "core/Core.h"
+#include "FlowFileRecord.h"
+#include "concurrentqueue.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+template <typename T>
+class SQLProcessor: public core::Processor {
+ protected:
+  SQLProcessor(const std::string& name, utils::Identifier uuid)
+    : core::Processor(name, uuid), logger_(logging::LoggerFactory<T>::getLogger()) {
+  }
+
+  void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) override {
+    std::string controllerService;
+    context->getProperty(dbControllerService().getName(), controllerService);
+
+    dbService_ = std::dynamic_pointer_cast<sql::controllers::DatabaseService>(context->getControllerService(controllerService));
+    if (!dbService_)
+      throw minifi::Exception(PROCESSOR_EXCEPTION, "'DB Controller Service' must be defined");
+
+    static_cast<T*>(this)->processOnSchedule(context, sessionFactory);
+  }
+
+  void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override {
+    std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
+    if (!lock.owns_lock()) {
+      logger_->log_warn("'onTrigger' is called before previous 'onTrigger' call is finished.");
+      context->yield();
+      return;
+    }
+
+    try {
+      if (!connection_) {
+        connection_ = dbService_->getConnection();
+      }
+      static_cast<T*>(this)->processOnTrigger(context, session);
 
 Review comment:
   I'm also not in favour of CRTP in this case, but it doesn't hurt.
   I think rewriting this part of the code would cost a lot of time for very little gain in readability, so let's leave it as is for now.


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r379345046
 
 

 ##########
 File path: extensions/sql/processors/ExecuteSQL.cpp
 ##########
 @@ -0,0 +1,123 @@
+/**
+ * @file ExecuteSQL.cpp
+ * ExecuteSQL class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ExecuteSQL.h"
+
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <memory>
+#include <codecvt>
+
+#include <soci/soci.h>
+
+#include "io/DataStream.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "Exception.h"
+#include "utils/OsUtils.h"
+#include "data/DatabaseConnectors.h"
+#include "data/JSONSQLWriter.h"
+#include "data/SQLRowsetProcessor.h"
+#include "data/WriteCallback.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const std::string ExecuteSQL::ProcessorName("ExecuteSQL");
+
+const core::Property ExecuteSQL::s_sqlSelectQuery(
+  core::PropertyBuilder::createProperty("SQL select query")->isRequired(true)->withDescription(
+    "The SQL select query to execute. The query can be empty, a constant value, or built from attributes using Expression Language. "
+    "If this property is specified, it will be used regardless of the content of incoming flowfiles. "
+    "If this property is empty, the content of the incoming flow file is expected to contain a valid SQL select query, to be issued by the processor to the database. "
+    "Note that Expression Language is not evaluated for flow file contents.")->supportsExpressionLanguage(true)->build());
+
+const core::Property ExecuteSQL::s_maxRowsPerFlowFile(
+	core::PropertyBuilder::createProperty("Max Rows Per Flow File")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
+		"The maximum number of result rows that will be included intoi a flow file. If zero then all will be placed into the flow file")->supportsExpressionLanguage(true)->build());
+
+const core::Relationship ExecuteSQL::s_success("success", "Successfully created FlowFile from SQL query result set.");
+
+static const std::string ResultRowCount = "executesql.row.count";
+
+ExecuteSQL::ExecuteSQL(const std::string& name, utils::Identifier uuid)
+  : SQLProcessor(name, uuid), max_rows_(0) {
+}
+
+ExecuteSQL::~ExecuteSQL() {
+}
+
+void ExecuteSQL::initialize() {
+  //! Set the supported properties
+  setSupportedProperties( { dbControllerService(), outputFormat(), s_sqlSelectQuery, s_maxRowsPerFlowFile});
+
+  //! Set the supported relationships
+  setSupportedRelationships( { s_success });
+}
+
+void ExecuteSQL::processOnSchedule(const core::ProcessContext &context) {
+  initOutputFormat(context);
+
+  context.getProperty(s_sqlSelectQuery.getName(), sqlSelectQuery_);
+  context.getProperty(s_maxRowsPerFlowFile.getName(), max_rows_);
+}
+
+void ExecuteSQL::processOnTrigger(core::ProcessSession &session) {
+  auto statement = connection_->prepareStatement(sqlSelectQuery_);
+
+  auto rowset = statement->execute();
+
+  int count = 0;
+  size_t rowCount = 0;
+  sql::JSONSQLWriter sqlWriter(isJSONPretty());
+  sql::SQLRowsetProcessor sqlRowsetProcessor(rowset, { &sqlWriter });
+
+  // Process rowset.
+  do {
+    rowCount = sqlRowsetProcessor.process(max_rows_ == 0 ? std::numeric_limits<size_t>::max() : max_rows_);
+    count++;
+    if (rowCount == 0)
+      break;
+
+    const auto& output = sqlWriter.toString();
 
 Review comment:
   Binding the result to `const auto&` is an alternative to relying on a move constructor: it extends the lifetime of the returned temporary and works whether or not the returned type has a move constructor.
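   
   A short sketch of the language rule behind the `const auto&` binding on the line under review. `toStringSketch` is a hypothetical stand-in for `sqlWriter.toString()` and is assumed to return by value.

```cpp
#include <cassert>
#include <string>

// Stand-in for sqlWriter.toString(): returns a temporary by value.
std::string toStringSketch() { return std::string(64, 'x'); }

void demoLifetimeExtension() {
  // Binding a temporary to a const reference extends its lifetime to the
  // end of the reference's scope, so output stays valid here.
  const auto& output = toStringSketch();
  assert(output.size() == 64);

  // Initializing a plain value from the call is also valid and is
  // typically just as cheap thanks to copy elision or a move.
  auto value = toStringSketch();
  assert(value == output);
}
```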


[GitHub] [nifi-minifi-cpp] bakaid commented on issue #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
bakaid commented on issue #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#issuecomment-586152586
 
 
   @am-c-p-p I will review the fixes and run a (hopefully) final verification on all platforms. If it's OK, I'll merge it.


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378788038
 
 

 ##########
 File path: extensions/sql/data/WriteCallback.h
 ##########
 @@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "FlowFileRecord.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+class WriteCallback : public OutputStreamCallback {
+public:
+  WriteCallback(const char *data, uint64_t size)
+    : _data(const_cast<char*>(data)),
 
 Review comment:
   Please keep `WriteCallback` taking a reference/pointer to mutable storage, because it mutates the storage, and work around the missing overload where the call occurs.
   
   I suggest applying `const_cast` directly to the result of `.data()`, or substituting `&out[0]` for `out.data()` (only if `out.size() > 0`).


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on issue #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on issue #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#issuecomment-586674093
 
 
   @bakaid the linter is now removed from the SQL extension.


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r377593640
 
 

 ##########
 File path: extensions/sql/processors/QueryDatabaseTable.cpp
 ##########
 @@ -0,0 +1,475 @@
+/**
+ * @file QueryDatabaseTable.cpp
+ * PutSQL class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "QueryDatabaseTable.h"
+
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <memory>
+#include <codecvt>
+#include <algorithm>
+#include <regex>
+
+#include <soci/soci.h>
+
+#include "io/DataStream.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "Exception.h"
+#include "utils/OsUtils.h"
+#include "data/DatabaseConnectors.h"
+#include "data/JSONSQLWriter.h"
+#include "data/SQLRowsetProcessor.h"
+#include "data/WriteCallback.h"
+#include "data/MaxCollector.h"
+#include "data/Utils.h"
+#include "utils/file/FileUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const std::string QueryDatabaseTable::ProcessorName("QueryDatabaseTable");
+
+const core::Property QueryDatabaseTable::s_tableName(
+  core::PropertyBuilder::createProperty("Table Name")->isRequired(true)->withDescription("The name of the database table to be queried.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_columnNames(
+  core::PropertyBuilder::createProperty("Columns to Return")->isRequired(false)->withDescription(
+    "A comma-separated list of column names to be used in the query. If your database requires special treatment of the names (quoting, e.g.), each name should include such treatment. "
+    "If no column names are supplied, all columns in the specified table will be returned. "
+    "NOTE: It is important to use consistent column names for a given table for incremental fetch to work properly.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_maxValueColumnNames(
+  core::PropertyBuilder::createProperty("Maximum-value Columns")->isRequired(false)->withDescription(
+    "A comma-separated list of column names. The processor will keep track of the maximum value for each column that has been returned since the processor started running. "
+    "Using multiple columns implies an order to the column list, and each column's values are expected to increase more slowly than the previous columns' values. "
+    "Thus, using multiple columns implies a hierarchical structure of columns, which is usually used for partitioning tables. "
+    "This processor can be used to retrieve only those rows that have been added/updated since the last retrieval. "
+    "Note that some ODBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these types should not be listed in this property, and will result in error(s) during processing. "
+    "If no columns are provided, all rows from the table will be considered, which could have a performance impact. "
+    "NOTE: It is important to use consistent max-value column names for a given table for incremental fetch to work properly. "
+    "NOTE: Because of a limitation of database access library 'soci', which doesn't support milliseconds in it's 'dt_date', "
+    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")->
+    supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_whereClause(
+  core::PropertyBuilder::createProperty("db-fetch-where-clause")->isRequired(false)->withDescription(
+    "A custom clause to be added in the WHERE condition when building SQL queries.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_sqlQuery(
+  core::PropertyBuilder::createProperty("db-fetch-sql-query")->isRequired(false)->withDescription(
+    "A custom SQL query used to retrieve data. Instead of building a SQL query from other properties, this query will be wrapped as a sub-query. "
+    "Query must have no ORDER BY statement.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_maxRowsPerFlowFile(
+  core::PropertyBuilder::createProperty("qdbt-max-rows")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
+    "The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large result sets into multiple FlowFiles. "
+    "If the value specified is zero, then all rows are returned in a single FlowFile.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_stateDirectory(
+  core::PropertyBuilder::createProperty("State Directory")->isRequired(false)->withDefaultValue("QDTState")->withDescription("Directory which contains processor state data.")->build());
+
+const std::string QueryDatabaseTable::s_initialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
+
+const core::Relationship QueryDatabaseTable::s_success("success", "Successfully created FlowFile from SQL query result set.");
+
+static const std::string ResultTableName = "tablename";
+static const std::string ResultRowCount = "querydbtable.row.count";
+
+// State
+class State {
+ public:
+  State(const std::string& tableName, const std::string& stateDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
+    :tableName_(tableName), logger_(logger) {
+    if (!createUUIDDir(stateDir, uuid, filePath_))
+      return;
+
+    filePath_ += "State.txt";
+
+    if (!getStateFromFile())
+      return;
+
+    ok_ = true;
+  }
+
+  ~State() {
+    if (file_.is_open()) {
+      file_.close();
+    }
+  }
+
+  operator bool() const {
+    return ok_;
+  }
 
 Review comment:
   Implicit conversions subtly rot the codebase. Do we _really_ need this to be implicit? If so, please add a code comment with your reasoning. If not, please mark `explicit`.
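
A minimal sketch of what marking the conversion `explicit` changes. `StateSketch` and `describe` are hypothetical names, and the class is a trimmed-down stand-in for the `State` class in the diff.

```cpp
#include <type_traits>

// Hypothetical, trimmed-down stand-in for the State class in the diff.
class StateSketch {
 public:
  explicit StateSketch(bool ok) : ok_(ok) {}

  // `explicit` still allows contextual conversion (`if (state)`, `!state`),
  // but rejects accidental uses such as `int n = state;` or `state + 1`.
  explicit operator bool() const {
    return ok_;
  }

 private:
  bool ok_;
};

// No implicit conversion to bool exists any more.
static_assert(!std::is_convertible<StateSketch, bool>::value,
              "StateSketch must not convert to bool implicitly");

inline int describe(const StateSketch& state) {
  if (state) {  // contextual conversion: compiles with `explicit`
    return 1;
  }
  return 0;
}
```

Typical checks such as `if (!state) return;` keep compiling unchanged, so the keyword usually costs nothing at existing call sites.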


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378784466
 
 

 ##########
 File path: extensions/sql/data/Utils.cpp
 ##########
 @@ -0,0 +1,61 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Utils.h"
+
+#include <algorithm>
+#include  <cctype>
+#include  <regex>
+#include  <sstream>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+std::string toLower(const std::string& str) {
+  std::string ret;
+
+  // (int(*)(int))std::tolower - to avoid compilation error 'no matching overloaded function found'. 
+  // It is described in https://stackoverflow.com/questions/5539249/why-cant-transforms-begin-s-end-s-begin-tolower-be-complied-successfu.
+  std::transform(str.begin(), str.end(), std::back_inserter(ret), (int(*)(int))std::tolower);
 
 Review comment:
   ok, feel free to mark this thread as resolved
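
For reference, the overload ambiguity described in the code comment above can also be avoided without a function-pointer cast by wrapping the call in a lambda. This is a sketch of that alternative, not the code merged in the PR; `toLowerSketch` is a hypothetical name.

```cpp
#include <algorithm>
#include <cctype>
#include <iterator>
#include <string>

// Hypothetical alternative to the cast: the lambda calls the <cctype>
// overload directly, so no overload has to be selected by a cast.
inline std::string toLowerSketch(const std::string& str) {
  std::string ret;
  ret.reserve(str.size());
  std::transform(str.begin(), str.end(), std::back_inserter(ret),
                 // Taking `unsigned char` avoids passing negative values
                 // to std::tolower, which is undefined behaviour.
                 [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
  return ret;
}
```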


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378128845
 
 

 ##########
 File path: extensions/sql/SQLLoader.h
 ##########
 @@ -0,0 +1,81 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSION_SQLLOADER_H
+#define EXTENSION_SQLLOADER_H
+
+#include "core/ClassLoader.h"
+#include "processors/ExecuteSQL.h"
+#include "processors/PutSQL.h"
+#include "processors/QueryDatabaseTable.h"
+#include "services/ODBCConnector.h"
+
+class SQLFactory : public core::ObjectFactory {
+ public:
+  SQLFactory() {
+
+  }
 
 Review comment:
   Fixed.


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378208898
 
 

 ##########
 File path: extensions/sql/data/WriteCallback.h
 ##########
 @@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "FlowFileRecord.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+class WriteCallback : public OutputStreamCallback {
+public:
+  WriteCallback(const char *data, uint64_t size)
+    : _data(const_cast<char*>(data)),
 
 Review comment:
   Is there a reason for taking `const char*` and casting away `const` instead of taking a mutable `char*`? If so, please add a code comment; otherwise, please fix it.


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r377573255
 
 

 ##########
 File path: extensions/sql/SQLLoader.h
 ##########
 @@ -0,0 +1,81 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSION_SQLLOADER_H
+#define EXTENSION_SQLLOADER_H
+
+#include "core/ClassLoader.h"
+#include "processors/ExecuteSQL.h"
+#include "processors/PutSQL.h"
+#include "processors/QueryDatabaseTable.h"
+#include "services/ODBCConnector.h"
+
+class SQLFactory : public core::ObjectFactory {
+ public:
+  SQLFactory() {
+
+  }
+
+  /**
+   * Gets the name of the object.
+   * @return class name of processor
+   */
+  virtual std::string getName() override {
+    return "SQLFactory";
+  }
+
+  virtual std::string getClassName() override{
+    return "SQLFactory";
+  }
+  /**
+   * Gets the class name for the object
+   * @return class name for the processor.
+   */
+  virtual std::vector<std::string> getClassNames() override{
 
 Review comment:
   I'm in favor of following "Virtual functions should specify exactly one of `virtual`, `override`, or `final`" from the C++ Core Guidelines.
   https://github.com/isocpp/CppCoreGuidelines/blob/master/CppCoreGuidelines.md#Rh-override
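
A short sketch of the guideline applied to a factory-like class. `FactoryBase`, `SqlFactorySketch` and `nameOf` are hypothetical names standing in for `core::ObjectFactory` and `SQLFactory`.

```cpp
#include <string>

// Hypothetical base class standing in for core::ObjectFactory.
class FactoryBase {
 public:
  virtual ~FactoryBase() = default;

  // Introduces the virtual function: `virtual` only.
  virtual std::string getName() { return "FactoryBase"; }
};

class SqlFactorySketch : public FactoryBase {
 public:
  // Overrides it: `override` only. The function is still virtual, and the
  // compiler rejects the declaration if no matching base virtual exists.
  std::string getName() override { return "SQLFactory"; }
};

inline std::string nameOf(FactoryBase& factory) {
  return factory.getName();  // dispatches dynamically
}
```

Writing `virtual ... override` is legal, but the `virtual` adds no information once `override` is present.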


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r377584537
 
 

 ##########
 File path: extensions/sql/data/MaxCollector.h
 ##########
 @@ -0,0 +1,172 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <unordered_map>
+#include <tuple>
+
+#include "SQLRowSubscriber.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+
+class MaxCollector: public SQLRowSubscriber {
+  void beginProcessRow() override {}
+
+  void endProcessRow() override {
+    if (columnsVerified_) {
+      return;
+    }
+
+    if (countColumns_ != mapState_.size())
+      throw minifi::Exception(PROCESSOR_EXCEPTION, "MaxCollector: Column(s) '" + maxValueColumnNames_ + "' are not found in the columns of '" + selectQuery_ + "' result.");
+
+    columnsVerified_ = true;
+  }
+
+  void processColumnName(const std::string& name) override {
+    if (columnsVerified_) {
+      return;
+    }
+
+    if (mapState_.count(name)) {
+      countColumns_++;
+    }
+  }
+
+  void processColumn(const std::string& name, const std::string& value)  override {
+    updateMaxValue(name, '\'' + value + '\'');
+  }
+
+  void processColumn(const std::string& name, double value) override {
+    updateMaxValue(name, value);
+  }
+
+  void processColumn(const std::string& name, int value) override {
+    updateMaxValue(name, value);
+  }
+
+  void processColumn(const std::string& name, long long value) override {
+    updateMaxValue(name, value);
+  }
+
+  void processColumn(const std::string& name, unsigned long long value) override {
+    updateMaxValue(name, value);
+  }
+
+  void processColumn(const std::string& name, const char* value) override {}
+
+  template <typename T>
+  struct MaxValue {
+    void updateMaxValue(const std::string& name, const T& value) {
+      const auto it = mapColumnNameValue_.find(name);
+      if (it == mapColumnNameValue_.end()) {
+        mapColumnNameValue_.insert({ name, value });
+      } else {
+        if (value > it->second) {
+          it->second = value;
+        }
+      }
+    }
+
+    std::unordered_map<std::string, T> mapColumnNameValue_;
+  };
+
+  template <typename T, typename Tuple, int Index, bool>
+  struct TupleIndexByType {
+    constexpr static int index() {
+      using tupleElType = typename std::decay<decltype(std::get<Index + 1>(Tuple()))>::type;
+
+      return TupleIndexByType<T, Tuple, Index + 1, std::is_same<tupleElType, MaxValue<T>>::value>::index();
+    }
+  };
+
+  template <typename T, typename Tuple, int Index>
+  struct TupleIndexByType<T, Tuple, Index, true> {
+    constexpr static int index() {
+      return Index;
+    }
+  };
+
+  template <typename Tuple, int Index>
+  struct UpdateMapState {
+    UpdateMapState(const Tuple& tpl, std::unordered_map<std::string, std::string>& mapState) {
+      for (auto& el : mapState) {
+        const auto& maxVal = std::get<Index>(tpl);
+
+        const auto it = maxVal.mapColumnNameValue_.find(el.first);
+        if (it != maxVal.mapColumnNameValue_.end()) {
+          std::stringstream ss;
+          ss << it->second;
+          el.second = ss.str();
+        }
+      }
+
+      UpdateMapState<Tuple, Index - 1>(tpl, mapState);
+    }
+  };
+
+  template <typename Tuple>
+  struct UpdateMapState<Tuple, -1> {
+    UpdateMapState(const Tuple&, std::unordered_map<std::string, std::string>&) {}
+  };
+
+  template <typename ...Ts>
+  struct MaxValues : public std::tuple<MaxValue<Ts>...> {
+    constexpr static size_t size = sizeof...(Ts);
+  };
+
+ public:
+  MaxCollector(const std::string& selectQuery, const std::string& maxValueColumnNames, std::unordered_map<std::string, std::string>& mapState)
+    :selectQuery_(selectQuery), maxValueColumnNames_(maxValueColumnNames), mapState_(mapState) {
+  }
+
+  template <typename T>
+  void updateMaxValue(const std::string& columnName, const T& value) {
+    if (mapState_.count(columnName)) {
+      constexpr auto index = TupleIndexByType<T, decltype(maxValues_), -1, false>::index();
+      std::get<index>(maxValues_).updateMaxValue(columnName, value);
+    }
+  }
+
+  bool updateMapState() {
+    auto mapState = mapState_;
+    UpdateMapState<decltype(maxValues_), decltype(maxValues_)::size - 1>(maxValues_, mapState_);
+
+    return mapState != mapState_;
 
 Review comment:
   Copying the whole map here just to see whether anything changed, while making the code much more verbose elsewhere to avoid additional lookups/writes in `MaxValue`, is contradictory and feels wrong.
   
   Can we either optimize properly for performance if it matters or reduce verbosity if it doesn't?
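
One possible way to detect changes without snapshotting the map is to report the modification at the point of the write. This is a sketch under assumptions: `assignIfChanged` is a hypothetical helper, not part of the PR, and it mirrors the diff's behaviour of updating only columns already present in the state map.

```cpp
#include <string>
#include <unordered_map>

// Hypothetical helper: stores `value` under `key` only when it differs from
// the current entry, and reports whether the map was modified. Keys that are
// not already tracked are ignored, as in the loop over mapState in the diff.
inline bool assignIfChanged(std::unordered_map<std::string, std::string>& state,
                            const std::string& key, const std::string& value) {
  const auto it = state.find(key);
  if (it == state.end() || it->second == value) {
    return false;  // untracked column or unchanged value
  }
  it->second = value;
  return true;
}
```

A caller can OR the results together while iterating, for example `changed |= assignIfChanged(state, name, text);`, and return `changed` instead of comparing two maps.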


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r377579639
 
 

 ##########
 File path: extensions/sql/data/MaxCollector.h
 ##########
 @@ -0,0 +1,172 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <unordered_map>
+#include <tuple>
+
+#include "SQLRowSubscriber.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+
+class MaxCollector: public SQLRowSubscriber {
+  void beginProcessRow() override {}
+
+  void endProcessRow() override {
+    if (columnsVerified_) {
+      return;
+    }
+
+    if (countColumns_ != mapState_.size())
+      throw minifi::Exception(PROCESSOR_EXCEPTION, "MaxCollector: Column(s) '" + maxValueColumnNames_ + "' are not found in the columns of '" + selectQuery_ + "' result.");
+
+    columnsVerified_ = true;
+  }
+
+  void processColumnName(const std::string& name) override {
+    if (columnsVerified_) {
+      return;
+    }
+
+    if (mapState_.count(name)) {
+      countColumns_++;
+    }
+  }
+
+  void processColumn(const std::string& name, const std::string& value)  override {
+    updateMaxValue(name, '\'' + value + '\'');
+  }
+
+  void processColumn(const std::string& name, double value) override {
+    updateMaxValue(name, value);
+  }
+
+  void processColumn(const std::string& name, int value) override {
+    updateMaxValue(name, value);
+  }
+
+  void processColumn(const std::string& name, long long value) override {
+    updateMaxValue(name, value);
+  }
+
+  void processColumn(const std::string& name, unsigned long long value) override {
+    updateMaxValue(name, value);
+  }
+
+  void processColumn(const std::string& name, const char* value) override {}
+
+  template <typename T>
+  struct MaxValue {
+    void updateMaxValue(const std::string& name, const T& value) {
+      const auto it = mapColumnNameValue_.find(name);
+      if (it == mapColumnNameValue_.end()) {
+        mapColumnNameValue_.insert({ name, value });
+      } else {
+        if (value > it->second) {
+          it->second = value;
+        }
+      }
+    }
+
+    std::unordered_map<std::string, T> mapColumnNameValue_;
+  };
+
+  template <typename T, typename Tuple, int Index, bool>
+  struct TupleIndexByType {
+    constexpr static int index() {
+      using tupleElType = typename std::decay<decltype(std::get<Index + 1>(Tuple()))>::type;
+
+      return TupleIndexByType<T, Tuple, Index + 1, std::is_same<tupleElType, MaxValue<T>>::value>::index();
+    }
+  };
+
+  template <typename T, typename Tuple, int Index>
+  struct TupleIndexByType<T, Tuple, Index, true> {
+    constexpr static int index() {
+      return Index;
+    }
+  };
 
 Review comment:
   `std::get(std::tuple)` supports indexing by type if the type is present in the tuple exactly once, but only since C++14. Can we/do we want to raise the requirements of this extension to avoid reimplementing this?
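
A sketch of what the C++14 route could look like, assuming the extension may be built with C++14 or later. `MaxValueSketch`, `MaxValuesSketch` and `updateMax` are hypothetical, simplified stand-ins for the types in the diff (one value per type instead of a map per type).

```cpp
#include <string>
#include <tuple>

// Hypothetical, simplified stand-in for MaxValue<T>: tracks a single maximum.
template <typename T>
struct MaxValueSketch {
  T value{};
  bool set = false;

  void update(const T& candidate) {
    if (!set || candidate > value) {
      value = candidate;
      set = true;
    }
  }
};

using MaxValuesSketch =
    std::tuple<MaxValueSketch<std::string>, MaxValueSketch<double>, MaxValueSketch<int>>;

// C++14: std::get<Type>(tuple) selects the element by type and fails to
// compile unless that type occurs exactly once, so no index search is needed.
template <typename T>
void updateMax(MaxValuesSketch& values, const T& candidate) {
  std::get<MaxValueSketch<T>>(values).update(candidate);
}
```

`T` is deduced from the argument, so a string literal must be passed as `std::string("...")` to pick the string slot, and a type with no slot in the tuple is a compile-time error.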


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r377572695
 
 

 ##########
 File path: extensions/sql/SQLLoader.h
 ##########
 @@ -0,0 +1,81 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSION_SQLLOADER_H
+#define EXTENSION_SQLLOADER_H
+
+#include "core/ClassLoader.h"
+#include "processors/ExecuteSQL.h"
+#include "processors/PutSQL.h"
+#include "processors/QueryDatabaseTable.h"
+#include "services/ODBCConnector.h"
+
+class SQLFactory : public core::ObjectFactory {
+ public:
+  SQLFactory() {
+
+  }
+
+  /**
+   * Gets the name of the object.
+   * @return class name of processor
+   */
+  virtual std::string getName() override {
+    return "SQLFactory";
+  }
+
+  virtual std::string getClassName() override{
+    return "SQLFactory";
+  }
+  /**
+   * Gets the class name for the object
+   * @return class name for the processor.
+   */
+  virtual std::vector<std::string> getClassNames() override{
+    std::vector<std::string> class_names = {"ExecuteSQL", "PutSQL", "QueryDatabaseTable", "ODBCService"};
+    return class_names;
+  }
+
+  template <typename T>
+  static std::unique_ptr<ObjectFactory> getObjectFactory() {
+    return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<T>());
+  }
+
+  virtual std::unique_ptr<ObjectFactory> assign(const std::string &class_name) override {
+    if (utils::StringUtils::equalsIgnoreCase(class_name, "ExecuteSQL")) {
+      return getObjectFactory<minifi::processors::ExecuteSQL>();
+    }
+    if (utils::StringUtils::equalsIgnoreCase(class_name, "PutSQL")) {
+      return getObjectFactory<minifi::processors::PutSQL>();
+    }
+    if (utils::StringUtils::equalsIgnoreCase(class_name, "QueryDatabaseTable")) {
+      return getObjectFactory<minifi::processors::QueryDatabaseTable>();
+    }
+    if (utils::StringUtils::equalsIgnoreCase(class_name, "ODBCService")) {
+      return getObjectFactory<minifi::sql::controllers::ODBCService>();
+    }
+
+    return nullptr;
+  }
+
+  static bool added;
 
 Review comment:
   I think it's a bad idea to expose a mutable variable on the public interface. If there's a good reason for it, please add a comment explaining that reason!
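
A minimal sketch of one way to keep such a flag off the public interface. `RegistrationSketch`, `isAdded` and `markAdded` are hypothetical names; the sketch makes no assumption about what `added` is actually used for in `SQLFactory`.

```cpp
// Hypothetical class: the flag is private, readers get a read-only accessor,
// and there is exactly one function that can change it.
class RegistrationSketch {
 public:
  // Read-only view of the flag; callers cannot assign to it.
  static bool isAdded() { return added_; }

  // The only mutation point; returns true on the first call only.
  static bool markAdded() {
    if (added_) {
      return false;
    }
    added_ = true;
    return true;
  }

 private:
  static bool added_;
};

// Definition of the static member (belongs in exactly one translation unit).
bool RegistrationSketch::added_ = false;
```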


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378603638
 
 

 ##########
 File path: extensions/sql/processors/QueryDatabaseTable.cpp
 ##########
 @@ -0,0 +1,475 @@
+/**
+ * @file QueryDatabaseTable.cpp
+ * PutSQL class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "QueryDatabaseTable.h"
+
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <memory>
+#include <codecvt>
+#include <algorithm>
+#include <regex>
+
+#include <soci/soci.h>
+
+#include "io/DataStream.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "Exception.h"
+#include "utils/OsUtils.h"
+#include "data/DatabaseConnectors.h"
+#include "data/JSONSQLWriter.h"
+#include "data/SQLRowsetProcessor.h"
+#include "data/WriteCallback.h"
+#include "data/MaxCollector.h"
+#include "data/Utils.h"
+#include "utils/file/FileUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const std::string QueryDatabaseTable::ProcessorName("QueryDatabaseTable");
+
+const core::Property QueryDatabaseTable::s_tableName(
+  core::PropertyBuilder::createProperty("Table Name")->isRequired(true)->withDescription("The name of the database table to be queried.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_columnNames(
+  core::PropertyBuilder::createProperty("Columns to Return")->isRequired(false)->withDescription(
+    "A comma-separated list of column names to be used in the query. If your database requires special treatment of the names (quoting, e.g.), each name should include such treatment. "
+    "If no column names are supplied, all columns in the specified table will be returned. "
+    "NOTE: It is important to use consistent column names for a given table for incremental fetch to work properly.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_maxValueColumnNames(
+  core::PropertyBuilder::createProperty("Maximum-value Columns")->isRequired(false)->withDescription(
+    "A comma-separated list of column names. The processor will keep track of the maximum value for each column that has been returned since the processor started running. "
+    "Using multiple columns implies an order to the column list, and each column's values are expected to increase more slowly than the previous columns' values. "
+    "Thus, using multiple columns implies a hierarchical structure of columns, which is usually used for partitioning tables. "
+    "This processor can be used to retrieve only those rows that have been added/updated since the last retrieval. "
+    "Note that some ODBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these types should not be listed in this property, and will result in error(s) during processing. "
+    "If no columns are provided, all rows from the table will be considered, which could have a performance impact. "
+    "NOTE: It is important to use consistent max-value column names for a given table for incremental fetch to work properly. "
+    "NOTE: Because of a limitation of database access library 'soci', which doesn't support milliseconds in it's 'dt_date', "
+    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")->
+    supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_whereClause(
+  core::PropertyBuilder::createProperty("db-fetch-where-clause")->isRequired(false)->withDescription(
+    "A custom clause to be added in the WHERE condition when building SQL queries.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_sqlQuery(
+  core::PropertyBuilder::createProperty("db-fetch-sql-query")->isRequired(false)->withDescription(
+    "A custom SQL query used to retrieve data. Instead of building a SQL query from other properties, this query will be wrapped as a sub-query. "
+    "Query must have no ORDER BY statement.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_maxRowsPerFlowFile(
+  core::PropertyBuilder::createProperty("qdbt-max-rows")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
+    "The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large result sets into multiple FlowFiles. "
+    "If the value specified is zero, then all rows are returned in a single FlowFile.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_stateDirectory(
+  core::PropertyBuilder::createProperty("State Directory")->isRequired(false)->withDefaultValue("QDTState")->withDescription("Directory which contains processor state data.")->build());
+
+const std::string QueryDatabaseTable::s_initialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
+
+const core::Relationship QueryDatabaseTable::s_success("success", "Successfully created FlowFile from SQL query result set.");
+
+static const std::string ResultTableName = "tablename";
+static const std::string ResultRowCount = "querydbtable.row.count";
+
+// State
+class State {
+ public:
+  State(const std::string& tableName, const std::string& stateDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
+    :tableName_(tableName), logger_(logger) {
+    if (!createUUIDDir(stateDir, uuid, filePath_))
+      return;
+
+    filePath_ += "State.txt";
+
+    if (!getStateFromFile())
+      return;
+
+    ok_ = true;
+  }
+
+  ~State() {
+    if (file_.is_open()) {
+      file_.close();
+    }
+  }
+
+  operator bool() const {
+    return ok_;
+  }
+
+  std::unordered_map<std::string, std::string> mapState() const {
+    return mapState_;
+  }
+
+  void writeStateToFile(const std::unordered_map<std::string, std::string>& mapState) {
+    file_.seekp(std::ios::beg);
+
+    file_ << tableName_ << separator();
+    auto dataSize = tableName_.size() + separator().size();
+
+    for (const auto& el : mapState) {
+      file_ << el.first << '=' << el.second << separator();
+      dataSize += el.first.size() + 1 + el.second.size() + separator().size();
+    }
+
+    // If dataSize_ > dataSize, then clear difference with ' '.
+    if (dataSize_ > dataSize) {
+      for (auto i = dataSize_ - dataSize; i > 0; i--) {
+        file_ << ' ';
+      }
+    }
 
 Review comment:
   The `if` cannot be removed: `dataSize_` and `dataSize` have unsigned types, so if `dataSize_` < `dataSize`, the subtraction wraps around and `i` starts at a very large number.
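
To illustrate the wrap-around, here is a minimal sketch. It assumes both sizes are `std::size_t` (the declaration of `dataSize_` is not part of the quoted hunk), and the helper names `paddingNeeded` and `unguardedDifference` are hypothetical, not from the PR:

```cpp
#include <cassert>
#include <cstddef>
#include <limits>

// Hypothetical helper: number of padding characters needed to overwrite
// stale bytes left behind by a previous, longer write.
std::size_t paddingNeeded(std::size_t previousSize, std::size_t newSize) {
  // Guard first: with unsigned operands, previousSize - newSize wraps
  // around to a huge value whenever previousSize < newSize.
  if (previousSize <= newSize) {
    return 0;
  }
  return previousSize - newSize;
}

// What the loop counter would start at if the guard were removed.
// Unsigned subtraction is well-defined modular arithmetic, so this does
// not crash; it just yields a value close to the maximum of the type.
std::size_t unguardedDifference(std::size_t previousSize, std::size_t newSize) {
  return previousSize - newSize;
}
```

With `previousSize = 4` and `newSize = 10`, the guarded version returns 0, while the unguarded difference is the maximum `std::size_t` value minus 5, so the padding loop would try to write that many spaces.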


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r377666920
 
 

 ##########
 File path: extensions/sql/services/DatabaseService.h
 ##########
 @@ -0,0 +1,116 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONTROLLERS_DATABASESERVICE_H_
+#define LIBMINIFI_INCLUDE_CONTROLLERS_DATABASESERVICE_H_
+
+#include "core/logging/LoggerConfiguration.h"
+#include "core/controller/ControllerService.h"
+#include "data/DatabaseConnectors.h"
+#include <memory>
+#include <unordered_map>
+
+#include <soci/soci.h>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+namespace controllers {
+
+/**
+ * Purpose and Justification: Controller services function as a layerable way to provide
+ * services to internal services. While a controller service is generally configured from the flow,
+ * we want to follow the open closed principle and provide Database services
+ */
+class DatabaseService : public core::controller::ControllerService {
+ public:
+
+  /**
+   * Constructors for the controller service.
+   */
+  explicit DatabaseService(const std::string &name, const std::string &id)
+      : ControllerService(name, id),
+        initialized_(false),
+        logger_(logging::LoggerFactory<DatabaseService>::getLogger()) {
+    initialize();
+  }
+
+  explicit DatabaseService(const std::string &name, utils::Identifier uuid = utils::Identifier())
+      : ControllerService(name, uuid),
+        initialized_(false),
+        logger_(logging::LoggerFactory<DatabaseService>::getLogger()) {
+    initialize();
+  }
+
+  explicit DatabaseService(const std::string &name, const std::shared_ptr<Configure> &configuration)
+      : ControllerService(name),
+        initialized_(false),
+        logger_(logging::LoggerFactory<DatabaseService>::getLogger()) {
+    setConfiguration(configuration);
+    initialize();
+  }
+
+  /**
+   * Parameters needed.
+   */
+  static core::Property ConnectionString;
+
+  virtual void initialize() override;
+
+  void yield() override {
+
+  }
+
+  bool isRunning() override {
+    return getState() == core::controller::ControllerServiceState::ENABLED;
+  }
+
+  bool isWorkAvailable() override {
+    return false;
+  }
+
+  virtual void onEnable() override;
+
+  virtual std::unique_ptr<sql::Connection> getConnection() const = 0;
+
+ protected:
+
+  void initializeProperties();
+
+  // initialization mutex.
+  std::recursive_mutex initialization_mutex_;
 
 Review comment:
   I also think that recursive mutex is a bad smell, but the usage looks correct here, so let's just create a follow-up for this. 
   Created https://issues.apache.org/jira/browse/MINIFICPP-1156
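
For the follow-up, one common way to drop a recursive mutex is to split each operation into a public method that takes the lock and a private `...Locked` helper that assumes the lock is already held. The sketch below uses a hypothetical `Service` class, not the `DatabaseService` from this PR, and only shows the pattern:

```cpp
#include <cassert>
#include <mutex>

// Hypothetical service illustrating the "public locks, private *Locked
// helper does the work" pattern with a plain std::mutex.
class Service {
 public:
  void initialize() {
    std::lock_guard<std::mutex> lock(mutex_);
    initializeLocked();
  }

  void onEnable() {
    std::lock_guard<std::mutex> lock(mutex_);
    // Calling initialize() here would deadlock on a non-recursive mutex;
    // the *Locked helper never re-acquires it.
    initializeLocked();
    enabled_ = true;
  }

  bool enabled() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return enabled_;
  }

  int initCount() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return initCount_;
  }

 private:
  // Precondition: mutex_ is held by the caller.
  void initializeLocked() {
    if (initialized_) {
      return;
    }
    initialized_ = true;
    ++initCount_;
  }

  mutable std::mutex mutex_;
  bool initialized_ = false;
  bool enabled_ = false;
  int initCount_ = 0;
};
```

Calling `initialize()` followed by `onEnable()` runs the initialization body once and never locks the mutex twice on the same thread.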


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r379325208
 
 

 ##########
 File path: extensions/sql/data/MaxCollector.h
 ##########
 @@ -0,0 +1,155 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <unordered_map>
+#include <tuple>
+
+#include "SQLRowSubscriber.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+
+class MaxCollector: public SQLRowSubscriber {
+  void beginProcessRow() override {}
+
+  void endProcessRow() override {
+    if (columnsVerified_) {
+      return;
+    }
+
+    if (countColumns_ != mapState_.size())
+      throw minifi::Exception(PROCESSOR_EXCEPTION, "MaxCollector: Column(s) '" + maxValueColumnNames_ + "' are not found in the columns of '" + selectQuery_ + "' result.");
+
+    columnsVerified_ = true;
+  }
+
+  void processColumnName(const std::string& name) override {
+    if (columnsVerified_) {
+      return;
+    }
+
+    if (mapState_.count(name)) {
+      countColumns_++;
+    }
+  }
+
+  void processColumn(const std::string& name, const std::string& value)  override {
+    updateMaxValue(name, '\'' + value + '\'');
+  }
+
+  void processColumn(const std::string& name, double value) override {
+    updateMaxValue(name, value);
+  }
+
+  void processColumn(const std::string& name, int value) override {
+    updateMaxValue(name, value);
+  }
+
+  void processColumn(const std::string& name, long long value) override {
+    updateMaxValue(name, value);
+  }
+
+  void processColumn(const std::string& name, unsigned long long value) override {
+    updateMaxValue(name, value);
+  }
+
+  void processColumn(const std::string& name, const char* value) override {}
+
+  template <typename T>
+  struct MaxValue {
+    void updateMaxValue(const std::string& name, const T& value) {
+      const auto it = mapColumnNameValue_.find(name);
+      if (it == mapColumnNameValue_.end()) {
+        mapColumnNameValue_.insert({ name, value });
+      } else {
+        if (value > it->second) {
+          it->second = value;
+        }
+      }
+    }
+
+    std::unordered_map<std::string, T> mapColumnNameValue_;
+  };
+
+  template <typename Tuple, int Index>
+  struct UpdateMapState {
+    UpdateMapState(const Tuple& tpl, std::unordered_map<std::string, std::string>& mapState) {
+      for (auto& el : mapState) {
+        const auto& maxVal = std::get<Index>(tpl);
+
+        const auto it = maxVal.mapColumnNameValue_.find(el.first);
+        if (it != maxVal.mapColumnNameValue_.end()) {
+          std::stringstream ss;
+          ss << it->second;
+          el.second = ss.str();
+        }
+      }
+
+      UpdateMapState<Tuple, Index - 1>(tpl, mapState);
 
 Review comment:
   Build fails on macOS with:
   ```
   /Users/danielbakai/nifi-minifi-cpp/extensions/sql/./data/MaxCollector.h:109:40: error: cannot pass object
         of non-trivial type 'const
         org::apache::nifi::minifi::sql::MaxCollector::MaxValues<std::__1::basic_string<char>, double, int,
         long long, unsigned long long>' through variadic constructor; call will abort at runtime
         [-Wnon-pod-varargs]
         UpdateMapState<Tuple, Index - 1>(tpl, mapState);
                                          ^
   ```
   
   And on Linux with clang with:
   ```
   In file included from /home/bakaid/nifi-minifi-cpp/extensions/sql/processors/QueryDatabaseTable.cpp:47:
   /home/bakaid/nifi-minifi-cpp/extensions/sql/./data/MaxCollector.h:109:40: error: cannot pass object of non-trivial type 'const
         org::apache::nifi::minifi::sql::MaxCollector::MaxValues<std::__cxx11::basic_string<char>, double, int, long long, unsigned long long>' through variadic constructor; call will
         abort at runtime [-Wnon-pod-varargs]
         UpdateMapState<Tuple, Index - 1>(tpl, mapState);
                                          ^
   /home/bakaid/nifi-minifi-cpp/extensions/sql/./data/MaxCollector.h:109:7: note: in instantiation of member function
         'org::apache::nifi::minifi::sql::MaxCollector::UpdateMapState<org::apache::nifi::minifi::sql::MaxCollector::MaxValues<std::__cxx11::basic_string<char>, double, int, long long,
         unsigned long long>, 0>::UpdateMapState' requested here
         UpdateMapState<Tuple, Index - 1>(tpl, mapState);
         ^
   /home/bakaid/nifi-minifi-cpp/extensions/sql/./data/MaxCollector.h:109:7: note: in instantiation of member function
         'org::apache::nifi::minifi::sql::MaxCollector::UpdateMapState<org::apache::nifi::minifi::sql::MaxCollector::MaxValues<std::__cxx11::basic_string<char>, double, int, long long,
         unsigned long long>, 1>::UpdateMapState' requested here
   /home/bakaid/nifi-minifi-cpp/extensions/sql/./data/MaxCollector.h:109:7: note: in instantiation of member function
         'org::apache::nifi::minifi::sql::MaxCollector::UpdateMapState<org::apache::nifi::minifi::sql::MaxCollector::MaxValues<std::__cxx11::basic_string<char>, double, int, long long,
         unsigned long long>, 2>::UpdateMapState' requested here
   /home/bakaid/nifi-minifi-cpp/extensions/sql/./data/MaxCollector.h:109:7: note: in instantiation of member function
         'org::apache::nifi::minifi::sql::MaxCollector::UpdateMapState<org::apache::nifi::minifi::sql::MaxCollector::MaxValues<std::__cxx11::basic_string<char>, double, int, long long,
         unsigned long long>, 3>::UpdateMapState' requested here
   /home/bakaid/nifi-minifi-cpp/extensions/sql/./data/MaxCollector.h:137:5: note: in instantiation of member function
         'org::apache::nifi::minifi::sql::MaxCollector::UpdateMapState<org::apache::nifi::minifi::sql::MaxCollector::MaxValues<std::__cxx11::basic_string<char>, double, int, long long,
         unsigned long long>, 4>::UpdateMapState' requested here
       UpdateMapState<decltype(maxValues_), decltype(maxValues_)::size - 1>(maxValues_, mapState_);
       ^
   /home/bakaid/nifi-minifi-cpp/extensions/sql/./data/MaxCollector.h:109:45: error: cannot pass object of non-trivial type 'std::unordered_map<std::string, std::string>' (aka
         'unordered_map<basic_string<char>, basic_string<char> >') through variadic constructor; call will abort at runtime [-Wnon-pod-varargs]
         UpdateMapState<Tuple, Index - 1>(tpl, mapState);
                                               ^
   2 errors generated.
   extensions/sql/CMakeFiles/minifi-sql.dir/build.make:206: recipe for target 'extensions/sql/CMakeFiles/minifi-sql.dir/processors/QueryDatabaseTable.cpp.o' failed
   make[2]: *** [extensions/sql/CMakeFiles/minifi-sql.dir/processors/QueryDatabaseTable.cpp.o] Error 1
   CMakeFiles/Makefile2:3863: recipe for target 'extensions/sql/CMakeFiles/minifi-sql.dir/all' failed
   ```
   
   The changes introduced are not portable.
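
The diagnostic suggests that the recursion ends in a specialization whose constructor takes a C-style `...` parameter. That specialization is not part of the quoted hunk, so this is an assumption. A portable alternative is to give the terminating specialization the same typed signature as the primary template. A minimal sketch, with hypothetical names (`UpdateState`, `updateState`, `StateMap`) and a tuple of plain maps standing in for `MaxValues`:

```cpp
#include <cassert>
#include <sstream>
#include <string>
#include <tuple>
#include <unordered_map>

using StateMap = std::unordered_map<std::string, std::string>;

// Primary template: process tuple element Index, then recurse to Index - 1.
template <typename Tuple, int Index>
struct UpdateState {
  static void apply(const Tuple& tpl, StateMap& state) {
    const auto& maxValues = std::get<Index>(tpl);
    for (auto& entry : state) {
      const auto it = maxValues.find(entry.first);
      if (it != maxValues.end()) {
        std::ostringstream ss;
        ss << it->second;
        entry.second = ss.str();
      }
    }
    UpdateState<Tuple, Index - 1>::apply(tpl, state);
  }
};

// Terminating specialization with the same typed parameters, so no
// non-trivial object is ever passed through a `...` parameter.
template <typename Tuple>
struct UpdateState<Tuple, -1> {
  static void apply(const Tuple&, StateMap&) {}
};

// Convenience entry point that starts at the last tuple element.
template <typename... Maps>
void updateState(const std::tuple<Maps...>& tpl, StateMap& state) {
  UpdateState<std::tuple<Maps...>, static_cast<int>(sizeof...(Maps)) - 1>::apply(tpl, state);
}
```

Keys that appear in one of the per-type maps get their stringified value written into `state`; keys found in none of them are left untouched.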


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r377696233
 
 

 ##########
 File path: extensions/sql/data/Utils.cpp
 ##########
 @@ -0,0 +1,61 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Utils.h"
+
+#include <algorithm>
+#include  <cctype>
+#include  <regex>
+#include  <sstream>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+std::string toLower(const std::string& str) {
+  std::string ret;
+
+  // (int(*)(int))std::tolower - to avoid compilation error 'no matching overloaded function found'. 
+  // It is described in https://stackoverflow.com/questions/5539249/why-cant-transforms-begin-s-end-s-begin-tolower-be-complied-successfu.
+  std::transform(str.begin(), str.end(), std::back_inserter(ret), (int(*)(int))std::tolower);
 
 Review comment:
   Given that an SQL processor is surely I/O-bound, I think saving an allocation doesn't matter much.
   Don't get me wrong, you are correct, but it wouldn't change anything in practice.
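
For reference, the suggestion under discussion is not quoted in this message. One common shape for it is to take the string by value and lowercase it in place, which is sketched below under that assumption. The name `toLowerCopy` is hypothetical. The lambda also sidesteps the overload ambiguity that the `(int(*)(int))` cast works around:

```cpp
#include <algorithm>
#include <cassert>
#include <cctype>
#include <string>

// Takes the argument by value and lowercases it in place, so the only
// buffer involved is the one owned by the parameter itself.
std::string toLowerCopy(std::string str) {
  std::transform(str.begin(), str.end(), str.begin(), [](unsigned char c) {
    // std::tolower expects a value representable as unsigned char (or EOF).
    return static_cast<char>(std::tolower(c));
  });
  return str;
}
```

Callers that pass a temporary or `std::move` their string avoid the extra copy. As noted above, the difference is unlikely to be measurable next to database I/O.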


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378128633
 
 

 ##########
 File path: extensions/sql/SQLLoader.h
 ##########
 @@ -0,0 +1,81 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSION_SQLLOADER_H
+#define EXTENSION_SQLLOADER_H
+
+#include "core/ClassLoader.h"
+#include "processors/ExecuteSQL.h"
+#include "processors/PutSQL.h"
+#include "processors/QueryDatabaseTable.h"
+#include "services/ODBCConnector.h"
+
+class SQLFactory : public core::ObjectFactory {
+ public:
+  SQLFactory() {
+
+  }
+
+  /**
+   * Gets the name of the object.
+   * @return class name of processor
+   */
+  virtual std::string getName() override {
+    return "SQLFactory";
+  }
+
+  virtual std::string getClassName() override{
+    return "SQLFactory";
+  }
+  /**
+   * Gets the class name for the object
+   * @return class name for the processor.
+   */
+  virtual std::vector<std::string> getClassNames() override{
+    std::vector<std::string> class_names = {"ExecuteSQL", "PutSQL", "QueryDatabaseTable", "ODBCService"};
+    return class_names;
+  }
+
+  template <typename T>
+  static std::unique_ptr<ObjectFactory> getObjectFactory() {
+    return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<T>());
+  }
+
+  virtual std::unique_ptr<ObjectFactory> assign(const std::string &class_name) override {
+    if (utils::StringUtils::equalsIgnoreCase(class_name, "ExecuteSQL")) {
+      return getObjectFactory<minifi::processors::ExecuteSQL>();
+    }
+    if (utils::StringUtils::equalsIgnoreCase(class_name, "PutSQL")) {
+      return getObjectFactory<minifi::processors::PutSQL>();
+    }
+    if (utils::StringUtils::equalsIgnoreCase(class_name, "QueryDatabaseTable")) {
+      return getObjectFactory<minifi::processors::QueryDatabaseTable>();
+    }
+    if (utils::StringUtils::equalsIgnoreCase(class_name, "ODBCService")) {
+      return getObjectFactory<minifi::sql::controllers::ODBCService>();
+    }
+
+    return nullptr;
+  }
+
+  static bool added;
 
 Review comment:
   Fixed.


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r377598410
 
 

 ##########
 File path: extensions/sql/services/ODBCConnector.h
 ##########
 @@ -0,0 +1,127 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once 
+
+#include "core/logging/LoggerConfiguration.h"
+#include "core/controller/ControllerService.h"
+
+#include "DatabaseService.h"
+#include "core/Resource.h"
+#include "data/DatabaseConnectors.h"
+#include <memory>
+#include <unordered_map>
+
+#include <soci/soci.h>
+#include <soci/odbc/soci-odbc.h>
+
+#include <iostream>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+namespace controllers {
+
+class ODBCConnection : public sql::Connection {
+ public:
+  explicit ODBCConnection(const std::string& connectionString)
+    : connection_string_(connectionString) {
+      session_ = std::make_unique<soci::session>(getSessionParameters());
+  }
+
+  virtual ~ODBCConnection() {
+  }
+
+  bool connected(std::string& exception) const override {
+    try {
+      exception.clear();
+      // According to https://stackoverflow.com/questions/3668506/efficient-sql-test-query-or-validation-query-that-will-work-across-all-or-most by Rob Hruska, 
+      // 'select 1' works for: H2, MySQL, Microsoft SQL Server, PostgreSQL, SQLite. For Orcale 'SELECT 1 FROM DUAL' works.
 
 Review comment:
   typo: s/Orcale/Oracle/


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r377602791
 
 

 ##########
 File path: extensions/sql/SQLLoader.h
 ##########
 @@ -0,0 +1,81 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSION_SQLLOADER_H
+#define EXTENSION_SQLLOADER_H
+
+#include "core/ClassLoader.h"
+#include "processors/ExecuteSQL.h"
+#include "processors/PutSQL.h"
+#include "processors/QueryDatabaseTable.h"
+#include "services/ODBCConnector.h"
+
+class SQLFactory : public core::ObjectFactory {
+ public:
+  SQLFactory() {
+
+  }
+
+  /**
+   * Gets the name of the object.
+   * @return class name of processor
+   */
+  virtual std::string getName() override {
+    return "SQLFactory";
+  }
+
+  virtual std::string getClassName() override{
+    return "SQLFactory";
+  }
+  /**
+   * Gets the class name for the object
+   * @return class name for the processor.
+   */
+  virtual std::vector<std::string> getClassNames() override{
+    std::vector<std::string> class_names = {"ExecuteSQL", "PutSQL", "QueryDatabaseTable", "ODBCService"};
+    return class_names;
+  }
+
+  template <typename T>
+  static std::unique_ptr<ObjectFactory> getObjectFactory() {
+    return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<T>());
 
 Review comment:
   A `unique_ptr<Base>` can be implicitly constructed from a `unique_ptr` to a compatible (e.g. derived) type, so you can use `make_unique` if you want to avoid spelling the type name one more time. This is again a question of taste, so feel free to ignore/resolve this.
   
   If you want to keep this extension C++11-compatible, I've made a `make_unique` impl in GeneralUtils.h.
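
A minimal sketch of what that would look like, assuming C++14 `std::make_unique` is available. The types `Factory`, `DefaultFactory`, and `ExecuteSQL` below are simplified stand-ins, not the real MiNiFi classes:

```cpp
#include <cassert>
#include <memory>
#include <string>

// Simplified stand-in for core::ObjectFactory. The virtual destructor
// matters because the object is deleted through a base-class pointer.
struct Factory {
  virtual ~Factory() = default;
  virtual std::string name() const = 0;
};

// Simplified stand-in for the default factory template.
template <typename T>
struct DefaultFactory : Factory {
  std::string name() const override { return T::Name(); }
};

// Hypothetical processor type used only for this example.
struct ExecuteSQL {
  static std::string Name() { return "ExecuteSQL"; }
};

template <typename T>
std::unique_ptr<Factory> getFactory() {
  // make_unique yields unique_ptr<DefaultFactory<T>>; the return statement
  // converts it implicitly to unique_ptr<Factory>.
  return std::make_unique<DefaultFactory<T>>();
}
```

The base type is then named only once, in the return type of the function.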


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r377663445
 
 

 ##########
 File path: win_build_vs.bat
 ##########
 @@ -22,10 +22,12 @@ if [%1]==[] goto usage
 set builddir=%1
 set skiptests=OFF
 set cmake_build_type=Release
+set build_type=Release
 
 Review comment:
   Yep, this looks like a bad merge and breaks the `/D` option.


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378188245
 
 

 ##########
 File path: extensions/sql/data/SQLRowsetProcessor.h
 ##########
 @@ -0,0 +1,62 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include <soci/soci.h>
+
+#include "SQLRowSubscriber.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+
+class SQLRowsetProcessor
+{
+ public:
+  SQLRowsetProcessor(const soci::rowset<soci::row>& rowset, const std::vector<SQLRowSubscriber*>& rowSubscribers);
+
+  size_t process(size_t max = 0);
 
 Review comment:
   Fixed.


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r379072540
 
 

 ##########
 File path: extensions/sql/data/WriteCallback.h
 ##########
 @@ -0,0 +1,49 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+
+#include "FlowFileRecord.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+class WriteCallback : public OutputStreamCallback {
+public:
+  WriteCallback(const std::string& data)
 
 Review comment:
   I've misunderstood the purpose of the class. Up until now I thought that `stream->write` writes into `data_`, therefore modifies it. I only now realized that it copies _from_ `data_` _to_ the stream, not the other way around, and that the incorrect interface here is `BaseStream::write`, not `std::string::data`.
   
   Sorry for my misunderstanding. Thanks for fixing the ctor, this thread can be closed now.
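
The direction of the copy described above can be illustrated with a minimal sketch. `SketchStream` and `SketchWriteCallback` are hypothetical stand-ins invented for this illustration; they are not the real `BaseStream` or `OutputStreamCallback` interfaces from MiNiFi, whose exact signatures are not shown in this thread. The only point is that `write` reads from the caller's buffer and appends to the stream, so the callback can hold its payload as read-only.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stand-in for an output stream (NOT the real MiNiFi BaseStream).
struct SketchStream {
  std::vector<std::uint8_t> bytes;

  // Copies `size` bytes FROM `src` INTO the stream; `src` is only read.
  int write(const std::uint8_t* src, std::size_t size) {
    bytes.insert(bytes.end(), src, src + size);
    return static_cast<int>(size);
  }
};

// Hypothetical callback that keeps a read-only reference to the payload.
class SketchWriteCallback {
 public:
  explicit SketchWriteCallback(const std::string& data) : data_(data) {}

  // Returns the number of bytes copied into the stream.
  int process(SketchStream& stream) const {
    if (data_.empty()) {
      return 0;
    }
    return stream.write(reinterpret_cast<const std::uint8_t*>(data_.data()), data_.size());
  }

 private:
  const std::string& data_;  // the referenced string must outlive the callback
};
```

Because the sketch stores a reference, the caller has to keep the string alive for as long as the callback is in use; storing a copy would trade that constraint for an extra allocation.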


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378128964
 
 

 ##########
 File path: extensions/sql/data/DatabaseConnectors.h
 ##########
 @@ -0,0 +1,106 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef EXTENSIONS_SQL_SERVICES_DATABASECONNECTORS_H_
+#define EXTENSIONS_SQL_SERVICES_DATABASECONNECTORS_H_
+
+#include <memory>
+#include <iostream>
+#include <algorithm>
+#include <cctype>
 
 Review comment:
   Fixed.


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378188342
 
 

 ##########
 File path: extensions/sql/processors/QueryDatabaseTable.cpp
 ##########
 @@ -0,0 +1,475 @@
+/**
+ * @file QueryDatabaseTable.cpp
+ * QueryDatabaseTable class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "QueryDatabaseTable.h"
+
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <memory>
+#include <codecvt>
+#include <algorithm>
+#include <regex>
+
+#include <soci/soci.h>
+
+#include "io/DataStream.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "Exception.h"
+#include "utils/OsUtils.h"
+#include "data/DatabaseConnectors.h"
+#include "data/JSONSQLWriter.h"
+#include "data/SQLRowsetProcessor.h"
+#include "data/WriteCallback.h"
+#include "data/MaxCollector.h"
+#include "data/Utils.h"
+#include "utils/file/FileUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const std::string QueryDatabaseTable::ProcessorName("QueryDatabaseTable");
+
+const core::Property QueryDatabaseTable::s_tableName(
+  core::PropertyBuilder::createProperty("Table Name")->isRequired(true)->withDescription("The name of the database table to be queried.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_columnNames(
+  core::PropertyBuilder::createProperty("Columns to Return")->isRequired(false)->withDescription(
+    "A comma-separated list of column names to be used in the query. If your database requires special treatment of the names (quoting, e.g.), each name should include such treatment. "
+    "If no column names are supplied, all columns in the specified table will be returned. "
+    "NOTE: It is important to use consistent column names for a given table for incremental fetch to work properly.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_maxValueColumnNames(
+  core::PropertyBuilder::createProperty("Maximum-value Columns")->isRequired(false)->withDescription(
+    "A comma-separated list of column names. The processor will keep track of the maximum value for each column that has been returned since the processor started running. "
+    "Using multiple columns implies an order to the column list, and each column's values are expected to increase more slowly than the previous columns' values. "
+    "Thus, using multiple columns implies a hierarchical structure of columns, which is usually used for partitioning tables. "
+    "This processor can be used to retrieve only those rows that have been added/updated since the last retrieval. "
+    "Note that some ODBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these types should not be listed in this property, and will result in error(s) during processing. "
+    "If no columns are provided, all rows from the table will be considered, which could have a performance impact. "
+    "NOTE: It is important to use consistent max-value column names for a given table for incremental fetch to work properly. "
+    "NOTE: Because of a limitation of database access library 'soci', which doesn't support milliseconds in its 'dt_date', "
+    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")->
+    supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_whereClause(
+  core::PropertyBuilder::createProperty("db-fetch-where-clause")->isRequired(false)->withDescription(
+    "A custom clause to be added in the WHERE condition when building SQL queries.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_sqlQuery(
+  core::PropertyBuilder::createProperty("db-fetch-sql-query")->isRequired(false)->withDescription(
+    "A custom SQL query used to retrieve data. Instead of building a SQL query from other properties, this query will be wrapped as a sub-query. "
+    "Query must have no ORDER BY statement.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_maxRowsPerFlowFile(
+  core::PropertyBuilder::createProperty("qdbt-max-rows")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
+    "The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large result sets into multiple FlowFiles. "
+    "If the value specified is zero, then all rows are returned in a single FlowFile.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_stateDirectory(
+  core::PropertyBuilder::createProperty("State Directory")->isRequired(false)->withDefaultValue("QDTState")->withDescription("Directory which contains processor state data.")->build());
+
+const std::string QueryDatabaseTable::s_initialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
+
+const core::Relationship QueryDatabaseTable::s_success("success", "Successfully created FlowFile from SQL query result set.");
+
+static const std::string ResultTableName = "tablename";
+static const std::string ResultRowCount = "querydbtable.row.count";
+
+// State
+class State {
+ public:
+  State(const std::string& tableName, const std::string& stateDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
+    :tableName_(tableName), logger_(logger) {
+    if (!createUUIDDir(stateDir, uuid, filePath_))
+      return;
+
+    filePath_ += "State.txt";
+
+    if (!getStateFromFile())
+      return;
+
+    ok_ = true;
+  }
+
+  ~State() {
+    if (file_.is_open()) {
+      file_.close();
+    }
+  }
+
+  operator bool() const {
+    return ok_;
+  }
 
 Review comment:
   Fixed.


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378742486
 
 

 ##########
 File path: extensions/sql/processors/QueryDatabaseTable.cpp
 ##########
 @@ -0,0 +1,475 @@
+/**
+ * @file QueryDatabaseTable.cpp
+ * QueryDatabaseTable class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "QueryDatabaseTable.h"
+
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <memory>
+#include <codecvt>
+#include <algorithm>
+#include <regex>
+
+#include <soci/soci.h>
+
+#include "io/DataStream.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "Exception.h"
+#include "utils/OsUtils.h"
+#include "data/DatabaseConnectors.h"
+#include "data/JSONSQLWriter.h"
+#include "data/SQLRowsetProcessor.h"
+#include "data/WriteCallback.h"
+#include "data/MaxCollector.h"
+#include "data/Utils.h"
+#include "utils/file/FileUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const std::string QueryDatabaseTable::ProcessorName("QueryDatabaseTable");
+
+const core::Property QueryDatabaseTable::s_tableName(
+  core::PropertyBuilder::createProperty("Table Name")->isRequired(true)->withDescription("The name of the database table to be queried.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_columnNames(
+  core::PropertyBuilder::createProperty("Columns to Return")->isRequired(false)->withDescription(
+    "A comma-separated list of column names to be used in the query. If your database requires special treatment of the names (quoting, e.g.), each name should include such treatment. "
+    "If no column names are supplied, all columns in the specified table will be returned. "
+    "NOTE: It is important to use consistent column names for a given table for incremental fetch to work properly.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_maxValueColumnNames(
+  core::PropertyBuilder::createProperty("Maximum-value Columns")->isRequired(false)->withDescription(
+    "A comma-separated list of column names. The processor will keep track of the maximum value for each column that has been returned since the processor started running. "
+    "Using multiple columns implies an order to the column list, and each column's values are expected to increase more slowly than the previous columns' values. "
+    "Thus, using multiple columns implies a hierarchical structure of columns, which is usually used for partitioning tables. "
+    "This processor can be used to retrieve only those rows that have been added/updated since the last retrieval. "
+    "Note that some ODBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these types should not be listed in this property, and will result in error(s) during processing. "
+    "If no columns are provided, all rows from the table will be considered, which could have a performance impact. "
+    "NOTE: It is important to use consistent max-value column names for a given table for incremental fetch to work properly. "
+    "NOTE: Because of a limitation of database access library 'soci', which doesn't support milliseconds in its 'dt_date', "
+    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")->
+    supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_whereClause(
+  core::PropertyBuilder::createProperty("db-fetch-where-clause")->isRequired(false)->withDescription(
+    "A custom clause to be added in the WHERE condition when building SQL queries.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_sqlQuery(
+  core::PropertyBuilder::createProperty("db-fetch-sql-query")->isRequired(false)->withDescription(
+    "A custom SQL query used to retrieve data. Instead of building a SQL query from other properties, this query will be wrapped as a sub-query. "
+    "Query must have no ORDER BY statement.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_maxRowsPerFlowFile(
+  core::PropertyBuilder::createProperty("qdbt-max-rows")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
+    "The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large result sets into multiple FlowFiles. "
+    "If the value specified is zero, then all rows are returned in a single FlowFile.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_stateDirectory(
+  core::PropertyBuilder::createProperty("State Directory")->isRequired(false)->withDefaultValue("QDTState")->withDescription("Directory which contains processor state data.")->build());
+
+const std::string QueryDatabaseTable::s_initialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
+
+const core::Relationship QueryDatabaseTable::s_success("success", "Successfully created FlowFile from SQL query result set.");
+
+static const std::string ResultTableName = "tablename";
+static const std::string ResultRowCount = "querydbtable.row.count";
+
+// State
+class State {
+ public:
+  State(const std::string& tableName, const std::string& stateDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
+    :tableName_(tableName), logger_(logger) {
+    if (!createUUIDDir(stateDir, uuid, filePath_))
+      return;
+
+    filePath_ += "State.txt";
+
+    if (!getStateFromFile())
+      return;
+
+    ok_ = true;
+  }
+
+  ~State() {
+    if (file_.is_open()) {
+      file_.close();
+    }
+  }
+
+  operator bool() const {
+    return ok_;
+  }
+
+  std::unordered_map<std::string, std::string> mapState() const {
+    return mapState_;
+  }
+
+  void writeStateToFile(const std::unordered_map<std::string, std::string>& mapState) {
+    file_.seekp(0, std::ios::beg);
+
+    file_ << tableName_ << separator();
+    auto dataSize = tableName_.size() + separator().size();
+
+    for (const auto& el : mapState) {
+      file_ << el.first << '=' << el.second << separator();
+      dataSize += el.first.size() + 1 + el.second.size() + separator().size();
+    }
+
+    // If dataSize_ > dataSize, then clear difference with ' '.
+    if (dataSize_ > dataSize) {
+      for (auto i = dataSize_ - dataSize; i > 0; i--) {
+        file_ << ' ';
+      }
+    }
 
 Review comment:
   Added comments in code.
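
The padding step in `writeStateToFile` quoted above can be shown in isolation. This is a sketch under assumptions: `overwritePadded` is a hypothetical helper written for this illustration, not code from the PR, and it takes any `std::iostream` so it can be tried on a `std::stringstream` instead of a real file. The idea is that rewriting a stream in place from offset 0 does not truncate it, so when the new content is shorter than the old, the leftover tail is overwritten with spaces.

```cpp
#include <cstddef>
#include <iostream>
#include <sstream>
#include <string>

// Hypothetical helper: overwrites `stream` from the beginning with `content`.
// If the previous content was longer (`previousSize` bytes), the stale tail is
// overwritten with spaces so no old bytes remain. Returns the new content size.
inline std::size_t overwritePadded(std::iostream& stream, const std::string& content, std::size_t previousSize) {
  stream.seekp(0, std::ios::beg);  // explicit offset 0 relative to the beginning
  stream << content;
  if (previousSize > content.size()) {
    stream << std::string(previousSize - content.size(), ' ');
  }
  stream.flush();
  return content.size();
}
```

A reader of such a file would then need to tolerate trailing spaces; truncating the file after the write is an alternative that avoids the padding but needs a platform-specific call or a reopen.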


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378783622
 
 

 ##########
 File path: extensions/sql/processors/QueryDatabaseTable.cpp
 ##########
 @@ -0,0 +1,475 @@
+/**
+ * @file QueryDatabaseTable.cpp
+ * QueryDatabaseTable class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "QueryDatabaseTable.h"
+
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <memory>
+#include <codecvt>
+#include <algorithm>
+#include <regex>
+
+#include <soci/soci.h>
+
+#include "io/DataStream.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "Exception.h"
+#include "utils/OsUtils.h"
+#include "data/DatabaseConnectors.h"
+#include "data/JSONSQLWriter.h"
+#include "data/SQLRowsetProcessor.h"
+#include "data/WriteCallback.h"
+#include "data/MaxCollector.h"
+#include "data/Utils.h"
+#include "utils/file/FileUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const std::string QueryDatabaseTable::ProcessorName("QueryDatabaseTable");
+
+const core::Property QueryDatabaseTable::s_tableName(
+  core::PropertyBuilder::createProperty("Table Name")->isRequired(true)->withDescription("The name of the database table to be queried.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_columnNames(
+  core::PropertyBuilder::createProperty("Columns to Return")->isRequired(false)->withDescription(
+    "A comma-separated list of column names to be used in the query. If your database requires special treatment of the names (quoting, e.g.), each name should include such treatment. "
+    "If no column names are supplied, all columns in the specified table will be returned. "
+    "NOTE: It is important to use consistent column names for a given table for incremental fetch to work properly.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_maxValueColumnNames(
+  core::PropertyBuilder::createProperty("Maximum-value Columns")->isRequired(false)->withDescription(
+    "A comma-separated list of column names. The processor will keep track of the maximum value for each column that has been returned since the processor started running. "
+    "Using multiple columns implies an order to the column list, and each column's values are expected to increase more slowly than the previous columns' values. "
+    "Thus, using multiple columns implies a hierarchical structure of columns, which is usually used for partitioning tables. "
+    "This processor can be used to retrieve only those rows that have been added/updated since the last retrieval. "
+    "Note that some ODBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these types should not be listed in this property, and will result in error(s) during processing. "
+    "If no columns are provided, all rows from the table will be considered, which could have a performance impact. "
+    "NOTE: It is important to use consistent max-value column names for a given table for incremental fetch to work properly. "
+    "NOTE: Because of a limitation of database access library 'soci', which doesn't support milliseconds in its 'dt_date', "
+    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")->
+    supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_whereClause(
+  core::PropertyBuilder::createProperty("db-fetch-where-clause")->isRequired(false)->withDescription(
+    "A custom clause to be added in the WHERE condition when building SQL queries.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_sqlQuery(
+  core::PropertyBuilder::createProperty("db-fetch-sql-query")->isRequired(false)->withDescription(
+    "A custom SQL query used to retrieve data. Instead of building a SQL query from other properties, this query will be wrapped as a sub-query. "
+    "Query must have no ORDER BY statement.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_maxRowsPerFlowFile(
+  core::PropertyBuilder::createProperty("qdbt-max-rows")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
+    "The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large result sets into multiple FlowFiles. "
+    "If the value specified is zero, then all rows are returned in a single FlowFile.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_stateDirectory(
+  core::PropertyBuilder::createProperty("State Directory")->isRequired(false)->withDefaultValue("QDTState")->withDescription("Directory which contains processor state data.")->build());
+
+const std::string QueryDatabaseTable::s_initialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
+
+const core::Relationship QueryDatabaseTable::s_success("success", "Successfully created FlowFile from SQL query result set.");
+
+static const std::string ResultTableName = "tablename";
+static const std::string ResultRowCount = "querydbtable.row.count";
+
+// State
+class State {
+ public:
+  State(const std::string& tableName, const std::string& stateDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
+    :tableName_(tableName), logger_(logger) {
+    if (!createUUIDDir(stateDir, uuid, filePath_))
+      return;
+
+    filePath_ += "State.txt";
+
+    if (!getStateFromFile())
+      return;
+
+    ok_ = true;
+  }
+
+  ~State() {
+    if (file_.is_open()) {
+      file_.close();
+    }
+  }
+
+  operator bool() const {
+    return ok_;
+  }
+
+  std::unordered_map<std::string, std::string> mapState() const {
+    return mapState_;
+  }
+
+  void writeStateToFile(const std::unordered_map<std::string, std::string>& mapState) {
+    file_.seekp(0, std::ios::beg);
+
+    file_ << tableName_ << separator();
+    auto dataSize = tableName_.size() + separator().size();
+
+    for (const auto& el : mapState) {
+      file_ << el.first << '=' << el.second << separator();
+      dataSize += el.first.size() + 1 + el.second.size() + separator().size();
+    }
+
+    // If dataSize_ > dataSize, then clear difference with ' '.
+    if (dataSize_ > dataSize) {
+      for (auto i = dataSize_ - dataSize; i > 0; i--) {
+        file_ << ' ';
+      }
+    }
 
 Review comment:
   Ack, thanks for the explanation


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378740016
 
 

 ##########
 File path: extensions/sql/data/WriteCallback.h
 ##########
 @@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "FlowFileRecord.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+class WriteCallback : public OutputStreamCallback {
+public:
+  WriteCallback(const char *data, uint64_t size)
+    : _data(const_cast<char*>(data)),
 
 Review comment:
   `WriteCallback` is called as `WriteCallback(out.data(), ...)`, where `out` is a `std::string`, so `out.data()` has type `const char*`.
   With a `WriteCallback(char*, ...)` signature, `WriteCallback(out.data(), ...)` won't compile, because `const char*` cannot be implicitly converted to `char*`.
   
   But I'll refactor to `WriteCallback(const std::string&)`.
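
The conversion rule mentioned above can be sketched as follows. `byteSum` is a hypothetical function invented for this illustration and is unrelated to the PR's code. It assumes only that the consumer reads the buffer, in which case a `const char*` parameter (or a `const std::string&` overload) accepts `out.data()` directly and no `const_cast` is needed.

```cpp
#include <cstddef>
#include <string>

// Hypothetical read-only consumer: a `const char*` parameter accepts the
// pointer returned by `std::string::data()` on a const string without a cast.
inline unsigned long byteSum(const char* data, std::size_t size) {
  unsigned long sum = 0;
  for (std::size_t i = 0; i < size; ++i) {
    sum += static_cast<unsigned char>(data[i]);
  }
  return sum;
}

// Overload taking the string itself, so callers do not pass a separate size.
inline unsigned long byteSum(const std::string& s) {
  return byteSum(s.data(), s.size());
}
```

A parameter declared as plain `char*` would reject the same call, since dropping `const` requires an explicit cast.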


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378188116
 
 

 ##########
 File path: extensions/sql/data/MaxCollector.h
 ##########
 @@ -0,0 +1,172 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <unordered_map>
+#include <tuple>
+
+#include "SQLRowSubscriber.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+
+class MaxCollector: public SQLRowSubscriber {
+  void beginProcessRow() override {}
+
+  void endProcessRow() override {
+    if (columnsVerified_) {
+      return;
+    }
+
+    if (countColumns_ != mapState_.size())
+      throw minifi::Exception(PROCESSOR_EXCEPTION, "MaxCollector: Column(s) '" + maxValueColumnNames_ + "' are not found in the columns of '" + selectQuery_ + "' result.");
+
+    columnsVerified_ = true;
+  }
+
+  void processColumnName(const std::string& name) override {
+    if (columnsVerified_) {
+      return;
+    }
+
+    if (mapState_.count(name)) {
+      countColumns_++;
+    }
+  }
+
+  void processColumn(const std::string& name, const std::string& value)  override {
+    updateMaxValue(name, '\'' + value + '\'');
+  }
+
+  void processColumn(const std::string& name, double value) override {
+    updateMaxValue(name, value);
+  }
+
+  void processColumn(const std::string& name, int value) override {
+    updateMaxValue(name, value);
+  }
+
+  void processColumn(const std::string& name, long long value) override {
+    updateMaxValue(name, value);
+  }
+
+  void processColumn(const std::string& name, unsigned long long value) override {
+    updateMaxValue(name, value);
+  }
+
+  void processColumn(const std::string& name, const char* value) override {}
+
+  template <typename T>
+  struct MaxValue {
+    void updateMaxValue(const std::string& name, const T& value) {
+      const auto it = mapColumnNameValue_.find(name);
+      if (it == mapColumnNameValue_.end()) {
+        mapColumnNameValue_.insert({ name, value });
+      } else {
+        if (value > it->second) {
+          it->second = value;
+        }
+      }
+    }
+
+    std::unordered_map<std::string, T> mapColumnNameValue_;
+  };
+
+  template <typename T, typename Tuple, int Index, bool>
+  struct TupleIndexByType {
+    constexpr static int index() {
+      using tupleElType = typename std::decay<decltype(std::get<Index + 1>(Tuple()))>::type;
+
+      return TupleIndexByType<T, Tuple, Index + 1, std::is_same<tupleElType, MaxValue<T>>::value>::index();
+    }
+  };
+
+  template <typename T, typename Tuple, int Index>
+  struct TupleIndexByType<T, Tuple, Index, true> {
+    constexpr static int index() {
+      return Index;
+    }
+  };
 
 Review comment:
   Fixed.


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378133280
 
 

 ##########
 File path: win_build_vs.bat
 ##########
 @@ -22,10 +22,12 @@ if [%1]==[] goto usage
 set builddir=%1
 set skiptests=OFF
 set cmake_build_type=Release
+set build_type=Release
 
 Review comment:
   Fixed.


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r377635439
 
 

 ##########
 File path: extensions/sql/services/ODBCConnector.h
 ##########
 @@ -0,0 +1,127 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once 
+
+#include "core/logging/LoggerConfiguration.h"
+#include "core/controller/ControllerService.h"
+
+#include "DatabaseService.h"
+#include "core/Resource.h"
+#include "data/DatabaseConnectors.h"
+#include <memory>
+#include <unordered_map>
+
+#include <soci/soci.h>
+#include <soci/odbc/soci-odbc.h>
+
+#include <iostream>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+namespace controllers {
+
+class ODBCConnection : public sql::Connection {
+ public:
+  explicit ODBCConnection(const std::string& connectionString)
+    : connection_string_(connectionString) {
+      session_ = std::make_unique<soci::session>(getSessionParameters());
+  }
+
+  virtual ~ODBCConnection() {
+  }
+
+  bool connected(std::string& exception) const override {
+    try {
+      exception.clear();
+      // According to https://stackoverflow.com/questions/3668506/efficient-sql-test-query-or-validation-query-that-will-work-across-all-or-most by Rob Hruska, 
+      // 'select 1' works for: H2, MySQL, Microsoft SQL Server, PostgreSQL, SQLite. For Oracle 'SELECT 1 FROM DUAL' works.
+      prepareStatement("select 1")->execute();
+      return true;
+    } catch (std::exception& e) {
+      exception = e.what();
+      return false;
+    }
+  }
+
+  std::unique_ptr<sql::Statement> prepareStatement(const std::string& query) const override {
+    return std::make_unique<sql::Statement>(session_, query);
+  }
+
+  std::unique_ptr<Session> getSession() const override {
+    return std::make_unique<sql::Session>(session_);
+  }
+
+ private:
+   const soci::connection_parameters getSessionParameters() const {
 
 Review comment:
   Top-level `const` in a function return type is redundant and misleading. The returned value is copied or moved into the caller's object (or constructed directly in it), so the `const` qualifier of the destination decides constness and the qualifier on the return type has no effect there. For class types it can even be harmful, because a `const` temporary cannot be moved from.
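
   As a minimal sketch of the point above: `Params`, `makeConst` and `makePlain` are hypothetical stand-ins, not the `soci::connection_parameters` getter from the PR. The caller's copy stays mutable regardless of the return type's `const`, while the temporary itself keeps the qualifier and so can only bind to `const Params&&`.

```cpp
#include <cassert>      // only needed for the assert-based checks in the note below
#include <string>
#include <type_traits>
#include <utility>

// Hypothetical stand-in for a parameter object returned by a getter.
struct Params {
  std::string value;
};

// Same shape as the getter under review: top-level const on the return type.
const Params makeConst() { return Params{"dsn"}; }

// The suggested form: no top-level const.
Params makePlain() { return Params{"dsn"}; }

// The caller's declaration, not the return type, decides constness of the caller's object.
inline bool callerCopyIsMutable() {
  Params p = makeConst();  // p is a non-const object
  p.value += "=x";         // compiles and modifies the caller's copy
  return p.value == "dsn=x";
}

// For class types the qualifier stays on the temporary itself...
static_assert(std::is_same<decltype(makeConst()), const Params>::value,
              "the call expression has type const Params");

// ...so std::move yields const Params&&, which a move constructor cannot bind to.
static_assert(std::is_same<decltype(std::move(makeConst())), const Params&&>::value,
              "moving from the const temporary is not possible");
```

   Calling `callerCopyIsMutable()` returns `true`, and both `static_assert`s hold at compile time, which is why dropping the qualifier loses nothing for the caller.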


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378131050
 
 

 ##########
 File path: extensions/sql/processors/QueryDatabaseTable.cpp
 ##########
 @@ -0,0 +1,475 @@
+/**
+ * @file QueryDatabaseTable.cpp
+ * QueryDatabaseTable class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "QueryDatabaseTable.h"
+
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <memory>
+#include <codecvt>
+#include <algorithm>
+#include <regex>
+
+#include <soci/soci.h>
+
+#include "io/DataStream.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "Exception.h"
+#include "utils/OsUtils.h"
+#include "data/DatabaseConnectors.h"
+#include "data/JSONSQLWriter.h"
+#include "data/SQLRowsetProcessor.h"
+#include "data/WriteCallback.h"
+#include "data/MaxCollector.h"
+#include "data/Utils.h"
+#include "utils/file/FileUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const std::string QueryDatabaseTable::ProcessorName("QueryDatabaseTable");
+
+const core::Property QueryDatabaseTable::s_tableName(
+  core::PropertyBuilder::createProperty("Table Name")->isRequired(true)->withDescription("The name of the database table to be queried.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_columnNames(
+  core::PropertyBuilder::createProperty("Columns to Return")->isRequired(false)->withDescription(
+    "A comma-separated list of column names to be used in the query. If your database requires special treatment of the names (quoting, e.g.), each name should include such treatment. "
+    "If no column names are supplied, all columns in the specified table will be returned. "
+    "NOTE: It is important to use consistent column names for a given table for incremental fetch to work properly.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_maxValueColumnNames(
+  core::PropertyBuilder::createProperty("Maximum-value Columns")->isRequired(false)->withDescription(
+    "A comma-separated list of column names. The processor will keep track of the maximum value for each column that has been returned since the processor started running. "
+    "Using multiple columns implies an order to the column list, and each column's values are expected to increase more slowly than the previous columns' values. "
+    "Thus, using multiple columns implies a hierarchical structure of columns, which is usually used for partitioning tables. "
+    "This processor can be used to retrieve only those rows that have been added/updated since the last retrieval. "
+    "Note that some ODBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these types should not be listed in this property, and will result in error(s) during processing. "
+    "If no columns are provided, all rows from the table will be considered, which could have a performance impact. "
+    "NOTE: It is important to use consistent max-value column names for a given table for incremental fetch to work properly. "
+    "NOTE: Because of a limitation of database access library 'soci', which doesn't support milliseconds in its 'dt_date', "
+    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")->
+    supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_whereClause(
+  core::PropertyBuilder::createProperty("db-fetch-where-clause")->isRequired(false)->withDescription(
+    "A custom clause to be added in the WHERE condition when building SQL queries.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_sqlQuery(
+  core::PropertyBuilder::createProperty("db-fetch-sql-query")->isRequired(false)->withDescription(
+    "A custom SQL query used to retrieve data. Instead of building a SQL query from other properties, this query will be wrapped as a sub-query. "
+    "Query must have no ORDER BY statement.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_maxRowsPerFlowFile(
+  core::PropertyBuilder::createProperty("qdbt-max-rows")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
+    "The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large result sets into multiple FlowFiles. "
+    "If the value specified is zero, then all rows are returned in a single FlowFile.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_stateDirectory(
+  core::PropertyBuilder::createProperty("State Directory")->isRequired(false)->withDefaultValue("QDTState")->withDescription("Directory which contains processor state data.")->build());
+
+const std::string QueryDatabaseTable::s_initialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
+
+const core::Relationship QueryDatabaseTable::s_success("success", "Successfully created FlowFile from SQL query result set.");
+
+static const std::string ResultTableName = "tablename";
+static const std::string ResultRowCount = "querydbtable.row.count";
+
+// State
+class State {
+ public:
+  State(const std::string& tableName, const std::string& stateDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
+    :tableName_(tableName), logger_(logger) {
+    if (!createUUIDDir(stateDir, uuid, filePath_))
+      return;
+
+    filePath_ += "State.txt";
+
+    if (!getStateFromFile())
+      return;
+
+    ok_ = true;
+  }
+
+  ~State() {
+    if (file_.is_open()) {
+      file_.close();
+    }
 
 Review comment:
   Fixed.


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r379347693
 
 

 ##########
 File path: extensions/sql/processors/ExecuteSQL.cpp
 ##########
 @@ -0,0 +1,123 @@
+/**
+ * @file ExecuteSQL.cpp
+ * ExecuteSQL class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ExecuteSQL.h"
+
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <memory>
+#include <codecvt>
+
+#include <soci/soci.h>
+
+#include "io/DataStream.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "Exception.h"
+#include "utils/OsUtils.h"
+#include "data/DatabaseConnectors.h"
+#include "data/JSONSQLWriter.h"
+#include "data/SQLRowsetProcessor.h"
+#include "data/WriteCallback.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const std::string ExecuteSQL::ProcessorName("ExecuteSQL");
+
+const core::Property ExecuteSQL::s_sqlSelectQuery(
+  core::PropertyBuilder::createProperty("SQL select query")->isRequired(true)->withDescription(
+    "The SQL select query to execute. The query can be empty, a constant value, or built from attributes using Expression Language. "
+    "If this property is specified, it will be used regardless of the content of incoming flowfiles. "
+    "If this property is empty, the content of the incoming flow file is expected to contain a valid SQL select query, to be issued by the processor to the database. "
+    "Note that Expression Language is not evaluated for flow file contents.")->supportsExpressionLanguage(true)->build());
+
+const core::Property ExecuteSQL::s_maxRowsPerFlowFile(
+  core::PropertyBuilder::createProperty("Max Rows Per Flow File")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
+    "The maximum number of result rows that will be included in a flow file. If zero, all rows will be placed into the flow file.")->supportsExpressionLanguage(true)->build());
+
+const core::Relationship ExecuteSQL::s_success("success", "Successfully created FlowFile from SQL query result set.");
+
+static const std::string ResultRowCount = "executesql.row.count";
+
+ExecuteSQL::ExecuteSQL(const std::string& name, utils::Identifier uuid)
+  : SQLProcessor(name, uuid), max_rows_(0) {
+}
+
+ExecuteSQL::~ExecuteSQL() {
+}
+
+void ExecuteSQL::initialize() {
+  //! Set the supported properties
+  setSupportedProperties( { dbControllerService(), outputFormat(), s_sqlSelectQuery, s_maxRowsPerFlowFile});
+
+  //! Set the supported relationships
+  setSupportedRelationships( { s_success });
+}
+
+void ExecuteSQL::processOnSchedule(const core::ProcessContext &context) {
+  initOutputFormat(context);
+
+  context.getProperty(s_sqlSelectQuery.getName(), sqlSelectQuery_);
+  context.getProperty(s_maxRowsPerFlowFile.getName(), max_rows_);
+}
+
+void ExecuteSQL::processOnTrigger(core::ProcessSession &session) {
+  auto statement = connection_->prepareStatement(sqlSelectQuery_);
+
+  auto rowset = statement->execute();
+
+  int count = 0;
+  size_t rowCount = 0;
+  sql::JSONSQLWriter sqlWriter(isJSONPretty());
+  sql::SQLRowsetProcessor sqlRowsetProcessor(rowset, { &sqlWriter });
+
+  // Process rowset.
+  do {
+    rowCount = sqlRowsetProcessor.process(max_rows_ == 0 ? std::numeric_limits<size_t>::max() : max_rows_);
+    count++;
+    if (rowCount == 0)
+      break;
+
+    const auto& output = sqlWriter.toString();
 
 Review comment:
   In this case we know that `toString` returns a `std::string` by value and that `std::string` has a move constructor; besides, RVO would kick in here anyway.
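
   A rough illustration of that reasoning, assuming the point is that taking the result by value costs no more than binding it to `const auto&`. `Tracked` and `render` are hypothetical; `Tracked` counts copies in place of the `std::string` returned by `toString`.

```cpp
#include <cassert>   // only needed for the assert-based checks in the note below
#include <string>
#include <utility>

// Hypothetical instrumented type standing in for the returned std::string.
struct Tracked {
  static int copies;
  std::string data;

  explicit Tracked(std::string d) : data(std::move(d)) {}
  Tracked(const Tracked& other) : data(other.data) { ++copies; }
  Tracked(Tracked&& other) noexcept : data(std::move(other.data)) {}
};
int Tracked::copies = 0;

// Hypothetical writer that returns its buffer by value, like a typical toString().
Tracked render() {
  Tracked out("[{\"id\":1}]");
  return out;  // NRVO or, failing that, an implicit move; never a copy
}

// Number of copy-constructor calls when the caller takes the result by value.
inline int copiesWhenTakenByValue() {
  Tracked::copies = 0;
  Tracked byValue = render();
  (void)byValue;
  return Tracked::copies;
}

// Number of copy-constructor calls when the caller binds a const reference.
inline int copiesWhenBoundToConstRef() {
  Tracked::copies = 0;
  const Tracked& byRef = render();  // the temporary lives as long as byRef
  (void)byRef;
  return Tracked::copies;
}
```

   Both helpers report zero copies, so `auto output = sqlWriter.toString();` and `const auto& output = sqlWriter.toString();` should be equivalent in cost under these assumptions.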


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378603638
 
 

 ##########
 File path: extensions/sql/processors/QueryDatabaseTable.cpp
 ##########
 @@ -0,0 +1,475 @@
+/**
+ * @file QueryDatabaseTable.cpp
+ * QueryDatabaseTable class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "QueryDatabaseTable.h"
+
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <memory>
+#include <codecvt>
+#include <algorithm>
+#include <regex>
+
+#include <soci/soci.h>
+
+#include "io/DataStream.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "Exception.h"
+#include "utils/OsUtils.h"
+#include "data/DatabaseConnectors.h"
+#include "data/JSONSQLWriter.h"
+#include "data/SQLRowsetProcessor.h"
+#include "data/WriteCallback.h"
+#include "data/MaxCollector.h"
+#include "data/Utils.h"
+#include "utils/file/FileUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const std::string QueryDatabaseTable::ProcessorName("QueryDatabaseTable");
+
+const core::Property QueryDatabaseTable::s_tableName(
+  core::PropertyBuilder::createProperty("Table Name")->isRequired(true)->withDescription("The name of the database table to be queried.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_columnNames(
+  core::PropertyBuilder::createProperty("Columns to Return")->isRequired(false)->withDescription(
+    "A comma-separated list of column names to be used in the query. If your database requires special treatment of the names (quoting, e.g.), each name should include such treatment. "
+    "If no column names are supplied, all columns in the specified table will be returned. "
+    "NOTE: It is important to use consistent column names for a given table for incremental fetch to work properly.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_maxValueColumnNames(
+  core::PropertyBuilder::createProperty("Maximum-value Columns")->isRequired(false)->withDescription(
+    "A comma-separated list of column names. The processor will keep track of the maximum value for each column that has been returned since the processor started running. "
+    "Using multiple columns implies an order to the column list, and each column's values are expected to increase more slowly than the previous columns' values. "
+    "Thus, using multiple columns implies a hierarchical structure of columns, which is usually used for partitioning tables. "
+    "This processor can be used to retrieve only those rows that have been added/updated since the last retrieval. "
+    "Note that some ODBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these types should not be listed in this property, and will result in error(s) during processing. "
+    "If no columns are provided, all rows from the table will be considered, which could have a performance impact. "
+    "NOTE: It is important to use consistent max-value column names for a given table for incremental fetch to work properly. "
+    "NOTE: Because of a limitation of database access library 'soci', which doesn't support milliseconds in its 'dt_date', "
+    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")->
+    supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_whereClause(
+  core::PropertyBuilder::createProperty("db-fetch-where-clause")->isRequired(false)->withDescription(
+    "A custom clause to be added in the WHERE condition when building SQL queries.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_sqlQuery(
+  core::PropertyBuilder::createProperty("db-fetch-sql-query")->isRequired(false)->withDescription(
+    "A custom SQL query used to retrieve data. Instead of building a SQL query from other properties, this query will be wrapped as a sub-query. "
+    "Query must have no ORDER BY statement.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_maxRowsPerFlowFile(
+  core::PropertyBuilder::createProperty("qdbt-max-rows")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
+    "The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large result sets into multiple FlowFiles. "
+    "If the value specified is zero, then all rows are returned in a single FlowFile.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_stateDirectory(
+  core::PropertyBuilder::createProperty("State Directory")->isRequired(false)->withDefaultValue("QDTState")->withDescription("Directory which contains processor state data.")->build());
+
+const std::string QueryDatabaseTable::s_initialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
+
+const core::Relationship QueryDatabaseTable::s_success("success", "Successfully created FlowFile from SQL query result set.");
+
+static const std::string ResultTableName = "tablename";
+static const std::string ResultRowCount = "querydbtable.row.count";
+
+// State
+class State {
+ public:
+  State(const std::string& tableName, const std::string& stateDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
+    :tableName_(tableName), logger_(logger) {
+    if (!createUUIDDir(stateDir, uuid, filePath_))
+      return;
+
+    filePath_ += "State.txt";
+
+    if (!getStateFromFile())
+      return;
+
+    ok_ = true;
+  }
+
+  ~State() {
+    if (file_.is_open()) {
+      file_.close();
+    }
+  }
+
+  operator bool() const {
+    return ok_;
+  }
+
+  std::unordered_map<std::string, std::string> mapState() const {
+    return mapState_;
+  }
+
+  void writeStateToFile(const std::unordered_map<std::string, std::string>& mapState) {
+    file_.seekp(std::ios::beg);
+
+    file_ << tableName_ << separator();
+    auto dataSize = tableName_.size() + separator().size();
+
+    for (const auto& el : mapState) {
+      file_ << el.first << '=' << el.second << separator();
+      dataSize += el.first.size() + 1 + el.second.size() + separator().size();
+    }
+
+    // If dataSize_ > dataSize, then clear difference with ' '.
+    if (dataSize_ > dataSize) {
+      for (auto i = dataSize_ - dataSize; i > 0; i--) {
+        file_ << ' ';
+      }
+    }
 
 Review comment:
   The `if` cannot be removed because `dataSize_` and `dataSize` have unsigned types: if `dataSize_` < `dataSize`, the subtraction wraps around and `i` becomes a very large positive number.
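
   The wraparound can be sketched as follows. `paddingNeeded` and `unguardedDifference` are hypothetical helpers that mirror the padding computation in `writeStateToFile`; they are not part of the PR.

```cpp
#include <cassert>   // only needed for the assert-based checks in the note below
#include <cstddef>
#include <limits>

// Hypothetical helper: how many ' ' characters are needed to overwrite stale
// bytes left over from a previous, longer write.
inline std::size_t paddingNeeded(std::size_t previousSize, std::size_t newSize) {
  // Guard first: previousSize - newSize wraps modulo 2^N when newSize is larger.
  return previousSize > newSize ? previousSize - newSize : 0;
}

// What the subtraction yields without the guard.
inline std::size_t unguardedDifference(std::size_t previousSize, std::size_t newSize) {
  return previousSize - newSize;
}
```

   With a previous size of 4 and a new size of 10, `paddingNeeded` returns 0, whereas `unguardedDifference` returns `std::numeric_limits<std::size_t>::max() - 5`, which would make the padding loop run for an enormous number of iterations.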


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378128742
 
 

 ##########
 File path: extensions/sql/SQLLoader.h
 ##########
 @@ -0,0 +1,81 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSION_SQLLOADER_H
+#define EXTENSION_SQLLOADER_H
+
+#include "core/ClassLoader.h"
+#include "processors/ExecuteSQL.h"
+#include "processors/PutSQL.h"
+#include "processors/QueryDatabaseTable.h"
+#include "services/ODBCConnector.h"
+
+class SQLFactory : public core::ObjectFactory {
+ public:
+  SQLFactory() {
+
+  }
+
+  /**
+   * Gets the name of the object.
+   * @return class name of processor
+   */
+  virtual std::string getName() override {
+    return "SQLFactory";
+  }
+
+  virtual std::string getClassName() override{
+    return "SQLFactory";
+  }
+  /**
+   * Gets the class name for the object
+   * @return class name for the processor.
+   */
+  virtual std::vector<std::string> getClassNames() override{
 
 Review comment:
   Fixed.


[GitHub] [nifi-minifi-cpp] bakaid commented on issue #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
bakaid commented on issue #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#issuecomment-586195247
 
 
   @am-c-p-p The build succeeds now; I am going to start testing the new version.


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r378203621
 
 

 ##########
 File path: extensions/sql/processors/QueryDatabaseTable.cpp
 ##########
 @@ -0,0 +1,475 @@
+/**
+ * @file QueryDatabaseTable.cpp
+ * QueryDatabaseTable class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "QueryDatabaseTable.h"
+
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <memory>
+#include <codecvt>
+#include <algorithm>
+#include <regex>
+
+#include <soci/soci.h>
+
+#include "io/DataStream.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "Exception.h"
+#include "utils/OsUtils.h"
+#include "data/DatabaseConnectors.h"
+#include "data/JSONSQLWriter.h"
+#include "data/SQLRowsetProcessor.h"
+#include "data/WriteCallback.h"
+#include "data/MaxCollector.h"
+#include "data/Utils.h"
+#include "utils/file/FileUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const std::string QueryDatabaseTable::ProcessorName("QueryDatabaseTable");
+
+const core::Property QueryDatabaseTable::s_tableName(
+  core::PropertyBuilder::createProperty("Table Name")->isRequired(true)->withDescription("The name of the database table to be queried.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_columnNames(
+  core::PropertyBuilder::createProperty("Columns to Return")->isRequired(false)->withDescription(
+    "A comma-separated list of column names to be used in the query. If your database requires special treatment of the names (quoting, e.g.), each name should include such treatment. "
+    "If no column names are supplied, all columns in the specified table will be returned. "
+    "NOTE: It is important to use consistent column names for a given table for incremental fetch to work properly.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_maxValueColumnNames(
+  core::PropertyBuilder::createProperty("Maximum-value Columns")->isRequired(false)->withDescription(
+    "A comma-separated list of column names. The processor will keep track of the maximum value for each column that has been returned since the processor started running. "
+    "Using multiple columns implies an order to the column list, and each column's values are expected to increase more slowly than the previous columns' values. "
+    "Thus, using multiple columns implies a hierarchical structure of columns, which is usually used for partitioning tables. "
+    "This processor can be used to retrieve only those rows that have been added/updated since the last retrieval. "
+    "Note that some ODBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these types should not be listed in this property, and will result in error(s) during processing. "
+    "If no columns are provided, all rows from the table will be considered, which could have a performance impact. "
+    "NOTE: It is important to use consistent max-value column names for a given table for incremental fetch to work properly. "
+    "NOTE: Because of a limitation of database access library 'soci', which doesn't support milliseconds in its 'dt_date', "
+    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")->
+    supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_whereClause(
+  core::PropertyBuilder::createProperty("db-fetch-where-clause")->isRequired(false)->withDescription(
+    "A custom clause to be added in the WHERE condition when building SQL queries.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_sqlQuery(
+  core::PropertyBuilder::createProperty("db-fetch-sql-query")->isRequired(false)->withDescription(
+    "A custom SQL query used to retrieve data. Instead of building a SQL query from other properties, this query will be wrapped as a sub-query. "
+    "Query must have no ORDER BY statement.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_maxRowsPerFlowFile(
+  core::PropertyBuilder::createProperty("qdbt-max-rows")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
+    "The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large result sets into multiple FlowFiles. "
+    "If the value specified is zero, then all rows are returned in a single FlowFile.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_stateDirectory(
+  core::PropertyBuilder::createProperty("State Directory")->isRequired(false)->withDefaultValue("QDTState")->withDescription("Directory which contains processor state data.")->build());
+
+const std::string QueryDatabaseTable::s_initialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
+
+const core::Relationship QueryDatabaseTable::s_success("success", "Successfully created FlowFile from SQL query result set.");
+
+static const std::string ResultTableName = "tablename";
+static const std::string ResultRowCount = "querydbtable.row.count";
+
+// State
+class State {
+ public:
+  State(const std::string& tableName, const std::string& stateDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
+    :tableName_(tableName), logger_(logger) {
+    if (!createUUIDDir(stateDir, uuid, filePath_))
+      return;
+
+    filePath_ += "State.txt";
+
+    if (!getStateFromFile())
+      return;
+
+    ok_ = true;
+  }
+
+  ~State() {
+    if (file_.is_open()) {
+      file_.close();
+    }
+  }
+
+  operator bool() const {
+    return ok_;
+  }
+
+  std::unordered_map<std::string, std::string> mapState() const {
+    return mapState_;
+  }
+
+  void writeStateToFile(const std::unordered_map<std::string, std::string>& mapState) {
+    file_.seekp(std::ios::beg);
+
+    file_ << tableName_ << separator();
+    auto dataSize = tableName_.size() + separator().size();
+
+    for (const auto& el : mapState) {
+      file_ << el.first << '=' << el.second << separator();
+      dataSize += el.first.size() + 1 + el.second.size() + separator().size();
+    }
+
+    // If dataSize_ > dataSize, then clear difference with ' '.
+    if (dataSize_ > dataSize) {
+      for (auto i = dataSize_ - dataSize; i > 0; i--) {
+        file_ << ' ';
+      }
+    }
 
 Review comment:
   Your previous reply could be a great code comment explaining why the loop is necessary, but if I understand correctly, just unwrapping the `if` body and leaving only the `for` loop would have equivalent behavior with two fewer lines and one less indentation level (i.e. less complexity).
   
   Correct me if I'm wrong, but I suggest this in place of lines 146-151:
   ```
   // If a maxValueColumnName type is varchar then a new max value can be shorter than previous max value,
   // and due to ???, we need to pad the data in the state file
   for (auto i = dataSize_ - dataSize; i > 0; i--) {
     file_ << ' ';
   }
   ```
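
A minimal sketch of the padding step under discussion, for illustration only. The type of `dataSize_` is not shown in this excerpt; `dataSize` is built from `std::string::size()`, so the subtraction is assumed here to be done in an unsigned type. Under that assumption the difference wraps around when the new data is longer than the old data, so the comparison has to happen before the subtraction. The names `paddingNeeded` and `overwriteWithPadding` are hypothetical and not part of the PR; a `std::string` stands in for the state file.

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper: how many pad characters are needed so that a new,
// shorter record fully covers an older, longer one.
// With unsigned sizes, previousSize - newSize wraps around when newSize is
// larger, so the comparison is done first.
std::size_t paddingNeeded(std::size_t previousSize, std::size_t newSize) {
  return previousSize > newSize ? previousSize - newSize : 0;
}

// Hypothetical in-memory stand-in for the state file: overwrite the content
// from the beginning and blank out whatever is left of the old content.
std::string overwriteWithPadding(std::string fileContent, const std::string& newData) {
  const std::size_t pad = paddingNeeded(fileContent.size(), newData.size());
  // Overwrite from position 0; if newData is longer, the content grows.
  fileContent.replace(0, newData.size(), newData);
  // Replace the leftover tail of the old content with spaces.
  fileContent.replace(newData.size(), pad, pad, ' ');
  return fileContent;
}
```

For example, overwriting a 10-byte record with a 7-byte one leaves three trailing spaces, while overwriting a shorter record with a longer one adds no padding.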
   


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #732: MINIFICPP-1013

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #732: MINIFICPP-1013
URL: https://github.com/apache/nifi-minifi-cpp/pull/732#discussion_r379351360
 
 

 ##########
 File path: extensions/sql/processors/ExecuteSQL.cpp
 ##########
 @@ -0,0 +1,123 @@
+/**
+ * @file ExecuteSQL.cpp
+ * ExecuteSQL class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ExecuteSQL.h"
+
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <memory>
+#include <codecvt>
+
+#include <soci/soci.h>
+
+#include "io/DataStream.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "Exception.h"
+#include "utils/OsUtils.h"
+#include "data/DatabaseConnectors.h"
+#include "data/JSONSQLWriter.h"
+#include "data/SQLRowsetProcessor.h"
+#include "data/WriteCallback.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const std::string ExecuteSQL::ProcessorName("ExecuteSQL");
+
+const core::Property ExecuteSQL::s_sqlSelectQuery(
+  core::PropertyBuilder::createProperty("SQL select query")->isRequired(true)->withDescription(
+    "The SQL select query to execute. The query can be empty, a constant value, or built from attributes using Expression Language. "
+    "If this property is specified, it will be used regardless of the content of incoming flowfiles. "
+    "If this property is empty, the content of the incoming flow file is expected to contain a valid SQL select query, to be issued by the processor to the database. "
+    "Note that Expression Language is not evaluated for flow file contents.")->supportsExpressionLanguage(true)->build());
+
+const core::Property ExecuteSQL::s_maxRowsPerFlowFile(
+  core::PropertyBuilder::createProperty("Max Rows Per Flow File")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
+    "The maximum number of result rows that will be included in a flow file. If zero, all rows will be placed into a single flow file.")->supportsExpressionLanguage(true)->build());
+
+const core::Relationship ExecuteSQL::s_success("success", "Successfully created FlowFile from SQL query result set.");
+
+static const std::string ResultRowCount = "executesql.row.count";
+
+ExecuteSQL::ExecuteSQL(const std::string& name, utils::Identifier uuid)
+  : SQLProcessor(name, uuid), max_rows_(0) {
+}
+
+ExecuteSQL::~ExecuteSQL() {
+}
+
+void ExecuteSQL::initialize() {
+  //! Set the supported properties
+  setSupportedProperties( { dbControllerService(), outputFormat(), s_sqlSelectQuery, s_maxRowsPerFlowFile});
+
+  //! Set the supported relationships
+  setSupportedRelationships( { s_success });
+}
+
+void ExecuteSQL::processOnSchedule(const core::ProcessContext &context) {
+  initOutputFormat(context);
+
+  context.getProperty(s_sqlSelectQuery.getName(), sqlSelectQuery_);
+  context.getProperty(s_maxRowsPerFlowFile.getName(), max_rows_);
+}
+
+void ExecuteSQL::processOnTrigger(core::ProcessSession &session) {
+  auto statement = connection_->prepareStatement(sqlSelectQuery_);
+
+  auto rowset = statement->execute();
+
+  int count = 0;
+  size_t rowCount = 0;
+  sql::JSONSQLWriter sqlWriter(isJSONPretty());
+  sql::SQLRowsetProcessor sqlRowsetProcessor(rowset, { &sqlWriter });
+
+  // Process rowset.
+  do {
+    rowCount = sqlRowsetProcessor.process(max_rows_ == 0 ? std::numeric_limits<size_t>::max() : max_rows_);
+    count++;
+    if (rowCount == 0)
+      break;
+
+    const auto& output = sqlWriter.toString();
 
 Review comment:
   Changed in QueryDatabaseTable as well.
