You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2019/06/24 12:49:55 UTC

[nifi-minifi-cpp] branch master updated: MINIFICPP-621 Add initial tailfile nanofi example

This is an automated email from the ASF dual-hosted git repository.

phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new e7ac27d  MINIFICPP-621 Add initial tailfile nanofi example
e7ac27d is described below

commit e7ac27d00eb160b28567cea2201deda2af10593a
Author: Murtuza Shareef <ms...@cloudera.com>
AuthorDate: Mon Jun 10 18:15:48 2019 -0400

    MINIFICPP-621 Add initial tailfile nanofi example
    
    This closes #590.
    
    Signed-off-by: Marc Parisi <ph...@apache.org>
---
 libminifi/include/ResourceClaim.h  |   1 +
 nanofi/examples/CMakeLists.txt     |   4 +
 nanofi/examples/tail_file.c        | 257 +++++++++++++++++++++++++++++++++++++
 nanofi/include/api/nanofi.h        |  19 +++
 nanofi/include/core/cstructs.h     |  11 ++
 nanofi/include/core/string_utils.h |  53 ++++++++
 nanofi/src/api/nanofi.cpp          |  37 ++++--
 nanofi/src/core/string_utils.c     |  86 +++++++++++++
 8 files changed, 457 insertions(+), 11 deletions(-)

diff --git a/libminifi/include/ResourceClaim.h b/libminifi/include/ResourceClaim.h
index 8cae8fc..e4523e4 100644
--- a/libminifi/include/ResourceClaim.h
+++ b/libminifi/include/ResourceClaim.h
@@ -59,6 +59,7 @@ class ResourceClaim : public std::enable_shared_from_this<ResourceClaim> {
   explicit ResourceClaim(std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager);
 
   explicit ResourceClaim(const std::string path, std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager, bool deleted = false);
+
   // Destructor
   ~ResourceClaim() {
   }
diff --git a/nanofi/examples/CMakeLists.txt b/nanofi/examples/CMakeLists.txt
index 6a9779c..4301a7d 100644
--- a/nanofi/examples/CMakeLists.txt
+++ b/nanofi/examples/CMakeLists.txt
@@ -83,4 +83,8 @@ add_executable(monitor_directory monitor_directory.c)
 
 target_link_libraries(monitor_directory nanofi ${CMAKE_THREAD_LIBS_INIT}  ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
 
+# tail_file: tails a file and sends each delimited record as a flow file to a NiFi
+# remote port. Usage: tail_file <file> <interval> <delimiter> <nifi instance url> <remote port>
+add_executable(tail_file tail_file.c)
+
+target_link_libraries(tail_file nanofi ${CMAKE_THREAD_LIBS_INIT}  ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
+
 endif()
diff --git a/nanofi/examples/tail_file.c b/nanofi/examples/tail_file.c
new file mode 100644
index 0000000..2200df4
--- /dev/null
+++ b/nanofi/examples/tail_file.c
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+    *
+    *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+#include "api/nanofi.h"
+#include "core/string_utils.h"
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <limits.h>
+#include <signal.h>
+#include <sys/stat.h>
+
+/* A batch of flow file records waiting to be transmitted (see transmit_flow_files()). */
+typedef struct flow_file_records {
+    flow_file_record ** records; /* array holding len record pointers */
+    uint64_t len;                /* number of entries in records */
+} flow_file_records;
+
+/* Flow files produced by on_trigger_callback(); main() transmits them and then
+ * releases them with free_flow_file_records() after every invoke(). */
+struct flow_file_records * flowfiles = NULL;
+/* NiFi instance and standalone processor shared by main() and the callback. */
+nifi_instance * instance = NULL;
+standalone_processor * proc = NULL;
+/* Byte offset in the tailed file from which the next read starts. */
+int file_offset = 0;
+/* Raised by signal_handler(). volatile sig_atomic_t is the only object type the
+ * C standard allows a signal handler to write and the interrupted code to read. */
+volatile sig_atomic_t stopped = 0;
+
+/*
+ * Handler that main() installs for SIGINT and SIGTERM: raises the `stopped`
+ * flag so the polling loop in main() ends before its next iteration.
+ * Any other signal number is ignored.
+ */
+void signal_handler(int signum) {
+    switch (signum) {
+        case SIGINT:
+        case SIGTERM:
+            stopped = 1;
+            break;
+        default:
+            break;
+    }
+}
+
+/*
+ * Sends every flow file in the global `flowfiles` batch to the given NiFi
+ * instance. NULL slots are skipped instead of aborting the rest of the batch,
+ * so one missing record no longer silently drops all records after it.
+ */
+void transmit_flow_files(nifi_instance * instance) {
+    NULL_CHECK( ,flowfiles);
+    NULL_CHECK( ,flowfiles->records);
+    uint64_t i;
+    for (i = 0; i < flowfiles->len; ++i) {
+        if (flowfiles->records[i] == NULL) {
+            continue;
+        }
+        transmit_flowfile(flowfiles->records[i], instance);
+    }
+}
+
+/*
+ * Releases the global `flowfiles` batch: every non-NULL flow file record, the
+ * records array and the batch struct. Resets `flowfiles` to NULL so the next
+ * trigger starts from an empty batch. Does nothing when no batch exists.
+ */
+void free_flow_file_records() {
+    NULL_CHECK( ,flowfiles);
+    if (flowfiles->records != NULL) {
+        uint64_t i;
+        for (i = 0; i < flowfiles->len; ++i) {
+            if (flowfiles->records[i] != NULL) {
+                free_flowfile(flowfiles->records[i]);
+            }
+        }
+        /* The array itself was heap-allocated too and used to be leaked. */
+        free(flowfiles->records);
+    }
+    free(flowfiles);
+    flowfiles = NULL;
+}
+
+/** Stores the byte position in the tailed file at which the next read should start. */
+void set_offset(int offset) { file_offset = offset; }
+
+/** Returns the byte position in the tailed file at which the next read should start. */
+int get_offset() { return file_offset; }
+
+/*
+ * Frees the first num_strings entries of strings; the array itself is not
+ * freed. A non-positive count frees nothing. (Not called by this example.)
+ */
+void free_all_strings(char ** strings, int num_strings) {
+    char ** cursor = strings;
+    int remaining;
+    for (remaining = num_strings; remaining > 0; --remaining) {
+        free(*cursor++);
+    }
+}
+
+/*
+ * Maps the "delimiter" property to one character: the first character is used
+ * as-is, except that the escape sequences \r, \t, \n and \\ are translated.
+ * A backslash followed by any other character (or by nothing) yields '\\'.
+ */
+static char parse_delimiter(const char * spec) {
+    if (spec[0] != '\\' || spec[1] == '\0') {
+        return spec[0];
+    }
+    switch (spec[1]) {
+        case 'r':
+            return '\r';
+        case 't':
+            return '\t';
+        case 'n':
+            return '\n';
+        default:
+            return '\\';
+    }
+}
+
+/*
+ * Generates a flow file and writes token into its content location.
+ * Returns the record, or NULL (after releasing the record) on any failure.
+ */
+static flow_file_record * write_token_to_flow_file(const char * token) {
+    flow_file_record * ffr = generate_flow_file(instance, proc);
+    if (!ffr) {
+        printf("Cannot generate a flow file.\n");
+        return NULL;
+    }
+    const char * flow_file_path = ffr->contentLocation;
+    FILE * ffp = fopen(flow_file_path, "wb");
+    if (!ffp) {
+        printf("Cannot open flow file at path %s to write content to.\n", flow_file_path);
+        free_flowfile(ffr);
+        return NULL;
+    }
+    size_t count = strlen(token);
+    size_t written = fwrite(token, 1, count, ffp);
+    /* fclose() flushes buffered data, so its result matters as much as fwrite()'s. */
+    int close_status = fclose(ffp);
+    if (written < count || close_status != 0) {
+        printf("Cannot write content to flow file at path %s.\n", flow_file_path);
+        free_flowfile(ffr);
+        return NULL;
+    }
+    ffr->size = count;
+    return ffr;
+}
+
+/*
+ * Appends ffr to the global `flowfiles` batch, creating the batch on first use.
+ * Returns 0 on success, -1 if memory could not be allocated (ffr is not freed).
+ */
+static int append_flow_file_record(flow_file_record * ffr) {
+    if (!flowfiles) {
+        flowfiles = (flow_file_records *)calloc(1, sizeof(flow_file_records));
+        if (!flowfiles) {
+            return -1;
+        }
+    }
+    flow_file_record ** grown = (flow_file_record **)realloc(flowfiles->records,
+            sizeof(flow_file_record *) * (flowfiles->len + 1));
+    if (!grown) {
+        return -1;
+    }
+    flowfiles->records = grown;
+    flowfiles->records[flowfiles->len++] = ffr;
+    return 0;
+}
+
+/*
+ * Processor callback. Reads the file named by the "file_path" property from the
+ * saved offset and splits it on the "delimiter" property. Every complete,
+ * non-empty record becomes a flow file appended to the global `flowfiles` batch.
+ * The offset is advanced past the consumed bytes, so the next trigger resumes
+ * where this one stopped. A trailing record whose delimiter has not arrived yet
+ * is left in place and re-read later.
+ *
+ * NOTE(review): a record longer than MAX_BYTES_READ, or content containing NUL
+ * bytes, stops progress at that point. The offset is advanced before a chunk's
+ * records are written, so records after a write failure in that chunk are skipped.
+ */
+void on_trigger_callback(processor_session * ps, processor_context * ctx) {
+    enum { MAX_BYTES_READ = 4096 };
+    /* Zero-filled and given at most size-1 bytes, so the strings stay
+     * NUL-terminated whether or not get_property() terminates on truncation
+     * (its implementation is not visible here). */
+    char file_path[4096] = {0};
+    /* Room for an escape sequence such as "\n" plus the terminator; the old
+     * 2-byte buffer made the escape handling unreachable. */
+    char delimiter[8] = {0};
+
+    (void)ps;
+
+    if (get_property(ctx, "file_path", file_path, sizeof(file_path) - 1) != 0) {
+        return;
+    }
+    if (get_property(ctx, "delimiter", delimiter, sizeof(delimiter) - 1) != 0) {
+        return;
+    }
+    if (strlen(delimiter) == 0) {
+        printf("Delimiter not specified or it is empty\n");
+        return;
+    }
+    char delim = parse_delimiter(delimiter);
+
+    FILE * fp = fopen(file_path, "rb");
+    if (!fp) return;
+
+    char buff[MAX_BYTES_READ + 1];
+    for (;;) {
+        /* Re-seek on every pass: only complete (delimited) records are consumed,
+         * so a trailing partial record must be read again with the next chunk. */
+        if (fseek(fp, get_offset(), SEEK_SET) != 0) {
+            break;
+        }
+        size_t bytes_read = fread(buff, 1, MAX_BYTES_READ, fp);
+        if (bytes_read == 0) {
+            break;
+        }
+        buff[bytes_read] = '\0';
+
+        struct tokens tks = tokenize_string(buff, delim, TAILFILE_MODE);
+        if (tks.total_bytes == 0) {
+            /* No complete record in this chunk yet. */
+            free_tokens(&tks);
+            break;
+        }
+        set_offset(get_offset() + tks.total_bytes);
+
+        int failed = 0;
+        uint64_t i;
+        for (i = 0; i < tks.num_strings && !failed; ++i) {
+            if (!tks.str_list[i] || tks.str_list[i][0] == '\0') {
+                continue;
+            }
+            flow_file_record * ffr = write_token_to_flow_file(tks.str_list[i]);
+            if (!ffr) {
+                failed = 1;
+            } else if (append_flow_file_record(ffr) != 0) {
+                printf("Cannot allocate memory for flow file records.\n");
+                free_flowfile(ffr);
+                failed = 1;
+            }
+        }
+        free_tokens(&tks);
+        if (failed) {
+            break;
+        }
+    }
+    fclose(fp);
+}
+
+/*
+ * Parses the polling interval in seconds. Only a complete decimal number in
+ * [0, UINT_MAX] (the range of sleep()'s unsigned int argument) is accepted.
+ * Returns 0 and stores the value in *out on success, -1 otherwise.
+ */
+static int parse_interval(const char * text, unsigned int * out) {
+    char * end = NULL;
+    errno = 0;
+    long value = strtol(text, &end, 10);
+    if (errno == ERANGE || end == text || *end != '\0') {
+        return -1;
+    }
+    if (value < 0 || (unsigned long)value > UINT_MAX) {
+        return -1;
+    }
+    *out = (unsigned int)value;
+    return 0;
+}
+
+/*
+ * Usage: tail_file <file> <interval> <delimiter> <nifi instance url> <remote port>
+ *
+ * Validates the arguments and registers the TailFile custom processor. Then,
+ * every <interval> seconds until SIGINT/SIGTERM, it runs the processor and
+ * transmits the resulting flow files to the NiFi instance.
+ */
+int main(int argc, char** argv) {
+    if (argc < 6) {
+        printf("Error: must run ./tail_file <file> <interval> <delimiter> <nifi instance url> <remote port>\n");
+        exit(1);
+    }
+
+    char * file = argv[1];
+    char * interval = argv[2];
+    char * delimiter = argv[3];
+    char * instance_str = argv[4];
+    char * port_str = argv[5];
+
+    if (access(file, F_OK) == -1) {
+        printf("Error: %s doesn't exist!\n", file);
+        exit(1);
+    }
+
+    struct stat stats;
+    /* Read errno straight after stat(); resetting it first would hide the cause. */
+    if (stat(file, &stats) == -1) {
+        printf("Error occurred while getting file status {file: %s, error: %s}\n", file, strerror(errno));
+        exit(1);
+    }
+    /* A directory cannot be tailed. */
+    if (S_ISDIR(stats.st_mode)) {
+        printf("Error: %s is a directory!\n", file);
+        exit(1);
+    }
+
+    unsigned int intrvl = 0;
+    if (parse_interval(interval, &intrvl) != 0) {
+        printf("Invalid interval value specified\n");
+        exit(1);
+    }
+
+    struct sigaction action;
+    /* sizeof(action), not sizeof(sigaction): the latter names the function. */
+    memset(&action, 0, sizeof(action));
+    sigemptyset(&action.sa_mask);
+    action.sa_handler = signal_handler;
+    sigaction(SIGTERM, &action, NULL);
+    sigaction(SIGINT, &action, NULL);
+
+    nifi_port port;
+    port.port_id = port_str;
+
+    instance = create_instance(instance_str, &port);
+    if (!instance) {
+        printf("Error: could not create a NiFi instance for %s\n", instance_str);
+        exit(1);
+    }
+
+    const char * processor_name = "TailFile";
+    add_custom_processor(processor_name, on_trigger_callback);
+
+    proc = create_processor(processor_name);
+    if (!proc) {
+        printf("Error: could not create the %s processor\n", processor_name);
+        free_instance(instance);
+        exit(1);
+    }
+
+    set_standalone_property(proc, "file_path", file);
+    set_standalone_property(proc, "delimiter", delimiter);
+
+    set_offset(0);
+    while (!stopped) {
+        flow_file_record * new_ff = invoke(proc);
+        transmit_flow_files(instance);
+        free_flow_file_records();
+        free_flowfile(new_ff);
+        sleep(intrvl);
+    }
+
+    free_standalone_processor(proc);
+    /* free_instance() is the API counterpart of create_instance(). A plain free()
+     * would not release what the struct points to (e.g. instance->instance_ptr). */
+    free_instance(instance);
+
+    return 0;
+}
diff --git a/nanofi/include/api/nanofi.h b/nanofi/include/api/nanofi.h
index 04e5712..6180952 100644
--- a/nanofi/include/api/nanofi.h
+++ b/nanofi/include/api/nanofi.h
@@ -40,6 +40,17 @@ extern "C" {
 #define SUCCESS_RELATIONSHIP "success"
 #define FAILURE_RELATIONSHIP "failure"
 
+/**
+ * Returns ret_val from the enclosing function if any of the pointer arguments
+ * is NULL. In functions returning void, leave ret_val empty: NULL_CHECK( , ptr).
+ * All arguments are evaluated before any is checked, so do not pass dependent
+ * dereferences such as NULL_CHECK(-1, p, p->q).
+ */
+#define NULL_CHECK(ret_val, ...)                        \
+  do {                                                  \
+    const void *_p[] = { __VA_ARGS__ };                 \
+    size_t _i;                                          \
+    for (_i = 0; _i < sizeof(_p)/sizeof(*_p); _i++) {   \
+      if (_p[_i] == NULL) {                             \
+        return ret_val;                                 \
+      }                                                 \
+    }                                                   \
+  } while(0)
+
 /**
  * Enables logging (disabled by default)
  **/
@@ -299,6 +310,14 @@ flow_file_record* create_ff_object_na(const char *file, const size_t len, const
 flow_file_record* create_ff_object_nc();
 
 /**
+ * Adds content to the flow file record.
+ * @param instance the nifi instance
+ * @param proc the standalone processor
+ * @return a flow file record
+ */
+flow_file_record* generate_flow_file(nifi_instance * instance, standalone_processor * proc);
+
+/**
  * Get incoming flow file. To be used in processor logic callbacks.
  * @param session current processor session
  * @param context current processor context
diff --git a/nanofi/include/core/cstructs.h b/nanofi/include/core/cstructs.h
index d775747..9ad2783 100644
--- a/nanofi/include/core/cstructs.h
+++ b/nanofi/include/core/cstructs.h
@@ -149,4 +149,15 @@ typedef struct cstream {
   SOCKET socket_;
 } cstream;
 
+/****
+ * ##################################################################
+ *  STRING OPERATIONS
+ * ##################################################################
+ */
+
+/**
+ * Result of tokenize_string(): an array of heap-allocated, NUL-terminated
+ * tokens. Release it with free_tokens().
+ */
+typedef struct tokens {
+    char ** str_list;      /* the tokens (num_strings entries) */
+    uint64_t num_strings;  /* number of entries in str_list */
+    uint64_t total_bytes;  /* bytes of the input accounted for by the tokens, delimiters included */
+} tokens;
 #endif /* LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ */
diff --git a/nanofi/include/core/string_utils.h b/nanofi/include/core/string_utils.h
new file mode 100644
index 0000000..62aac99
--- /dev/null
+++ b/nanofi/include/core/string_utils.h
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#ifndef NIFI_MINIFI_CPP_STRING_UTILS_H
+#define NIFI_MINIFI_CPP_STRING_UTILS_H
+
+#include "cstructs.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Selects what tokenize_string() does with trailing text that is not
+ * followed by a delimiter.
+ */
+typedef enum TOKENIZER_MODE {
+    TAILFILE_MODE = 0, /* drop the trailing undelimited text (e.g. a record still being written) */
+    DEFAULT_MODE /* return the trailing undelimited text as the last token */
+} tokenizer_mode_t;
+
+/**
+ * Splits str on delim and returns the non-empty tokens; consecutive delimiters
+ * never produce empty tokens.
+ * @param str the NUL-terminated string to be tokenized; NULL yields no tokens
+ * @param delim the delimiting character
+ * @param mode TAILFILE_MODE drops trailing text that is not followed by delim;
+ *        DEFAULT_MODE returns it as the last token
+ * @return the tokens, which the caller must release with free_tokens();
+ *         total_bytes reports how many bytes of str they account for
+ *         (delimiters included)
+ */
+tokens tokenize_string(const char * str, char delim, tokenizer_mode_t mode);
+
+/**
+ * Frees the dynamically allocated tokens held by tks.
+ * The tokens struct itself is not freed.
+ * @param tks the tokens to be freed; NULL is ignored
+ */
+void free_tokens(tokens * tks);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif //NIFI_MINIFI_CPP_STRING_UTILS_H
diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp
index 6b9fa8e..d37fc6b 100644
--- a/nanofi/src/api/nanofi.cpp
+++ b/nanofi/src/api/nanofi.cpp
@@ -34,17 +34,6 @@
 #include "io/DataStream.h"
 #include "core/cxxstructs.h"
 
-#define NULL_CHECK(ret_val, ...)                        \
-  do {                                                  \
-    const void *_p[] = { __VA_ARGS__ };                 \
-    int _i;                                             \
-    for (_i = 0; _i < sizeof(_p)/sizeof(*_p); _i++) {   \
-      if (_p[_i] == NULL) {                             \
-        return ret_val;                                 \
-      }                                                 \
-    }                                                   \
-  } while(0)
-
 using string_map = std::map<std::string, std::string>;
 
 class API_INITIALIZER {
@@ -255,6 +244,32 @@ flow_file_record* create_ff_object_nc() {
   new_ff->attributes = new string_map();
   return new_ff;
 }
+
+/**
+ * Creates a flow file record for proc whose contentLocation is the path of a new
+ * resource claim in the instance's content repository. Returns nullptr, without
+ * leaking anything, when instance or proc is null, proc has no execution plan,
+ * or memory cannot be allocated.
+ */
+flow_file_record * generate_flow_file(nifi_instance * instance, standalone_processor * proc) {
+    if (!instance || !proc) {
+        return nullptr;
+    }
+    // Look the plan up before allocating anything so this failure path cannot leak.
+    auto plan = ExecutionPlan::getPlan(proc->getUUIDStr());
+    if (!plan) {
+        return nullptr;
+    }
+
+    auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
+    auto content_repo = minifi_instance_ref->getContentRepository();
+
+    // Copy the path into an owned string before the claim goes out of scope.
+    auto claim = std::make_shared<minifi::ResourceClaim>(content_repo);
+    const std::string full_path = claim->getContentFullPath();
+    char * content_location = static_cast<char *>(malloc(full_path.size() + 1));
+    if (!content_location) {
+        return nullptr;
+    }
+    memcpy(content_location, full_path.c_str(), full_path.size() + 1);
+
+    flow_file_record * ffr = create_ff_object_nc();
+    if (!ffr) {
+        free(content_location);
+        return nullptr;
+    }
+    ffr->crp = static_cast<void*>(new std::shared_ptr<minifi::core::ContentRepository>(content_repo));
+    ffr->ffp = static_cast<void*>(new std::shared_ptr<core::FlowFile>(plan->getCurrentFlowFile()));
+    ffr->keepContent = 1;
+    ffr->size = 0;  // nothing has been written to the content location yet
+    ffr->contentLocation = content_location;
+    return ffr;
+}
+
 /**
  * Reclaims memory associated with a flow file object
  * @param ff flow file record.
diff --git a/nanofi/src/core/string_utils.c b/nanofi/src/core/string_utils.c
new file mode 100644
index 0000000..517da1f
--- /dev/null
+++ b/nanofi/src/core/string_utils.c
@@ -0,0 +1,86 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/cstructs.h"
+#include "core/string_utils.h"
+#include <string.h>
+#include <stdlib.h>
+
+/*
+ * strchr() that never matches the string terminator. Searching for '\0' would
+ * otherwise "find" the end of the string and the caller would step past it.
+ */
+static const char * find_delimiter(const char * begin, char delim) {
+    return delim == '\0' ? NULL : strchr(begin, delim);
+}
+
+/* Returns a NUL-terminated heap copy of the len bytes at begin, or NULL on allocation failure. */
+static char * copy_token(const char * begin, size_t len) {
+    char * token = (char *)malloc(len + 1);
+    if (token) {
+        memcpy(token, begin, len);
+        token[len] = '\0';
+    }
+    return token;
+}
+
+/*
+ * Splits str on delim, skipping empty tokens (runs of delimiters).
+ * total_bytes is the number of bytes of str consumed: every token with its
+ * delimiter, every skipped delimiter and, in DEFAULT_MODE, the trailing
+ * undelimited token. In TAILFILE_MODE trailing undelimited text is neither
+ * returned nor consumed. A '\0' delimiter matches nothing. A NULL str or an
+ * allocation failure yields an empty result (str_list NULL, no tokens, no bytes).
+ * Release the result with free_tokens().
+ */
+tokens tokenize_string(const char * str, char delim, tokenizer_mode_t mode) {
+    tokens tks;
+    tks.str_list = NULL;
+    tks.num_strings = 0;
+    tks.total_bytes = 0;
+
+    if (!str) return tks;
+
+    /* First pass: count the tokens so str_list is allocated exactly once. */
+    const char * begin = str;
+    const char * end = NULL;
+    size_t num_strings = 0;
+    while ((end = find_delimiter(begin, delim)) != NULL) {
+        if (end != begin) {
+            num_strings++;
+        }
+        begin = end + 1;
+    }
+
+    const int keep_tail = (mode == DEFAULT_MODE && *begin != '\0');
+    if (keep_tail) {
+        num_strings++;
+    }
+
+    if (num_strings == 0) {
+        /* No tokens: only the skipped delimiters are consumed. */
+        tks.total_bytes = (uint64_t)(begin - str);
+        return tks;
+    }
+
+    tks.str_list = (char **)calloc(num_strings, sizeof(char *));
+    if (!tks.str_list) {
+        return tks;
+    }
+
+    /* Second pass: copy the tokens and account for the bytes they consume. */
+    begin = str;
+    while ((end = find_delimiter(begin, delim)) != NULL) {
+        size_t len = (size_t)(end - begin);
+        if (len > 0) {
+            char * token = copy_token(begin, len);
+            if (!token) {
+                free_tokens(&tks);
+                return tks;
+            }
+            tks.str_list[tks.num_strings++] = token;
+        }
+        tks.total_bytes += len + 1;  /* token plus its delimiter */
+        begin = end + 1;
+    }
+
+    if (keep_tail) {
+        size_t len = strlen(begin);
+        char * token = copy_token(begin, len);
+        if (!token) {
+            free_tokens(&tks);
+            return tks;
+        }
+        tks.str_list[tks.num_strings++] = token;
+        /* No delimiter follows the tail, so only its own bytes are consumed. */
+        tks.total_bytes += len;
+    }
+    return tks;
+}
+
+/*
+ * Frees every token and then the str_list array itself, and resets tks to an
+ * empty result so a repeated call is harmless. A NULL tks is ignored.
+ */
+void free_tokens(tokens * tks) {
+    if (!tks) {
+        return;
+    }
+    if (tks->str_list) {
+        uint64_t i;
+        for (i = 0; i < tks->num_strings; ++i) {
+            free(tks->str_list[i]);
+        }
+        free(tks->str_list);
+    }
+    tks->str_list = NULL;
+    tks->num_strings = 0;
+    tks->total_bytes = 0;
+}