Posted to issues@nifi.apache.org by phrocker <gi...@git.apache.org> on 2018/01/11 16:04:05 UTC

[GitHub] nifi-minifi-cpp pull request #237: MINIFICPP-37: Create an executable to sup...

GitHub user phrocker opened a pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/237

    MINIFICPP-37: Create an executable to support basic localized devops operations.
    
    This includes stopping components, clearing queues, getting queue information, and updating the flow.
    
    Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
         in the commit message?
    
    - [ ] Does your PR title start with MINIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    - [ ] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
    - [ ] If applicable, have you updated the LICENSE file?
    - [ ] If applicable, have you updated the NOTICE file?
    
    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/phrocker/nifi-minifi-cpp MINIFICPP-37

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi-minifi-cpp/pull/237.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #237
    
----
commit 87051f980f1a3f6b49bbb87647dfd5687200d88c
Author: Marc Parisi <ph...@...>
Date:   2018-01-07T16:42:03Z

    MINIFICPP-37: Create an executable to support basic localized devops operations.
    
    This includes stopping components, clearing queues, getting queue information, and updating the flow

----


---

[GitHub] nifi-minifi-cpp pull request #237: MINIFICPP-37: Create an executable to sup...

Posted by apiri <gi...@git.apache.org>.
GitHub user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/237#discussion_r161239844
  
    --- Diff: libminifi/include/c2/C2Agent.h ---
    @@ -73,7 +73,7 @@ class C2Agent : public state::UpdateController, public state::metrics::MetricsSi
        */
       virtual int16_t setMetrics(const std::shared_ptr<state::metrics::Metrics> &metric);
     
    -  int64_t getHeartBestDelay(){
    +  int64_t getHeartBeatDelay(){
    --- End diff --
    
    should help with minifi's arrhythmia 😅 


---

[GitHub] nifi-minifi-cpp pull request #237: MINIFICPP-37: Create an executable to sup...

Posted by asfgit <gi...@git.apache.org>.
GitHub user asfgit closed the pull request at:

    https://github.com/apache/nifi-minifi-cpp/pull/237


---

[GitHub] nifi-minifi-cpp pull request #237: MINIFICPP-37: Create an executable to sup...

Posted by apiri <gi...@git.apache.org>.
GitHub user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/237#discussion_r161240472
  
    --- Diff: libminifi/include/io/DescriptorStream.h ---
    @@ -0,0 +1,194 @@
    +/**
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef LIBMINIFI_INCLUDE_IO_DESCRIPTORSTREAM_H_
    +#define LIBMINIFI_INCLUDE_IO_DESCRIPTORSTREAM_H_
    +
    +#include <iostream>
    +#include <cstdint>
    +#include <string>
    +#include "EndianCheck.h"
    +#include "BaseStream.h"
    +#include "Serializable.h"
    +#include "core/logging/LoggerConfiguration.h"
    +
    +namespace org {
    +namespace apache {
    +namespace nifi {
    +namespace minifi {
    +namespace io {
    +
    +/**
    + * Purpose: File Stream Base stream extension. This is intended to be a thread safe access to
    + * read/write to the local file system.
    + *
    + * Design: Simply extends BaseStream and overrides readData/writeData to allow a sink to the
    + * fstream object.
    + */
    +class DescriptorStream : public io::BaseStream {
    + public:
    +  /**
    +   * File Stream constructor that accepts an fstream shared pointer.
    +   * It must already be initialized for read and write.
    +   */
    +  explicit DescriptorStream(int fd);
    +
    +  virtual ~DescriptorStream() {
    +
    +  }
    +
    +  /**
    +   * Skip to the specified offset.
    +   * @param offset offset to which we will skip
    +   */
    +  void seek(uint64_t offset);
    +
    +  const uint64_t getSize() const {
    +    return -1;
    +  }
    +
    +  // data stream extensions
    +  /**
    +   * Reads data and places it into buf
    +   * @param buf buffer in which we extract data
    +   * @param buflen
    +   */
    +  virtual int readData(std::vector<uint8_t> &buf, int buflen);
    +  /**
    +   * Reads data and places it into buf
    +   * @param buf buffer in which we extract data
    +   * @param buflen
    +   */
    +  virtual int readData(uint8_t *buf, int buflen);
    +
    +  /**
    +   * Write value to the stream using std::vector
    +   * @param buf incoming buffer
    +   * @param buflen buffer to write
    +   *
    +   */
    +  virtual int writeData(std::vector<uint8_t> &buf, int buflen);
    +
    +  /**
    +   * writes value to stream
    +   * @param value value to write
    +   * @param size size of value
    +   */
    +  virtual int writeData(uint8_t *value, int size);
    +
    +  /**
    +   * Returns the underlying buffer
    +   * @return vector's array
    +   **/
    +  const uint8_t *getBuffer() const {
    +    throw std::runtime_error("Stream does not support this operation");
    +  }
    +
    +
    +
    +  /**
    +   * write UTF string to stream
    +   * @param str string to write
    +   * @return resulting write size
    +   **/
    +  //virtual int writeUTF(std::string str, bool widen = false);
    --- End diff --
    
    Remove this commented-out declaration?


---

[GitHub] nifi-minifi-cpp issue #237: MINIFICPP-37: Create an executable to support ba...

Posted by apiri <gi...@git.apache.org>.
GitHub user apiri commented on the issue:

    https://github.com/apache/nifi-minifi-cpp/pull/237
  
    reviewing


---

[GitHub] nifi-minifi-cpp pull request #237: MINIFICPP-37: Create an executable to sup...

Posted by apiri <gi...@git.apache.org>.
GitHub user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/237#discussion_r161136867
  
    --- Diff: controller/MiNiFiController.cpp ---
    @@ -0,0 +1,196 @@
    +/**
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#include <fcntl.h>
    +#include <stdio.h>
    +#include <semaphore.h>
    +#include <signal.h>
    +#include <vector>
    +#include <queue>
    +#include <map>
    +#include <unistd.h>
    +#include <yaml-cpp/yaml.h>
    +#include <iostream>
    +#include "io/BaseStream.h"
    +
    +#include "core/Core.h"
    +
    +#include "core/FlowConfiguration.h"
    +#include "core/ConfigurationFactory.h"
    +#include "core/RepositoryFactory.h"
    +#include "FlowController.h"
    +#include "Main.h"
    +
    +#include "Controller.h"
    +#include "c2/ControllerSocketProtocol.h"
    +
    +#include "cxxopts.hpp"
    +
    +int main(int argc, char **argv) {
    +
    +  std::shared_ptr<logging::Logger> logger = logging::LoggerConfiguration::getConfiguration().getLogger("controller");
    +
    +  // assumes POSIX compliant environment
    +  std::string minifiHome;
    +  if (const char *env_p = std::getenv(MINIFI_HOME_ENV_KEY)) {
    +    minifiHome = env_p;
    +    logger->log_info("Using MINIFI_HOME=%s from environment.", minifiHome);
    +  } else {
    +    logger->log_info("MINIFI_HOME is not set; determining based on environment.");
    +    char *path = nullptr;
    +    char full_path[PATH_MAX];
    +    path = realpath(argv[0], full_path);
    +
    +    if (path != nullptr) {
    +      std::string minifiHomePath(path);
    +      if (minifiHomePath.find_last_of("/\\") != std::string::npos) {
    +        minifiHomePath = minifiHomePath.substr(0, minifiHomePath.find_last_of("/\\"));  //Remove /minifi from path
    +        minifiHome = minifiHomePath.substr(0, minifiHomePath.find_last_of("/\\"));    //Remove /bin from path
    +      }
    +    }
    +
    +    // attempt to use cwd as MINIFI_HOME
    +    if (minifiHome.empty() || !validHome(minifiHome)) {
    +      char cwd[PATH_MAX];
    +      getcwd(cwd, PATH_MAX);
    +      minifiHome = cwd;
    +    }
    +
    +  }
    +
    +  if (!validHome(minifiHome)) {
    +    logger->log_error("No valid MINIFI_HOME could be inferred. "
    +                      "Please set MINIFI_HOME or run minifi from a valid location.");
    +    return -1;
    +  }
    +
    +  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
    +  configuration->setHome(minifiHome);
    +  configuration->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE);
    +
    +  std::shared_ptr<logging::LoggerProperties> log_properties = std::make_shared<logging::LoggerProperties>();
    +  log_properties->setHome(minifiHome);
    +  log_properties->loadConfigureFile(DEFAULT_LOG_PROPERTIES_FILE);
    +  logging::LoggerConfiguration::getConfiguration().initialize(log_properties);
    +
    +  auto stream_factory_ = std::make_shared<minifi::io::StreamFactory>(configuration);
    +
    +  std::string host, port, caCert;
    +
    +  if (!configuration->get("controller.socket.host", host) || !configuration->get("controller.socket.port", port)) {
    --- End diff --
    
    What do you think about allowing the socket host and port to be overridden through additional command-line option(s), similar to how ZooKeeper allows specifying a target host? That could enable some interesting uses, such as cases where SSH tunneling is helpful. Not that the current approach precludes that case, but this would give the minificontroller binary some utility on its own.
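
One way to picture the suggested override is sketched below. This is a minimal illustration, not code from the pull request: `Properties`, `Endpoint`, `resolveSetting`, and `resolveEndpoint` are hypothetical names, and a plain `std::map` stands in for the agent's configuration object. Only the two property keys, `controller.socket.host` and `controller.socket.port`, come from the diff.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical stand-in for the values loaded from the properties file.
using Properties = std::map<std::string, std::string>;

// Hypothetical holder for the resolved controller socket target.
struct Endpoint {
  std::string host;
  std::string port;
};

// A non-empty command-line value wins; otherwise fall back to the property.
// Returns false when neither source provides a value.
bool resolveSetting(const std::string &cli_value, const Properties &props,
                    const std::string &key, std::string &out) {
  if (!cli_value.empty()) {
    out = cli_value;
    return true;
  }
  auto it = props.find(key);
  if (it == props.end()) {
    return false;
  }
  out = it->second;
  return true;
}

// Resolves host and port independently, so either one can be overridden alone.
bool resolveEndpoint(const std::string &cli_host, const std::string &cli_port,
                     const Properties &props, Endpoint &out) {
  return resolveSetting(cli_host, props, "controller.socket.host", out.host) &&
         resolveSetting(cli_port, props, "controller.socket.port", out.port);
}
```

With this shape, the controller would only report itself as disabled when neither the command line nor the properties file yields both a host and a port.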


---

[GitHub] nifi-minifi-cpp pull request #237: MINIFICPP-37: Create an executable to sup...

Posted by apiri <gi...@git.apache.org>.
GitHub user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/237#discussion_r161239545
  
    --- Diff: controller/MiNiFiController.cpp ---
    @@ -0,0 +1,196 @@
    +/**
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#include <fcntl.h>
    +#include <stdio.h>
    +#include <semaphore.h>
    +#include <signal.h>
    +#include <vector>
    +#include <queue>
    +#include <map>
    +#include <unistd.h>
    +#include <yaml-cpp/yaml.h>
    +#include <iostream>
    +#include "io/BaseStream.h"
    +
    +#include "core/Core.h"
    +
    +#include "core/FlowConfiguration.h"
    +#include "core/ConfigurationFactory.h"
    +#include "core/RepositoryFactory.h"
    +#include "FlowController.h"
    +#include "Main.h"
    +
    +#include "Controller.h"
    +#include "c2/ControllerSocketProtocol.h"
    +
    +#include "cxxopts.hpp"
    +
    +int main(int argc, char **argv) {
    +
    +  std::shared_ptr<logging::Logger> logger = logging::LoggerConfiguration::getConfiguration().getLogger("controller");
    +
    +  // assumes POSIX compliant environment
    +  std::string minifiHome;
    +  if (const char *env_p = std::getenv(MINIFI_HOME_ENV_KEY)) {
    +    minifiHome = env_p;
    +    logger->log_info("Using MINIFI_HOME=%s from environment.", minifiHome);
    +  } else {
    +    logger->log_info("MINIFI_HOME is not set; determining based on environment.");
    +    char *path = nullptr;
    +    char full_path[PATH_MAX];
    +    path = realpath(argv[0], full_path);
    +
    +    if (path != nullptr) {
    +      std::string minifiHomePath(path);
    +      if (minifiHomePath.find_last_of("/\\") != std::string::npos) {
    +        minifiHomePath = minifiHomePath.substr(0, minifiHomePath.find_last_of("/\\"));  //Remove /minifi from path
    +        minifiHome = minifiHomePath.substr(0, minifiHomePath.find_last_of("/\\"));    //Remove /bin from path
    +      }
    +    }
    +
    +    // attempt to use cwd as MINIFI_HOME
    +    if (minifiHome.empty() || !validHome(minifiHome)) {
    +      char cwd[PATH_MAX];
    +      getcwd(cwd, PATH_MAX);
    +      minifiHome = cwd;
    +    }
    +
    +  }
    +
    +  if (!validHome(minifiHome)) {
    +    logger->log_error("No valid MINIFI_HOME could be inferred. "
    +                      "Please set MINIFI_HOME or run minifi from a valid location.");
    +    return -1;
    +  }
    +
    +  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
    +  configuration->setHome(minifiHome);
    +  configuration->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE);
    +
    +  std::shared_ptr<logging::LoggerProperties> log_properties = std::make_shared<logging::LoggerProperties>();
    +  log_properties->setHome(minifiHome);
    +  log_properties->loadConfigureFile(DEFAULT_LOG_PROPERTIES_FILE);
    +  logging::LoggerConfiguration::getConfiguration().initialize(log_properties);
    +
    +  auto stream_factory_ = std::make_shared<minifi::io::StreamFactory>(configuration);
    +
    +  std::string host, port, caCert;
    +
    +  if (!configuration->get("controller.socket.host", host) || !configuration->get("controller.socket.port", port)) {
    +    std::cout << "MiNiFi Controller is disabled" << std::endl;
    +    exit(0);
    +  }
    +
    +  cxxopts::Options options("MiNiFiController", "MiNiFi local agent controller");
    +  options.positional_help("[optional args]").show_positional_help();
    +
    +  options.add_options()  //NOLINT
    +  ("help", "Shows Help")  //NOLINT
    +  ("stop", "Shuts down the provided component", cxxopts::value<std::vector<std::string>>())  //NOLINT
    +  ("start", "Starts provided component", cxxopts::value<std::vector<std::string>>())  //NOLINT
    +  ("l,list", "Provides a list of connections or processors", cxxopts::value<std::string>())  //NOLINT
    +  ("c,clear", "Clears the associated connection queue", cxxopts::value<std::vector<std::string>>())  //NOLINT
    +  ("getsize", "Reports the size of the associated connection queue", cxxopts::value<std::vector<std::string>>())  //NOLINT
    +  ("updateflow", "Updates the flow of the agent using the provided flow file", cxxopts::value<std::string>())  //NOLINT
    +  ("getfull", "Reports a list of full connections");
    +
    +  auto result = options.parse(argc, argv);
    --- End diff --
    
    It would be nice to catch unsupported options and redirect to the help instead of letting the exception propagate, e.g.
    ```
    $ ./bin/minificontroller -h
    [2018-01-infol 09:46:26.263] [controller] [info] MINIFI_HOME is not set; determining based on environment.
    [2018-01-infol 09:46:26.264] [org::apache::nifi::minifi::Properties] [info] Using configuration file located at /Users/apiri/Development/code/apache/nifi-minifi-cpp/build/nifi-minifi-cpp-0.4.0/conf/minifi.properties
    [2018-01-infol 09:46:26.264] [org::apache::nifi::minifi::Properties] [info] Using configuration file located at /Users/apiri/Development/code/apache/nifi-minifi-cpp/build/nifi-minifi-cpp-0.4.0/conf/minifi-log.properties
    libc++abi.dylib: terminating with uncaught exception of type cxxopts::option_not_exists_exception: Option ‘h’ does not exist
    [1]    56831 abort      ./bin/minificontroller -h
    ```
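
The behaviour requested above could look roughly like the following sketch. It does not use cxxopts itself: `parseOptions` is a hypothetical stand-in that throws on an unknown option, much as the log shows cxxopts doing, and `runController` is likewise an assumed name. The point is only the `try`/`catch` around parsing, which prints the error and the help text and returns an exit code instead of aborting.

```cpp
#include <cassert>
#include <iostream>
#include <set>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in parser: any argument that starts with '-' and is not
// a known option raises an exception. Other arguments are treated as values.
void parseOptions(const std::vector<std::string> &args,
                  const std::set<std::string> &known) {
  for (const auto &arg : args) {
    if (!arg.empty() && arg[0] == '-' && known.count(arg) == 0) {
      throw std::invalid_argument("Option '" + arg + "' does not exist");
    }
  }
}

// Parses the arguments. On failure, writes the error followed by the help
// text and returns a non-zero exit code rather than letting the exception
// terminate the process.
int runController(const std::vector<std::string> &args,
                  const std::string &help_text, std::ostream &out) {
  static const std::set<std::string> known = {
      "--help", "--stop", "--start", "--list",
      "--clear", "--getsize", "--updateflow", "--getfull"};
  try {
    parseOptions(args, known);
  } catch (const std::exception &e) {
    out << e.what() << std::endl << help_text << std::endl;
    return 1;
  }
  return 0;
}
```

Catching `std::exception` keeps the sketch independent of any particular parser's exception hierarchy; a real change would presumably wrap the `options.parse(argc, argv)` call from the diff in the same way.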


---

[GitHub] nifi-minifi-cpp pull request #237: MINIFICPP-37: Create an executable to sup...

Posted by apiri <gi...@git.apache.org>.
GitHub user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/237#discussion_r161234180
  
    --- Diff: libminifi/src/c2/ControllerSocketProtocol.cpp ---
    @@ -0,0 +1,245 @@
    +/**
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +#include "c2/ControllerSocketProtocol.h"
    +#include "utils/StringUtils.h"
    +#include "io/DescriptorStream.h"
    +#include <utility>
    +#include <memory>
    +#include <vector>
    +#include <string>
    +
    +namespace org {
    +namespace apache {
    +namespace nifi {
    +namespace minifi {
    +namespace c2 {
    +
    +void ControllerSocketProtocol::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink,
    +                                          const std::shared_ptr<Configure> &configuration) {
    +  HeartBeatReporter::initialize(controller, updateSink, configuration);
    +  stream_factory_ = std::make_shared<minifi::io::StreamFactory>(configuration);
    +
    +  std::string host, port, caCert;
    +  if (nullptr != configuration_ && configuration_->get("controller.socket.host", host) && configuration_->get("controller.socket.port", port)) {
    +    server_socket_ = std::unique_ptr<io::ServerSocket>(new io::ServerSocket(nullptr, host, std::stoi(port), 2));
    +    server_socket_->initialize(true);
    +
    +    auto check = [this]() -> bool {
    +      return update_sink_->isRunning();
    +    };
    +
    +    auto handler = [this](int fd) {
    +      uint8_t head;
    +      io::DescriptorStream stream(fd);
    +      if (stream.read(head) != 1) {
    +        logger_->log_debug("Connection broke with fd %d", fd);
    +        return;
    +      }
    +      switch (head) {
    +        case Operation::START:
    +        {
    +          std::string componentStr;
    +          int size = stream.readUTF(componentStr);
    +          if ( size != -1 ) {
    +            auto components = update_sink_->getComponents(componentStr);
    +            for (auto component : components) {
    +              component->start();
    +            }
    +          } else {
    +            logger_->log_debug("Connection broke with fd %d", fd);
    +          }
    +        }
    +        break;
    +        case Operation::STOP:
    +        {
    +          std::string componentStr;
    +          int size = stream.readUTF(componentStr);
    +          if ( size != -1 ) {
    +            auto components = update_sink_->getComponents(componentStr);
    +            for (auto component : components) {
    +              component->stop(true, 1000);
    +            }
    +          } else {
    +            logger_->log_debug("Connection broke with fd %d", fd);
    +          }
    +        }
    +        break;
    +        case Operation::CLEAR:
    +        {
    +          std::string connection;
    +          int size = stream.readUTF(connection);
    +          if ( size != -1 ) {
    +            update_sink_->clearConnection(connection);
    +          }
    +        }
    +        break;
    +        case Operation::UPDATE:
    +        {
    +          std::string what;
    +          int size = stream.readUTF(what);
    +          if (size == -1) {
    +            logger_->log_debug("Connection broke with fd %d", fd);
    +            break;
    +          }
    +          if (what == "flow") {
    +            std::string ff_loc;
    +            int size = stream.readUTF(ff_loc);
    +            std::ifstream tf(ff_loc);
    +            std::string configuration((std::istreambuf_iterator<char>(tf)),
    +                std::istreambuf_iterator<char>());
    +            if (size == -1) {
    +              logger_->log_debug("Connection broke with fd %d", fd);
    +              break;
    +            }
    +            update_sink_->applyUpdate(configuration);
    +          }
    +        }
    +        break;
    +        case Operation::DESCRIBE:
    +        {
    +          std::string what;
    +          int size = stream.readUTF(what);
    +          if (size == -1) {
    +            logger_->log_debug("Connection broke with fd %d", fd);
    +            break;
    +          }
    +          if (what == "queue") {
    +            std::string connection;
    +            int size = stream.readUTF(connection);
    +            if (size == -1) {
    +              logger_->log_debug("Connection broke with fd %d", fd);
    +              break;
    +            }
    +            std::stringstream response;
    +            {
    +              std::lock_guard<std::mutex> lock(controller_mutex_);
    +              response << queue_size_[connection] << " / " << queue_max_[connection];
    +            }
    +            io::BaseStream resp;
    +            resp.writeData(&head, 1);
    +            resp.writeUTF(response.str());
    +            write(fd, resp.getBuffer(), resp.getSize());
    +          } else if (what == "processors") {
    --- End diff --
    
    minor: "processors" is a loose label here, given that we are getting all components, so this bundles in ports as well. Would "components" make more sense?


---

[GitHub] nifi-minifi-cpp pull request #237: MINIFICPP-37: Create an executable to sup...

Posted by apiri <gi...@git.apache.org>.
GitHub user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/237#discussion_r161238296
  
    --- Diff: controller/Controller.h ---
    @@ -0,0 +1,188 @@
    +/**
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef CONTROLLER_CONTROLLER_H_
    +#define CONTROLLER_CONTROLLER_H_
    +
    +#include "io/ClientSocket.h"
    +#include "c2/ControllerSocketProtocol.h"
    +
    +/**
    + * Sends a single argument comment
    + * @param socket socket unique ptr.
    + * @param op operation to perform
    + * @param value value to send
    + */
    +bool sendSingleCommand(std::unique_ptr<minifi::io::Socket> socket, uint8_t op, const std::string value) {
    +  socket->initialize();
    +  std::vector<uint8_t> data;
    +  minifi::io::BaseStream stream;
    +  stream.writeData(&op, 1);
    +  stream.writeUTF(value);
    +  socket->writeData(const_cast<uint8_t*>(stream.getBuffer()), stream.getSize());
    +  return true;
    +}
    +
    +/**
    + * Stops a stopped component
    + * @param socket socket unique ptr.
    + * @param op operation to perform
    + */
    +bool stopComponent(std::unique_ptr<minifi::io::Socket> socket, std::string component) {
    +  return sendSingleCommand(std::move(socket), minifi::c2::Operation::STOP, component);
    +}
    +
    +/**
    + * Starts a previously stopped component.
    + * @param socket socket unique ptr.
    + * @param op operation to perform
    + */
    +bool startComponent(std::unique_ptr<minifi::io::Socket> socket, std::string component) {
    +  return sendSingleCommand(std::move(socket), minifi::c2::Operation::START, component);
    +}
    +
    +/**
    + * Clears a connection queue.
    + * @param socket socket unique ptr.
    + * @param op operation to perform
    + */
    +bool clearConnection(std::unique_ptr<minifi::io::Socket> socket, std::string connection) {
    +  return sendSingleCommand(std::move(socket), minifi::c2::Operation::CLEAR, connection);
    +}
    +
    +/**
    + * Updates the flow to the provided file
    + */
    +void updateFlow(std::unique_ptr<minifi::io::Socket> socket, std::ostream &out, std::string file) {
    +  socket->initialize();
    +  std::vector<uint8_t> data;
    +  uint8_t op = minifi::c2::Operation::UPDATE;
    +  minifi::io::BaseStream stream;
    +  stream.writeData(&op, 1);
    +  stream.writeUTF("flow");
    +  stream.writeUTF(file);
    +  socket->writeData(const_cast<uint8_t*>(stream.getBuffer()), stream.getSize());
    +
    +  // read the response
    +  uint8_t resp = 0;
    +  socket->readData(&resp, 1);
    +  if (resp == minifi::c2::Operation::DESCRIBE) {
    +    uint16_t connections = 0;
    +    socket->read(connections);
    +    out << connections << " are full" << std::endl;
    +    for (int i = 0; i < connections; i++) {
    +      std::string fullcomponent;
    +      socket->readUTF(fullcomponent);
    +      out << fullcomponent << " is full" << std::endl;
    +    }
    +  }
    +}
    +
    +/**
    + * Lists connections which are full
    + * @param socket socket ptr
    + */
    +void getFullConnections(std::unique_ptr<minifi::io::Socket> socket, std::ostream &out) {
    +  socket->initialize();
    +  std::vector<uint8_t> data;
    +  uint8_t op = minifi::c2::Operation::DESCRIBE;
    +  minifi::io::BaseStream stream;
    +  stream.writeData(&op, 1);
    +  stream.writeUTF("getfull");
    +  socket->writeData(const_cast<uint8_t*>(stream.getBuffer()), stream.getSize());
    +
    +  // read the response
    +  uint8_t resp = 0;
    +  socket->readData(&resp, 1);
    +  if (resp == minifi::c2::Operation::DESCRIBE) {
    +    uint16_t connections = 0;
    +    socket->read(connections);
    +    out << connections << " are full" << std::endl;
    +    for (int i = 0; i < connections; i++) {
    +      std::string fullcomponent;
    +      socket->readUTF(fullcomponent);
    +      out << fullcomponent << " is full" << std::endl;
    +    }
    +
    +  }
    +}
    +
    +/**
    + * Prints the connection size for the provided connection.
    + * @param socket socket ptr
    + * @param connection connection whose size will be returned.
    + */
    +void getConnectionSize(std::unique_ptr<minifi::io::Socket> socket, std::ostream &out, std::string connection) {
    +  socket->initialize();
    +  std::vector<uint8_t> data;
    +  uint8_t op = minifi::c2::Operation::DESCRIBE;
    +  minifi::io::BaseStream stream;
    +  stream.writeData(&op, 1);
    +  stream.writeUTF("queue");
    +  stream.writeUTF(connection);
    +  socket->writeData(const_cast<uint8_t*>(stream.getBuffer()), stream.getSize());
    +
    +  // read the response
    +  uint8_t resp = 0;
    +  socket->readData(&resp, 1);
    +  if (resp == minifi::c2::Operation::DESCRIBE) {
    +    std::string size;
    +    socket->readUTF(size);
    +    out << "Size/Max of " << connection << " " << size << std::endl;
    +  }
    +}
    +
    +void listProcessors(std::unique_ptr<minifi::io::Socket> socket, std::ostream &out) {
    +  socket->initialize();
    +  minifi::io::BaseStream stream;
    +  uint8_t op = minifi::c2::Operation::DESCRIBE;
    +  stream.writeData(&op, 1);
    +  stream.writeUTF("processors");
    +  socket->writeData(const_cast<uint8_t*>(stream.getBuffer()), stream.getSize());
    +  uint16_t responses = 0;
    +  socket->readData(&op, 1);
    +  socket->read(responses);
    +  out << "Processors:" << std::endl;
    --- End diff --
    
    Might be a nice addition to let me specify a flag that strips out the headers. That would save an inline sed or similar.
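
A minimal sketch of that idea follows, assuming the list of names has already been read from the socket. `printProcessors` and its `show_header` parameter are hypothetical; only the `Processors:` banner is taken from the diff.

```cpp
#include <cassert>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

// Prints one name per line. When show_header is false, the "Processors:"
// banner is skipped so the output can be piped without filtering.
void printProcessors(const std::vector<std::string> &names, std::ostream &out,
                     bool show_header = true) {
  if (show_header) {
    out << "Processors:" << std::endl;
  }
  for (const auto &name : names) {
    out << name << std::endl;
  }
}
```

Defaulting `show_header` to `true` keeps the existing output unchanged for current callers, while a command-line switch could pass `false` for script-friendly output.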


---

[GitHub] nifi-minifi-cpp pull request #237: MINIFICPP-37: Create an executable to sup...

Posted by phrocker <gi...@git.apache.org>.
GitHub user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/237#discussion_r161255479
  
    --- Diff: controller/MiNiFiController.cpp ---
    @@ -0,0 +1,196 @@
    +/**
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#include <fcntl.h>
    +#include <stdio.h>
    +#include <semaphore.h>
    +#include <signal.h>
    +#include <vector>
    +#include <queue>
    +#include <map>
    +#include <unistd.h>
    +#include <yaml-cpp/yaml.h>
    +#include <iostream>
    +#include "io/BaseStream.h"
    +
    +#include "core/Core.h"
    +
    +#include "core/FlowConfiguration.h"
    +#include "core/ConfigurationFactory.h"
    +#include "core/RepositoryFactory.h"
    +#include "FlowController.h"
    +#include "Main.h"
    +
    +#include "Controller.h"
    +#include "c2/ControllerSocketProtocol.h"
    +
    +#include "cxxopts.hpp"
    +
    +int main(int argc, char **argv) {
    +
    +  std::shared_ptr<logging::Logger> logger = logging::LoggerConfiguration::getConfiguration().getLogger("controller");
    +
    +  // assumes POSIX compliant environment
    +  std::string minifiHome;
    +  if (const char *env_p = std::getenv(MINIFI_HOME_ENV_KEY)) {
    +    minifiHome = env_p;
    +    logger->log_info("Using MINIFI_HOME=%s from environment.", minifiHome);
    +  } else {
    +    logger->log_info("MINIFI_HOME is not set; determining based on environment.");
    +    char *path = nullptr;
    +    char full_path[PATH_MAX];
    +    path = realpath(argv[0], full_path);
    +
    +    if (path != nullptr) {
    +      std::string minifiHomePath(path);
    +      if (minifiHomePath.find_last_of("/\\") != std::string::npos) {
    +        minifiHomePath = minifiHomePath.substr(0, minifiHomePath.find_last_of("/\\"));  //Remove /minifi from path
    +        minifiHome = minifiHomePath.substr(0, minifiHomePath.find_last_of("/\\"));    //Remove /bin from path
    +      }
    +    }
    +
    +    // attempt to use cwd as MINIFI_HOME
    +    if (minifiHome.empty() || !validHome(minifiHome)) {
    +      char cwd[PATH_MAX];
    +      getcwd(cwd, PATH_MAX);
    +      minifiHome = cwd;
    +    }
    +
    +  }
    +
    +  if (!validHome(minifiHome)) {
    +    logger->log_error("No valid MINIFI_HOME could be inferred. "
    +                      "Please set MINIFI_HOME or run minifi from a valid location.");
    +    return -1;
    +  }
    +
    +  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
    +  configuration->setHome(minifiHome);
    +  configuration->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE);
    +
    +  std::shared_ptr<logging::LoggerProperties> log_properties = std::make_shared<logging::LoggerProperties>();
    +  log_properties->setHome(minifiHome);
    +  log_properties->loadConfigureFile(DEFAULT_LOG_PROPERTIES_FILE);
    +  logging::LoggerConfiguration::getConfiguration().initialize(log_properties);
    +
    +  auto stream_factory_ = std::make_shared<minifi::io::StreamFactory>(configuration);
    +
    +  std::string host, port, caCert;
    +
    +  if (!configuration->get("controller.socket.host", host) || !configuration->get("controller.socket.port", port)) {
    --- End diff --
    
    This was one of the big things I wanted feedback on, primarily because it depends heavily on the use case. It is not an option now because the socket is limited to the loopback adapter, which makes an option irrelevant. If we make it configurable so that a deployment can use a socket bound to any interface on that host, we could support an option in minificontroller. I'll make this change: the default will only work on localhost, and it can be changed in minifi.properties.
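
    A minimal sketch of the fallback described here, assuming the host is looked up under the `controller.socket.host` key that the diff already reads. The `Properties` alias is a stand-in for `minifi::Configure` and `resolveControllerHost` is a hypothetical helper; neither reflects the final implementation.

    ```cpp
    #include <map>
    #include <string>

    // Stand-in for minifi::Configure. A plain map is an assumption made so the
    // sketch compiles without the MiNiFi headers.
    using Properties = std::map<std::string, std::string>;

    // Hypothetical helper: returns the configured controller host, or "localhost"
    // when the property is missing or empty, so the default stays on loopback.
    std::string resolveControllerHost(const Properties &props) {
      auto it = props.find("controller.socket.host");
      if (it == props.end() || it->second.empty()) {
        return "localhost";
      }
      return it->second;
    }
    ```

    With this shape, a deployment that never sets the property keeps the loopback-only behavior, and one that sets it in minifi.properties opts in explicitly.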


---