You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/01/24 11:23:50 UTC

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #1242: MINIFICPP-1631 Create ListAzureDataLakeStorage processor

szaszm commented on a change in pull request #1242:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1242#discussion_r790612338



##########
File path: libminifi/include/utils/ListingStateUtils.h
##########
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <memory>
+
+#include "core/CoreComponentState.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+class ListedObject {
+ public:
+  virtual uint64_t getLastModified() const = 0;
+  virtual std::string getKey() const = 0;
+  virtual ~ListedObject() = default;
+};
+
+struct ListingState {
+  bool wasObjectListedAlready(const ListedObject &object_attributes) const;
+  void updateState(const ListedObject &object_attributes);
+
+  uint64_t listed_key_timestamp = 0;
+  std::unordered_set<std::string> listed_keys;
+};
+
+class ListingStateManager {
+ public:
+  explicit ListingStateManager(const std::shared_ptr<core::CoreComponentStateManager>& state_manager)
+    : state_manager_(state_manager) {
+  }
+
+  ListingState getCurrentState() const;

Review comment:
       All of the getters could be marked `[[nodiscard]]` (clang-tidy)

##########
File path: extensions/azure/storage/AzureDataLakeStorage.h
##########
@@ -24,11 +24,12 @@
 #include <memory>
 #include <optional>
 #include <utility>
+#include <string_view>
 
 #include "core/logging/Logger.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "DataLakeStorageClient.h"
-#include "azure/core/io/body_stream.hpp"
+#include "utils/ListingStateUtils.h"

Review comment:
       I would rename this file to "`ListingStateManager`". The utils suffix is only really useful when there are all kinds of random utility classes and functions that you can't really name, but this file contains a class that already has a name.
   
   Note: If something is named `*Manager`, or more generally `<verb>-er`/`-or`, then usually something is not ideal in the design. It may be a function disguised as a class, or it might do too much (violating the Single Responsibility Principle). I didn't spend too much time analyzing this one, so I'm just leaving this note here for now. I have committed such classes myself, and it's not the end of the world in any case.

##########
File path: libminifi/include/utils/ListingStateUtils.h
##########
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <memory>
+
+#include "core/CoreComponentState.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+class ListedObject {
+ public:
+  virtual uint64_t getLastModified() const = 0;
+  virtual std::string getKey() const = 0;
+  virtual ~ListedObject() = default;
+};
+
+struct ListingState {
+  bool wasObjectListedAlready(const ListedObject &object_attributes) const;
+  void updateState(const ListedObject &object_attributes);
+
+  uint64_t listed_key_timestamp = 0;
+  std::unordered_set<std::string> listed_keys;
+};
+
+class ListingStateManager {
+ public:
+  explicit ListingStateManager(const std::shared_ptr<core::CoreComponentStateManager>& state_manager)
+    : state_manager_(state_manager) {

Review comment:
       clang-tidy: Pass by value and use std::move

##########
File path: extensions/aws/s3/S3Wrapper.h
##########
@@ -177,7 +179,15 @@ struct ListRequestParameters : public RequestParameters {
   uint64_t min_object_age = 0;
 };
 
-struct ListedObjectAttributes {
+struct ListedObjectAttributes : public minifi::utils::ListedObject {
+  uint64_t getLastModified() const override {
+    return gsl::narrow<uint64_t>(last_modified);
+  }

Review comment:
       Using `std::chrono` would help here, too. Currently the only purpose of the function is to convert the type, but its name tells a different story.

##########
File path: extensions/azure/processors/ListAzureDataLakeStorage.h
##########
@@ -0,0 +1,73 @@
+/**
+ * @file ListAzureDataLakeStorage.h
+ * ListAzureDataLakeStorage class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <utility>
+#include <memory>
+
+#include "AzureDataLakeStorageProcessorBase.h"
+
+class ListAzureDataLakeStorageTestsFixture;
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+class ListAzureDataLakeStorage final : public AzureDataLakeStorageProcessorBase {
+ public:
+  EXTENSIONAPI static const core::Property RecurseSubdirectories;
+  EXTENSIONAPI static const core::Property FileFilter;
+  EXTENSIONAPI static const core::Property PathFilter;
+  EXTENSIONAPI static const core::Property ListingStrategy;
+
+  // Supported Relationships
+  static const core::Relationship Success;
+
+  explicit ListAzureDataLakeStorage(const std::string& name, const minifi::utils::Identifier& uuid = minifi::utils::Identifier())
+    : AzureDataLakeStorageProcessorBase(name, uuid, core::logging::LoggerFactory<ListAzureDataLakeStorage>::getLogger()) {
+  }
+
+  ~ListAzureDataLakeStorage() override = default;
+
+  void initialize() override;
+  void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) override;
+  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
+
+ private:
+  friend class ::ListAzureDataLakeStorageTestsFixture;
+
+  core::annotation::Input getInputRequirement() const override {
+    return core::annotation::Input::INPUT_FORBIDDEN;
+  }
+
+  explicit ListAzureDataLakeStorage(const std::string& name, const minifi::utils::Identifier& uuid, std::unique_ptr<storage::DataLakeStorageClient> data_lake_storage_client)
+    : AzureDataLakeStorageProcessorBase(name, uuid, core::logging::LoggerFactory<ListAzureDataLakeStorage>::getLogger(), std::move(data_lake_storage_client)) {
+  }
+
+  std::optional<storage::ListAzureDataLakeStorageParameters> buildListParameters(core::ProcessContext& context);
+  void createNewFlowFile(core::ProcessSession &session, const storage::ListDataLakeStorageElement &element);
+
+  bool recurse_subdirectories_ = true;
+  storage::EntityTracking tracking_strategy_ = storage::EntityTracking::TIMESTAMPS;
+  storage::ListAzureDataLakeStorageParameters list_parameters_;
+  std::unique_ptr<minifi::utils::ListingStateManager> state_manager_ = nullptr;

Review comment:
       `unique_ptr` defaults to nullptr, so no need for this default member initializer.

##########
File path: extensions/azure/processors/ListAzureDataLakeStorage.cpp
##########
@@ -0,0 +1,157 @@
+/**
+ * @file ListAzureDataLakeStorage.cpp
+ * ListAzureDataLakeStorage class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ListAzureDataLakeStorage.h"
+
+#include "utils/ProcessorConfigUtils.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+const core::Property ListAzureDataLakeStorage::RecurseSubdirectories(
+    core::PropertyBuilder::createProperty("Recurse Subdirectories")
+      ->isRequired(true)
+      ->withDefaultValue<bool>(true)
+      ->withDescription("Indicates whether to list files from subdirectories of the directory")
+      ->build());
+
+const core::Property ListAzureDataLakeStorage::FileFilter(
+  core::PropertyBuilder::createProperty("File Filter")
+    ->withDescription("Only files whose names match the given regular expression will be listed")
+    ->build());
+
+const core::Property ListAzureDataLakeStorage::PathFilter(
+  core::PropertyBuilder::createProperty("Path Filter")
+    ->withDescription("When 'Recurse Subdirectories' is true, then only subdirectories whose paths match the given regular expression will be scanned")
+    ->build());
+
+const core::Property ListAzureDataLakeStorage::ListingStrategy(
+  core::PropertyBuilder::createProperty("Listing Strategy")
+    ->withDescription("Specify how to determine new/updated entities. If 'timestamps' is selected it tracks the latest timestamp of listed entity to "
+                      "determine new/updated entities. If 'none' is selected it lists an entity without any tracking, the same entity will be listed each time on executing this processor.")
+    ->withDefaultValue<std::string>(toString(storage::EntityTracking::TIMESTAMPS))
+    ->withAllowableValues<std::string>(storage::EntityTracking::values())
+    ->build());
+
+const core::Relationship ListAzureDataLakeStorage::Success("success", "All FlowFiles that are received are routed to success");
+
+void ListAzureDataLakeStorage::initialize() {
+  // Set supported properties
+  setSupportedProperties({
+    AzureStorageCredentialsService,
+    FilesystemName,
+    DirectoryName,
+    RecurseSubdirectories,
+    FileFilter,
+    PathFilter,
+    ListingStrategy
+  });
+  // Set the supported relationships
+  setSupportedRelationships({
+    Success
+  });
+}
+
+void ListAzureDataLakeStorage::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+  AzureDataLakeStorageProcessorBase::onSchedule(context, sessionFactory);

Review comment:
       Consider adding the precondition `context && sessionFactory` here as well. I see that it's part of the base implementation, but a precondition is conceptually a declaration that should be part of the function declaration, so it's duplicated in delegating functions. I'm not insisting on this change, since this workaround (of using `gsl_Expects` and `gsl_Ensures` instead of a language feature) is not ideal anyway.

##########
File path: extensions/azure/processors/ListAzureDataLakeStorage.cpp
##########
@@ -0,0 +1,157 @@
+/**
+ * @file ListAzureDataLakeStorage.cpp
+ * ListAzureDataLakeStorage class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ListAzureDataLakeStorage.h"
+
+#include "utils/ProcessorConfigUtils.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+const core::Property ListAzureDataLakeStorage::RecurseSubdirectories(
+    core::PropertyBuilder::createProperty("Recurse Subdirectories")
+      ->isRequired(true)
+      ->withDefaultValue<bool>(true)
+      ->withDescription("Indicates whether to list files from subdirectories of the directory")
+      ->build());
+
+const core::Property ListAzureDataLakeStorage::FileFilter(
+  core::PropertyBuilder::createProperty("File Filter")
+    ->withDescription("Only files whose names match the given regular expression will be listed")
+    ->build());
+
+const core::Property ListAzureDataLakeStorage::PathFilter(
+  core::PropertyBuilder::createProperty("Path Filter")
+    ->withDescription("When 'Recurse Subdirectories' is true, then only subdirectories whose paths match the given regular expression will be scanned")
+    ->build());
+
+const core::Property ListAzureDataLakeStorage::ListingStrategy(
+  core::PropertyBuilder::createProperty("Listing Strategy")
+    ->withDescription("Specify how to determine new/updated entities. If 'timestamps' is selected it tracks the latest timestamp of listed entity to "
+                      "determine new/updated entities. If 'none' is selected it lists an entity without any tracking, the same entity will be listed each time on executing this processor.")
+    ->withDefaultValue<std::string>(toString(storage::EntityTracking::TIMESTAMPS))
+    ->withAllowableValues<std::string>(storage::EntityTracking::values())
+    ->build());
+
+const core::Relationship ListAzureDataLakeStorage::Success("success", "All FlowFiles that are received are routed to success");
+
+void ListAzureDataLakeStorage::initialize() {
+  // Set supported properties
+  setSupportedProperties({
+    AzureStorageCredentialsService,
+    FilesystemName,
+    DirectoryName,
+    RecurseSubdirectories,
+    FileFilter,
+    PathFilter,
+    ListingStrategy
+  });
+  // Set the supported relationships
+  setSupportedRelationships({
+    Success
+  });
+}
+
+void ListAzureDataLakeStorage::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+  AzureDataLakeStorageProcessorBase::onSchedule(context, sessionFactory);
+
+  auto state_manager = context->getStateManager();
+  if (state_manager == nullptr) {
+    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+  }
+  state_manager_ = std::make_unique<minifi::utils::ListingStateManager>(state_manager);
+
+  auto params = buildListParameters(*context);
+  if (!params) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Required parameters for ListAzureDataLakeStorage processor are missing or invalid");
+  }
+
+  list_parameters_ = *params;
+  tracking_strategy_ = storage::EntityTracking::parse(
+    utils::parsePropertyWithAllowableValuesOrThrow(*context, ListingStrategy.getName(), storage::EntityTracking::values()).c_str());
+}
+
+std::optional<storage::ListAzureDataLakeStorageParameters> ListAzureDataLakeStorage::buildListParameters(core::ProcessContext& context) {
+  storage::ListAzureDataLakeStorageParameters params;
+  if (!setCommonParameters(params, context, nullptr)) {
+    return std::nullopt;
+  }
+
+  if (!context.getProperty(RecurseSubdirectories.getName(), params.recurse_subdirectories)) {
+    logger_->log_error("Recurse Subdirectories property missing or invalid");
+    return std::nullopt;
+  }
+
+  context.getProperty(FileFilter.getName(), params.file_filter);
+  context.getProperty(PathFilter.getName(), params.path_filter);
+
+  return params;
+}
+
+void ListAzureDataLakeStorage::createNewFlowFile(core::ProcessSession &session, const storage::ListDataLakeStorageElement &element) {
+  auto flow_file = session.create();
+  session.putAttribute(flow_file, "azure.filesystem", element.filesystem);
+  session.putAttribute(flow_file, "azure.filePath", element.file_path);
+  session.putAttribute(flow_file, "azure.directory", element.directory);
+  session.putAttribute(flow_file, "azure.filename", element.filename);
+  session.putAttribute(flow_file, "azure.length", std::to_string(element.length));
+  session.putAttribute(flow_file, "azure.lastModified", std::to_string(element.last_modified));
+  session.putAttribute(flow_file, "azure.etag", element.etag);
+  session.transfer(flow_file, Success);

Review comment:
       I would prefer to have the transfer removed from this function. A "create" operation should return the result and not do random things with it.

##########
File path: extensions/azure/storage/AzureDataLakeStorage.cpp
##########
@@ -72,4 +77,62 @@ std::optional<uint64_t> AzureDataLakeStorage::fetchFile(const FetchAzureDataLake
   }
 }
 
+bool AzureDataLakeStorage::matchesPathFilter(const std::string& base_directory, const std::string& path_filter, std::string path) {
+  if (path_filter.empty()) {
+    return true;
+  }
+
+  if (!base_directory.empty()) {
+    gsl_Expects(minifi::utils::StringUtils::startsWith(path, base_directory));

Review comment:
       A precondition should preferably be declared at the start of the function body with `gsl_Expects`.
   You may want an implication (base_directory is not empty -> path starts with base_directory), which doesn't have a logical operator in C++, so I think it would be best to add a function `bool implies(bool a, bool b) noexcept { return !a || b; }` (maybe in GeneralUtils.h?) and use it in the declaration.

##########
File path: extensions/azure/storage/AzureDataLakeStorage.h
##########
@@ -43,15 +44,38 @@ struct UploadDataLakeStorageResult {
   std::string primary_uri;
 };
 
+struct ListDataLakeStorageElement : public minifi::utils::ListedObject {
+  std::string filesystem;
+  std::string file_path;
+  std::string directory;
+  std::string filename;
+  uint64_t length = 0;
+  uint64_t last_modified = 0;
+  std::string etag;
+
+  uint64_t getLastModified() const override {
+    return last_modified;
+  }

Review comment:
       Consider using `std::chrono::time_point` for timestamps.

##########
File path: extensions/azure/processors/ListAzureDataLakeStorage.cpp
##########
@@ -0,0 +1,157 @@
+/**
+ * @file ListAzureDataLakeStorage.cpp
+ * ListAzureDataLakeStorage class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ListAzureDataLakeStorage.h"
+
+#include "utils/ProcessorConfigUtils.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+const core::Property ListAzureDataLakeStorage::RecurseSubdirectories(
+    core::PropertyBuilder::createProperty("Recurse Subdirectories")
+      ->isRequired(true)
+      ->withDefaultValue<bool>(true)
+      ->withDescription("Indicates whether to list files from subdirectories of the directory")
+      ->build());
+
+const core::Property ListAzureDataLakeStorage::FileFilter(
+  core::PropertyBuilder::createProperty("File Filter")
+    ->withDescription("Only files whose names match the given regular expression will be listed")
+    ->build());
+
+const core::Property ListAzureDataLakeStorage::PathFilter(
+  core::PropertyBuilder::createProperty("Path Filter")
+    ->withDescription("When 'Recurse Subdirectories' is true, then only subdirectories whose paths match the given regular expression will be scanned")
+    ->build());
+
+const core::Property ListAzureDataLakeStorage::ListingStrategy(
+  core::PropertyBuilder::createProperty("Listing Strategy")
+    ->withDescription("Specify how to determine new/updated entities. If 'timestamps' is selected it tracks the latest timestamp of listed entity to "
+                      "determine new/updated entities. If 'none' is selected it lists an entity without any tracking, the same entity will be listed each time on executing this processor.")
+    ->withDefaultValue<std::string>(toString(storage::EntityTracking::TIMESTAMPS))
+    ->withAllowableValues<std::string>(storage::EntityTracking::values())
+    ->build());
+
+const core::Relationship ListAzureDataLakeStorage::Success("success", "All FlowFiles that are received are routed to success");
+
+void ListAzureDataLakeStorage::initialize() {
+  // Set supported properties
+  setSupportedProperties({
+    AzureStorageCredentialsService,
+    FilesystemName,
+    DirectoryName,
+    RecurseSubdirectories,
+    FileFilter,
+    PathFilter,
+    ListingStrategy
+  });
+  // Set the supported relationships
+  setSupportedRelationships({
+    Success
+  });
+}
+
+void ListAzureDataLakeStorage::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+  AzureDataLakeStorageProcessorBase::onSchedule(context, sessionFactory);
+
+  auto state_manager = context->getStateManager();
+  if (state_manager == nullptr) {
+    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+  }
+  state_manager_ = std::make_unique<minifi::utils::ListingStateManager>(state_manager);
+
+  auto params = buildListParameters(*context);
+  if (!params) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Required parameters for ListAzureDataLakeStorage processor are missing or invalid");
+  }
+
+  list_parameters_ = *params;
+  tracking_strategy_ = storage::EntityTracking::parse(
+    utils::parsePropertyWithAllowableValuesOrThrow(*context, ListingStrategy.getName(), storage::EntityTracking::values()).c_str());
+}
+
+std::optional<storage::ListAzureDataLakeStorageParameters> ListAzureDataLakeStorage::buildListParameters(core::ProcessContext& context) {
+  storage::ListAzureDataLakeStorageParameters params;
+  if (!setCommonParameters(params, context, nullptr)) {
+    return std::nullopt;
+  }
+
+  if (!context.getProperty(RecurseSubdirectories.getName(), params.recurse_subdirectories)) {
+    logger_->log_error("Recurse Subdirectories property missing or invalid");
+    return std::nullopt;
+  }
+
+  context.getProperty(FileFilter.getName(), params.file_filter);
+  context.getProperty(PathFilter.getName(), params.path_filter);
+
+  return params;
+}
+
+void ListAzureDataLakeStorage::createNewFlowFile(core::ProcessSession &session, const storage::ListDataLakeStorageElement &element) {

Review comment:
       Clang-Tidy: Method 'createNewFlowFile' can be made static.
   
   It could also be made into a local helper function with internal linkage. (Anonymous namespace?)

##########
File path: extensions/azure/processors/ListAzureDataLakeStorage.h
##########
@@ -0,0 +1,73 @@
+/**
+ * @file ListAzureDataLakeStorage.h
+ * ListAzureDataLakeStorage class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <utility>
+#include <memory>
+
+#include "AzureDataLakeStorageProcessorBase.h"
+
+class ListAzureDataLakeStorageTestsFixture;
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+class ListAzureDataLakeStorage final : public AzureDataLakeStorageProcessorBase {
+ public:
+  EXTENSIONAPI static const core::Property RecurseSubdirectories;
+  EXTENSIONAPI static const core::Property FileFilter;
+  EXTENSIONAPI static const core::Property PathFilter;
+  EXTENSIONAPI static const core::Property ListingStrategy;
+
+  // Supported Relationships
+  static const core::Relationship Success;
+
+  explicit ListAzureDataLakeStorage(const std::string& name, const minifi::utils::Identifier& uuid = minifi::utils::Identifier())
+    : AzureDataLakeStorageProcessorBase(name, uuid, core::logging::LoggerFactory<ListAzureDataLakeStorage>::getLogger()) {
+  }
+
+  ~ListAzureDataLakeStorage() override = default;
+
+  void initialize() override;
+  void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) override;
+  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
+
+ private:
+  friend class ::ListAzureDataLakeStorageTestsFixture;
+
+  core::annotation::Input getInputRequirement() const override {
+    return core::annotation::Input::INPUT_FORBIDDEN;
+  }
+
+  explicit ListAzureDataLakeStorage(const std::string& name, const minifi::utils::Identifier& uuid, std::unique_ptr<storage::DataLakeStorageClient> data_lake_storage_client)
+    : AzureDataLakeStorageProcessorBase(name, uuid, core::logging::LoggerFactory<ListAzureDataLakeStorage>::getLogger(), std::move(data_lake_storage_client)) {
+  }
+
+  std::optional<storage::ListAzureDataLakeStorageParameters> buildListParameters(core::ProcessContext& context);
+  void createNewFlowFile(core::ProcessSession &session, const storage::ListDataLakeStorageElement &element);
+
+  bool recurse_subdirectories_ = true;

Review comment:
       This is unused; the information seems to have been moved to `list_parameters_`. Please remove.

##########
File path: extensions/azure/processors/ListAzureDataLakeStorage.h
##########
@@ -0,0 +1,73 @@
+/**
+ * @file ListAzureDataLakeStorage.h
+ * ListAzureDataLakeStorage class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <utility>
+#include <memory>
+
+#include "AzureDataLakeStorageProcessorBase.h"
+
+class ListAzureDataLakeStorageTestsFixture;
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+class ListAzureDataLakeStorage final : public AzureDataLakeStorageProcessorBase {
+ public:
+  EXTENSIONAPI static const core::Property RecurseSubdirectories;
+  EXTENSIONAPI static const core::Property FileFilter;
+  EXTENSIONAPI static const core::Property PathFilter;
+  EXTENSIONAPI static const core::Property ListingStrategy;
+
+  // Supported Relationships
+  static const core::Relationship Success;

Review comment:
       I think this comment doesn't add anything that's not already obvious from the context.

##########
File path: extensions/azure/processors/ListAzureDataLakeStorage.cpp
##########
@@ -0,0 +1,157 @@
+/**
+ * @file ListAzureDataLakeStorage.cpp
+ * ListAzureDataLakeStorage class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ListAzureDataLakeStorage.h"
+
+#include "utils/ProcessorConfigUtils.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+const core::Property ListAzureDataLakeStorage::RecurseSubdirectories(
+    core::PropertyBuilder::createProperty("Recurse Subdirectories")
+      ->isRequired(true)
+      ->withDefaultValue<bool>(true)
+      ->withDescription("Indicates whether to list files from subdirectories of the directory")
+      ->build());
+
+const core::Property ListAzureDataLakeStorage::FileFilter(
+  core::PropertyBuilder::createProperty("File Filter")
+    ->withDescription("Only files whose names match the given regular expression will be listed")
+    ->build());
+
+const core::Property ListAzureDataLakeStorage::PathFilter(
+  core::PropertyBuilder::createProperty("Path Filter")
+    ->withDescription("When 'Recurse Subdirectories' is true, then only subdirectories whose paths match the given regular expression will be scanned")
+    ->build());
+
+const core::Property ListAzureDataLakeStorage::ListingStrategy(
+  core::PropertyBuilder::createProperty("Listing Strategy")
+    ->withDescription("Specify how to determine new/updated entities. If 'timestamps' is selected it tracks the latest timestamp of listed entity to "
+                      "determine new/updated entities. If 'none' is selected it lists an entity without any tracking, the same entity will be listed each time on executing this processor.")
+    ->withDefaultValue<std::string>(toString(storage::EntityTracking::TIMESTAMPS))
+    ->withAllowableValues<std::string>(storage::EntityTracking::values())
+    ->build());
+
+const core::Relationship ListAzureDataLakeStorage::Success("success", "All FlowFiles that are received are routed to success");
+
+void ListAzureDataLakeStorage::initialize() {
+  // Set supported properties
+  setSupportedProperties({
+    AzureStorageCredentialsService,
+    FilesystemName,
+    DirectoryName,
+    RecurseSubdirectories,
+    FileFilter,
+    PathFilter,
+    ListingStrategy
+  });
+  // Set the supported relationships
+  setSupportedRelationships({
+    Success
+  });
+}
+
+void ListAzureDataLakeStorage::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+  AzureDataLakeStorageProcessorBase::onSchedule(context, sessionFactory);
+
+  auto state_manager = context->getStateManager();
+  if (state_manager == nullptr) {
+    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+  }
+  state_manager_ = std::make_unique<minifi::utils::ListingStateManager>(state_manager);
+
+  auto params = buildListParameters(*context);
+  if (!params) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Required parameters for ListAzureDataLakeStorage processor are missing or invalid");
+  }
+
+  list_parameters_ = *params;

Review comment:
       Consider using `std::move`. `std::optional` has the necessary rvalue-reference-qualified overloads.
   
   ```suggestion
     list_parameters_ = *std::move(params);
   ```

##########
File path: extensions/azure/storage/AzureDataLakeStorage.cpp
##########
@@ -72,4 +77,62 @@ std::optional<uint64_t> AzureDataLakeStorage::fetchFile(const FetchAzureDataLake
   }
 }
 
+bool AzureDataLakeStorage::matchesPathFilter(const std::string& base_directory, const std::string& path_filter, std::string path) {
+  if (path_filter.empty()) {
+    return true;
+  }
+
+  if (!base_directory.empty()) {
+    gsl_Expects(minifi::utils::StringUtils::startsWith(path, base_directory));
+    path = path.size() == base_directory.size() ? "" : path.substr(base_directory.size() + 1);
+  }
+
+  std::regex pattern(path_filter);
+  return std::regex_match(path, pattern);
+}
+
+bool AzureDataLakeStorage::matchesFileFilter(const std::string& file_filter, const std::string& filename) {

Review comment:
       Clang-Tidy: Method 'matchesFileFilter' can be made static
   
   In general, if it can be static and private, it can also be hidden in the implementation file in an anonymous namespace. I prefer it that way, but not everyone agrees with this.
   
   Consider using `std::string_view` instead of `const std::string&` to pass strings.

##########
File path: libminifi/src/utils/ListingStateUtils.cpp
##########
@@ -0,0 +1,95 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/ListingStateUtils.h"
+
+#include "core/Property.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+const std::string ListingStateManager::LATEST_LISTED_OBJECT_PREFIX = "listed_key.";
+const std::string ListingStateManager::LATEST_LISTED_OBJECT_TIMESTAMP = "listed_timestamp";
+
+bool ListingState::wasObjectListedAlready(const ListedObject &object) const {
+  return listed_key_timestamp > object.getLastModified() ||
+      (listed_key_timestamp == object.getLastModified() && listed_keys.find(object.getKey()) != listed_keys.end());
+}
+
+void ListingState::updateState(const ListedObject &object) {
+  if (listed_key_timestamp < object.getLastModified()) {
+    listed_key_timestamp = object.getLastModified();
+    listed_keys.clear();
+    listed_keys.insert(object.getKey());
+  } else if (listed_key_timestamp == object.getLastModified()) {
+    listed_keys.insert(object.getKey());
+  }
+}
+
+uint64_t ListingStateManager::getLatestListedKeyTimestamp(const std::unordered_map<std::string, std::string> &state) const {

Review comment:
       Clang-Tidy: Method 'getLatestListedKeyTimestamp' can be made static

##########
File path: extensions/azure/processors/ListAzureDataLakeStorage.cpp
##########
@@ -0,0 +1,157 @@
+/**
+ * @file ListAzureDataLakeStorage.cpp
+ * ListAzureDataLakeStorage class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ListAzureDataLakeStorage.h"
+
+#include "utils/ProcessorConfigUtils.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+const core::Property ListAzureDataLakeStorage::RecurseSubdirectories(
+    core::PropertyBuilder::createProperty("Recurse Subdirectories")
+      ->isRequired(true)
+      ->withDefaultValue<bool>(true)
+      ->withDescription("Indicates whether to list files from subdirectories of the directory")
+      ->build());
+
+const core::Property ListAzureDataLakeStorage::FileFilter(
+  core::PropertyBuilder::createProperty("File Filter")
+    ->withDescription("Only files whose names match the given regular expression will be listed")
+    ->build());
+
+const core::Property ListAzureDataLakeStorage::PathFilter(
+  core::PropertyBuilder::createProperty("Path Filter")
+    ->withDescription("When 'Recurse Subdirectories' is true, then only subdirectories whose paths match the given regular expression will be scanned")
+    ->build());
+
+const core::Property ListAzureDataLakeStorage::ListingStrategy(
+  core::PropertyBuilder::createProperty("Listing Strategy")
+    ->withDescription("Specify how to determine new/updated entities. If 'timestamps' is selected it tracks the latest timestamp of listed entity to "
+                      "determine new/updated entities. If 'none' is selected it lists an entity without any tracking, the same entity will be listed each time on executing this processor.")
+    ->withDefaultValue<std::string>(toString(storage::EntityTracking::TIMESTAMPS))
+    ->withAllowableValues<std::string>(storage::EntityTracking::values())
+    ->build());
+
+const core::Relationship ListAzureDataLakeStorage::Success("success", "All FlowFiles that are received are routed to success");
+
+void ListAzureDataLakeStorage::initialize() {
+  // Set supported properties

Review comment:
       The comments in this function don't add value IMO, and should be removed. They say exactly what the code below them says anyway.

##########
File path: libminifi/src/utils/ListingStateUtils.cpp
##########
@@ -0,0 +1,95 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/ListingStateUtils.h"
+
+#include "core/Property.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+const std::string ListingStateManager::LATEST_LISTED_OBJECT_PREFIX = "listed_key.";
+const std::string ListingStateManager::LATEST_LISTED_OBJECT_TIMESTAMP = "listed_timestamp";
+
+bool ListingState::wasObjectListedAlready(const ListedObject &object) const {
+  return listed_key_timestamp > object.getLastModified() ||
+      (listed_key_timestamp == object.getLastModified() && listed_keys.find(object.getKey()) != listed_keys.end());
+}
+
+void ListingState::updateState(const ListedObject &object) {
+  if (listed_key_timestamp < object.getLastModified()) {
+    listed_key_timestamp = object.getLastModified();
+    listed_keys.clear();
+    listed_keys.insert(object.getKey());
+  } else if (listed_key_timestamp == object.getLastModified()) {
+    listed_keys.insert(object.getKey());
+  }
+}
+
+uint64_t ListingStateManager::getLatestListedKeyTimestamp(const std::unordered_map<std::string, std::string> &state) const {
+  std::string stored_listed_key_timestamp_str;
+  auto it = state.find(LATEST_LISTED_OBJECT_TIMESTAMP);
+  if (it != state.end()) {
+    stored_listed_key_timestamp_str = it->second;
+  }
+
+  int64_t stored_listed_key_timestamp = 0;
+  core::Property::StringToInt(stored_listed_key_timestamp_str, stored_listed_key_timestamp);
+
+  return stored_listed_key_timestamp;
+}
+
+std::unordered_set<std::string> ListingStateManager::getLatestListedKeys(const std::unordered_map<std::string, std::string> &state) const {

Review comment:
       Clang-Tidy: Method 'getLatestListedKeys' can be made static

##########
File path: extensions/azure/storage/AzureDataLakeStorage.cpp
##########
@@ -72,4 +77,62 @@ std::optional<uint64_t> AzureDataLakeStorage::fetchFile(const FetchAzureDataLake
   }
 }
 
+bool AzureDataLakeStorage::matchesPathFilter(const std::string& base_directory, const std::string& path_filter, std::string path) {

Review comment:
       Consider using `std::string_view` instead of `const std::string&` to pass strings.
   
   Clang-Tidy: Method 'matchesPathFilter' can be made static




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org