You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/03/08 14:13:28 UTC

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #979: MINIFICPP-1456 Introduce PutAzureBlobStorage processor

szaszm commented on a change in pull request #979:
URL: https://github.com/apache/nifi-minifi-cpp/pull/979#discussion_r589443249



##########
File path: extensions/azure/processors/PutAzureBlobStorage.h
##########
@@ -0,0 +1,136 @@
+/**
+ * @file PutAzureBlobStorage.h
+ * PutAzureBlobStorage class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <utility>
+#include <vector>
+#include <string>
+#include <memory>
+
+#include "core/Property.h"
+#include "core/Processor.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "storage/BlobStorage.h"
+#include "utils/OptionalUtils.h"
+
+class PutAzureBlobStorageTestsFixture;
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace azure {
+namespace processors {
+
+class PutAzureBlobStorage : public core::Processor {
+ public:
+  static constexpr char const* ProcessorName = "PutAzureBlobStorage";
+
+  // Supported Properties
+  static const core::Property ContainerName;
+  static const core::Property AzureStorageCredentialsService;
+  static const core::Property StorageAccountName;
+  static const core::Property StorageAccountKey;
+  static const core::Property SASToken;
+  static const core::Property CommonStorageAccountEndpointSuffix;
+  static const core::Property ConnectionString;
+  static const core::Property Blob;
+  static const core::Property CreateContainer;
+
+  // Supported Relationships
+  static const core::Relationship Failure;
+  static const core::Relationship Success;
+
+  explicit PutAzureBlobStorage(const std::string& name, const minifi::utils::Identifier& uuid = minifi::utils::Identifier())
+    : PutAzureBlobStorage(name, uuid, nullptr) {
+  }
+
+  ~PutAzureBlobStorage() override = default;
+
+  void initialize() override;
+  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
+
+  class ReadCallback : public InputStreamCallback {
+   public:
+    ReadCallback(uint64_t flow_size, azure::storage::BlobStorage& blob_storage_wrapper, const std::string &blob_name)
+      : flow_size_(flow_size)
+      , blob_storage_wrapper_(blob_storage_wrapper)
+      , blob_name_(blob_name) {
+    }
+
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+      std::vector<uint8_t> buffer;
+      int read_ret = stream->read(buffer, flow_size_);
+      if (read_ret < 0) {
+        return -1;
+      }
+
+      result_ = blob_storage_wrapper_.uploadBlob(blob_name_, buffer.data(), flow_size_);
+      if (!result_) {
+        return -1;
+      }
+      return result_->length;
+    }
+
+    utils::optional<azure::storage::UploadBlobResult> getResult() const {
+      return result_;
+    }
+
+   private:
+    uint64_t flow_size_;
+    azure::storage::BlobStorage &blob_storage_wrapper_;
+    std::string blob_name_;
+    utils::optional<azure::storage::UploadBlobResult> result_ = utils::nullopt;
+  };
+
+ private:
+  friend class ::PutAzureBlobStorageTestsFixture;
+
+  explicit PutAzureBlobStorage(const std::string& name, const minifi::utils::Identifier& uuid, std::unique_ptr<storage::BlobStorage> blob_storage_wrapper)
+    : core::Processor(name, uuid)
+    , blob_storage_wrapper_(std::move(blob_storage_wrapper)) {
+  }
+
+  std::string getConnectionStringFromControllerService(const std::shared_ptr<core::ProcessContext> &context) const;
+  std::string getAzureConnectionStringFromProperties(

Review comment:
       `getAzureConnectionStringFromProperties` could be made static and possibly hidden in the implementation file.

##########
File path: extensions/azure/storage/BlobStorage.h
##########
@@ -0,0 +1,63 @@
+/**
+ * @file BlobStorage.h
+ * BlobStorage class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <vector>
+
+#include "utils/OptionalUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace azure {
+namespace storage {
+
+struct UploadBlobResult {
+  std::string primary_uri;
+  std::string etag;
+  std::size_t length;
+  std::string timestamp;
+};
+
+class BlobStorage {
+ public:
+  BlobStorage(const std::string &connection_string, const std::string &container_name)
+    : connection_string_(connection_string)
+    , container_name_(container_name) {

Review comment:
       Consider taking both parameters by value and `std::move`-ing them into the members.

##########
File path: libminifi/test/azure-tests/CMakeLists.txt
##########
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+set(CMAKE_CXX_STANDARD 14)
+set(CMAKE_CXX_STANDARD_REQUIRED ON)

Review comment:
       This should be a requirement on the test targets (e.g. via `target_compile_features` or the `CXX_STANDARD` target property) instead of forcing the language version globally.

##########
File path: libminifi/test/azure-tests/PutAzureBlobStorageTests.cpp
##########
@@ -0,0 +1,276 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../TestBase.h"
+#include "core/Processor.h"
+#include "processors/PutAzureBlobStorage.h"
+#include "processors/GetFile.h"
+#include "processors/LogAttribute.h"
+#include "processors/UpdateAttribute.h"
+#include "storage/BlobStorage.h"
+#include "utils/file/FileUtils.h"
+
+const std::string CONTAINER_NAME = "test-container";
+const std::string STORAGE_ACCOUNT_NAME = "test-account";
+const std::string STORAGE_ACCOUNT_KEY = "test-key";
+const std::string SAS_TOKEN = "test-sas-token";
+const std::string ENDPOINT_SUFFIX = "test.suffix.com";
+const std::string CONNECTION_STRING = "test-connectionstring";
+const std::string BLOB_NAME = "test-blob.txt";
+const std::string TEST_DATA = "data";
+
+class MockBlobStorage : public minifi::azure::storage::BlobStorage {
+ public:
+  const std::string ETAG = "test-etag";
+  const std::string PRIMARY_URI = "test-uri";
+  const std::string TEST_TIMESTAMP = "test-timestamp";
+
+  MockBlobStorage()
+    : BlobStorage("", "") {
+  }
+
+  void createContainer() override {
+    container_created_ = true;
+  }
+
+  void resetClientIfNeeded(const std::string &connection_string, const std::string &container_name) override {
+    connection_string_ = connection_string;
+    container_name_ = container_name;
+  }
+
+  utils::optional<minifi::azure::storage::UploadBlobResult> uploadBlob(const std::string &blob_name, const uint8_t* buffer, std::size_t buffer_size) override {
+    input_data = std::string(buffer, buffer + buffer_size);
+    minifi::azure::storage::UploadBlobResult result;
+    result.etag = ETAG;
+    result.length = buffer_size;
+    result.primary_uri = PRIMARY_URI;
+    result.timestamp = TEST_TIMESTAMP;
+    return result;
+  }
+
+  std::string getConnectionString() const {
+    return connection_string_;
+  }
+
+  std::string getContainerName() const {
+    return container_name_;
+  }
+
+  bool getContainerCreated() const {
+    return container_created_;
+  }
+
+  std::string input_data;
+
+ private:
+  bool container_created_ = false;
+};
+
+class PutAzureBlobStorageTestsFixture {
+ public:
+  PutAzureBlobStorageTestsFixture() {
+    LogTestController::getInstance().setDebug<TestPlan>();
+    LogTestController::getInstance().setDebug<minifi::core::Processor>();
+    LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
+    LogTestController::getInstance().setTrace<processors::GetFile>();
+    LogTestController::getInstance().setDebug<processors::UpdateAttribute>();
+    LogTestController::getInstance().setDebug<processors::LogAttribute>();
+    LogTestController::getInstance().setTrace<minifi::azure::processors::PutAzureBlobStorage>();
+
+    // Build MiNiFi processing graph
+    plan = test_controller.createPlan();
+    auto mock_blob_storage = utils::make_unique<MockBlobStorage>();
+    mock_blob_storage_ptr = mock_blob_storage.get();
+    put_azure_blob_storage = std::shared_ptr<minifi::azure::processors::PutAzureBlobStorage>(
+      new minifi::azure::processors::PutAzureBlobStorage("PutAzureBlobStorage", utils::Identifier(), std::move(mock_blob_storage)));

Review comment:
       Use `std::make_shared`

##########
File path: extensions/azure/processors/PutAzureBlobStorage.cpp
##########
@@ -0,0 +1,229 @@
+/**
+ * @file PutAzureBlobStorage.cpp
+ * PutAzureBlobStorage class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PutAzureBlobStorage.h"
+
+#include <memory>
+#include <string>
+
+#include "storage/AzureBlobStorage.h"
+#include "controllerservices/AzureStorageCredentialsService.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace azure {
+namespace processors {
+
+const core::Property PutAzureBlobStorage::ContainerName(
+  core::PropertyBuilder::createProperty("Container Name")
+    ->withDescription("Name of the Azure storage container. In case of PutAzureBlobStorage processor, container can be created if it does not exist.")
+    ->supportsExpressionLanguage(true)
+    ->isRequired(true)
+    ->build());
+const core::Property PutAzureBlobStorage::AzureStorageCredentialsService(
+  core::PropertyBuilder::createProperty("Azure Storage Credentials Service")
+    ->withDescription("Name of the Azure Storage Credentials Service used to retrieve the connection string from.")
+    ->build());
+const core::Property PutAzureBlobStorage::StorageAccountName(
+    core::PropertyBuilder::createProperty("Storage Account Name")
+      ->withDescription("The storage account name.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+const core::Property PutAzureBlobStorage::StorageAccountKey(
+    core::PropertyBuilder::createProperty("Storage Account Key")
+      ->withDescription("The storage account key. This is an admin-like password providing access to every container in this account. "
+                        "It is recommended one uses Shared Access Signature (SAS) token instead for fine-grained control with policies.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+const core::Property PutAzureBlobStorage::SASToken(
+    core::PropertyBuilder::createProperty("SAS Token")
+      ->withDescription("Shared Access Signature token. Specify either SAS Token (recommended) or Account Key.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+const core::Property PutAzureBlobStorage::CommonStorageAccountEndpointSuffix(
+    core::PropertyBuilder::createProperty("Common Storage Account Endpoint Suffix")
+      ->withDescription("Storage accounts in public Azure always use a common FQDN suffix. Override this endpoint suffix with a "
+                        "different suffix in certain circumstances (like Azure Stack or non-public Azure regions). ")
+      ->supportsExpressionLanguage(true)
+      ->build());
+const core::Property PutAzureBlobStorage::ConnectionString(
+  core::PropertyBuilder::createProperty("Connection String")
+    ->withDescription("Connection string used to connect to Azure Storage service. This overrides all other set credential properties.")
+    ->supportsExpressionLanguage(true)
+    ->build());
+const core::Property PutAzureBlobStorage::Blob(
+  core::PropertyBuilder::createProperty("Blob")
+    ->withDescription("The filename of the blob.")
+    ->supportsExpressionLanguage(true)
+    ->isRequired(true)
+    ->build());
+const core::Property PutAzureBlobStorage::CreateContainer(
+  core::PropertyBuilder::createProperty("Create Container")
+    ->withDescription("Specifies whether to check if the container exists and to automatically create it if it does not. "
+                      "Permission to list containers is required. If false, this check is not made, but the Put operation will "
+                      "fail if the container does not exist.")
+    ->isRequired(true)
+    ->withDefaultValue<bool>(false)
+    ->build());
+
+const core::Relationship PutAzureBlobStorage::Success("success", "All successfully processed FlowFiles are routed to this relationship");
+const core::Relationship PutAzureBlobStorage::Failure("failure", "Unsuccessful operations will be transferred to the failure relationship");
+
+void PutAzureBlobStorage::initialize() {
+  // Set the supported properties
+  setSupportedProperties({
+    ContainerName,
+    StorageAccountName,
+    StorageAccountKey,
+    SASToken,
+    CommonStorageAccountEndpointSuffix,
+    ConnectionString,
+    AzureStorageCredentialsService,
+    Blob,
+    CreateContainer
+  });
+  // Set the supported relationships
+  setSupportedRelationships({
+    Failure,
+    Success
+  });
+}
+
+void PutAzureBlobStorage::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+  context->getProperty(CreateContainer.getName(), create_container_);
+}
+
+std::string PutAzureBlobStorage::getConnectionStringFromControllerService(const std::shared_ptr<core::ProcessContext> &context) const {
+  std::string service_name;
+  if (!context->getProperty(AzureStorageCredentialsService.getName(), service_name) || service_name.empty()) {
+    return "";
+  }
+
+  std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(service_name);
+  if (nullptr == service) {
+    logger_->log_error("Azure storage credentials service with name: '%s' could not be found", service_name.c_str());
+    return "";
+  }
+
+  auto azure_credentials_service = std::dynamic_pointer_cast<minifi::azure::controllers::AzureStorageCredentialsService>(service);
+  if (!azure_credentials_service) {
+    logger_->log_error("Controller service with name: '%s' is not an Azure storage credentials service", service_name.c_str());
+    return "";
+  }
+
+  return azure_credentials_service->getConnectionString();
+}
+
+std::string PutAzureBlobStorage::getAzureConnectionStringFromProperties(
+    const std::shared_ptr<core::ProcessContext> &context,
+    const std::shared_ptr<core::FlowFile> &flow_file) const {
+  azure::storage::AzureStorageCredentials credentials;
+  context->getProperty(StorageAccountName, credentials.storage_account_name, flow_file);
+  context->getProperty(StorageAccountKey, credentials.storage_account_key, flow_file);
+  context->getProperty(SASToken, credentials.sas_token, flow_file);
+  context->getProperty(CommonStorageAccountEndpointSuffix, credentials.endpoint_suffix, flow_file);
+  context->getProperty(ConnectionString, credentials.connection_string, flow_file);
+  return credentials.getConnectionString();
+}
+
+void PutAzureBlobStorage::createAzureStorageClient(const std::string &connection_string, const std::string &container_name) {
+  // When used in multithreaded environment make sure to use the azure_storage_mutex_ to lock the wrapper so the
+  // client is not reset with different configuration while another thread is using it.
+  if (blob_storage_wrapper_ == nullptr) {
+    blob_storage_wrapper_ = minifi::utils::make_unique<storage::AzureBlobStorage>(connection_string, container_name);
+  } else {
+    blob_storage_wrapper_->resetClientIfNeeded(connection_string, container_name);
+  }
+}
+
+std::string PutAzureBlobStorage::getConnectionString(
+    const std::shared_ptr<core::ProcessContext> &context,
+    const std::shared_ptr<core::FlowFile> &flow_file) const {
+  auto connection_string = getAzureConnectionStringFromProperties(context, flow_file);
+  if (!connection_string.empty()) {
+    return connection_string;
+  }
+
+  return getConnectionStringFromControllerService(context);
+}
+
+void PutAzureBlobStorage::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
+  logger_->log_debug("PutAzureBlobStorage onTrigger");
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
+  if (!flow_file) {
+    return;
+  }
+
+  auto connection_string = getConnectionString(context, flow_file);
+  if (connection_string.empty()) {
+    logger_->log_error("Connection string is empty!");

Review comment:
       Should we `yield` on connection errors? Or is that a bad idea, because the next flow file could carry connection info that works?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org