You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2019/06/24 12:49:55 UTC
[nifi-minifi-cpp] branch master updated: MINIFICPP-621 Add initial tailfile nanofi example
This is an automated email from the ASF dual-hosted git repository.
phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new e7ac27d MINIFICPP-621 Add initial tailfile nanofi example
e7ac27d is described below
commit e7ac27d00eb160b28567cea2201deda2af10593a
Author: Murtuza Shareef <ms...@cloudera.com>
AuthorDate: Mon Jun 10 18:15:48 2019 -0400
MINIFICPP-621 Add initial tailfile nanofi example
This closes #590.
Signed-off-by: Marc Parisi <ph...@apache.org>
---
libminifi/include/ResourceClaim.h | 1 +
nanofi/examples/CMakeLists.txt | 4 +
nanofi/examples/tail_file.c | 257 +++++++++++++++++++++++++++++++++++++
nanofi/include/api/nanofi.h | 19 +++
nanofi/include/core/cstructs.h | 11 ++
nanofi/include/core/string_utils.h | 53 ++++++++
nanofi/src/api/nanofi.cpp | 37 ++++--
nanofi/src/core/string_utils.c | 86 +++++++++++++
8 files changed, 457 insertions(+), 11 deletions(-)
diff --git a/libminifi/include/ResourceClaim.h b/libminifi/include/ResourceClaim.h
index 8cae8fc..e4523e4 100644
--- a/libminifi/include/ResourceClaim.h
+++ b/libminifi/include/ResourceClaim.h
@@ -59,6 +59,7 @@ class ResourceClaim : public std::enable_shared_from_this<ResourceClaim> {
explicit ResourceClaim(std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager);
explicit ResourceClaim(const std::string path, std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager, bool deleted = false);
+
// Destructor
~ResourceClaim() {
}
diff --git a/nanofi/examples/CMakeLists.txt b/nanofi/examples/CMakeLists.txt
index 6a9779c..4301a7d 100644
--- a/nanofi/examples/CMakeLists.txt
+++ b/nanofi/examples/CMakeLists.txt
@@ -83,4 +83,8 @@ add_executable(monitor_directory monitor_directory.c)
target_link_libraries(monitor_directory nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
+add_executable(tail_file tail_file.c)
+
+target_link_libraries(tail_file nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
+
endif()
diff --git a/nanofi/examples/tail_file.c b/nanofi/examples/tail_file.c
new file mode 100644
index 0000000..2200df4
--- /dev/null
+++ b/nanofi/examples/tail_file.c
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+#include "api/nanofi.h"
+#include "core/string_utils.h"
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <limits.h>
+#include <signal.h>
+#include <sys/stat.h>
+
+typedef struct flow_file_records {
+ flow_file_record ** records;
+ uint64_t len;
+} flow_file_records;
+
+struct flow_file_records * flowfiles = NULL;
+nifi_instance * instance = NULL;
+standalone_processor * proc = NULL;
+int file_offset = 0;
+int stopped = 0;
+
+void signal_handler(int signum) {
+ if (signum == SIGINT || signum == SIGTERM) {
+ stopped = 1;
+ }
+}
+
+void transmit_flow_files(nifi_instance * instance) {
+ NULL_CHECK( ,flowfiles);
+ int i;
+ for (i = 0; i < flowfiles->len; ++i) {
+ NULL_CHECK( ,flowfiles->records[i]);
+ transmit_flowfile(flowfiles->records[i], instance);
+ }
+}
+
+void free_flow_file_records() {
+ NULL_CHECK( ,flowfiles);
+ int i;
+ for (i = 0; i < flowfiles->len; ++i) {
+ free_flowfile(flowfiles->records[i]);
+ }
+ free(flowfiles);
+ flowfiles = NULL;
+}
+
+void set_offset(int offset) {
+ file_offset = offset;
+}
+
+int get_offset() {
+ return file_offset;
+}
+
+void free_all_strings(char ** strings, int num_strings) {
+ int i;
+ for (i = 0; i < num_strings; ++i) {
+ free(strings[i]);
+ }
+}
+
+void on_trigger_callback(processor_session * ps, processor_context * ctx) {
+
+ char file_path[4096];
+ char delimiter[2];
+
+ if (get_property(ctx, "file_path", file_path, 50) != 0) {
+ return;
+ }
+
+ if (get_property(ctx, "delimiter", delimiter, 2) != 0) {
+ return;
+ }
+
+ if (strlen(delimiter) == 0) {
+ printf("Delimiter not specified or it is empty\n");
+ return;
+ }
+ char delim = '\0';
+ if (strlen(delimiter) > 0) {
+ delim = delimiter[0];
+ }
+
+ if (delim == '\0') {
+ printf("Invalid delimiter \n");
+ return;
+ }
+
+ if (delim == '\\') {
+ if (strlen(delimiter) > 1) {
+ switch (delimiter[1]) {
+ case 'r':
+ delim = '\r';
+ break;
+ case 't':
+ delim = '\t';
+ break;
+ case 'n':
+ delim = '\n';
+ break;
+ case '\\':
+ delim = '\\';
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ int curr_offset = get_offset();
+ int max_bytes_read = 4096;
+ char buff[max_bytes_read + 1];
+ memset(buff,'\0', max_bytes_read);
+ FILE * fp = fopen(file_path, "rb");
+ if (!fp) return;
+ fseek(fp, curr_offset, SEEK_SET);
+
+ int bytes_read = 0;
+ while ((bytes_read = fread(buff, 1, max_bytes_read, fp)) > 0) {
+ buff[bytes_read] = '\0';
+ tokenizer_mode_t mode = TAILFILE_MODE;
+ struct tokens tks = tokenize_string(buff, delim, mode);
+
+ if (tks.num_strings == 0) return;
+
+ set_offset(get_offset() + tks.total_bytes);
+
+ flowfiles = (flow_file_records *)malloc(sizeof(flow_file_records));
+ flowfiles->records = malloc(sizeof(flow_file_record *) * tks.num_strings);
+ flowfiles->len = tks.num_strings;
+
+ int i;
+ for (i = 0; i < tks.num_strings; ++i) {
+ flowfiles->records[i] = NULL;
+ }
+
+ for (i = 0; i < tks.num_strings; ++i) {
+ if (tks.str_list[i] && strlen(tks.str_list[i]) > 0) {
+ flow_file_record * ffr = generate_flow_file(instance, proc);
+ const char * flow_file_path = ffr->contentLocation;
+ FILE * ffp = fopen(flow_file_path, "wb");
+ if (!ffp) {
+ printf("Cannot open flow file at path %s to write content to.\n", flow_file_path);
+ fclose(fp);
+ free_tokens(&tks);
+ return;
+ }
+ int count = strlen(tks.str_list[i]);
+ int ret = fwrite(tks.str_list[i], 1, count, ffp);
+ if (ret < count) {
+ fclose(ffp);
+ return;
+ }
+ fseek(ffp, 0, SEEK_END);
+ ffr->size = ftell(ffp);
+ fclose(ffp);
+ flowfiles->records[i] = ffr;
+ }
+ }
+ free_tokens(&tks);
+ }
+ fclose(fp);
+}
+
+int main(int argc, char** argv) {
+
+ if (argc < 6) {
+ printf("Error: must run ./tail_file <file> <interval> <delimiter> <nifi instance url> <remote port>\n");
+ exit(1);
+ }
+
+ char * file = argv[1];
+ char * interval = argv[2];
+ char * delimiter = argv[3];
+ char * instance_str = argv[4];
+ char * port_str = argv[5];
+
+ if (access(file, F_OK) == -1) {
+ printf("Error: %s doesn't exist!\n", file);
+ exit(1);
+ }
+
+ struct stat stats;
+ int ret = stat(file, &stats);
+
+ errno = 0;
+ if (ret == -1) {
+ printf("Error occurred while getting file status {file: %s, error: %s}\n", file, strerror(errno));
+ exit(1);
+ }
+ // Check for file existence
+ if (S_ISDIR(stats.st_mode)){
+ printf("Error: %s is a directory!\n", file);
+ exit(1);
+ }
+
+ errno = 0;
+ unsigned long intrvl = strtol(interval, NULL, 10);
+
+ if (errno == ERANGE || intrvl == LONG_MAX || intrvl == LONG_MIN) {
+ printf("Invalid interval value specified\n");
+ return 0;
+ }
+
+ struct sigaction action;
+ memset(&action, 0, sizeof(sigaction));
+ action.sa_handler = signal_handler;
+ sigaction(SIGTERM, &action, NULL);
+ sigaction(SIGINT, &action, NULL);
+
+ nifi_port port;
+
+ port.port_id = port_str;
+
+ instance = create_instance(instance_str, &port);
+
+ const char * processor_name = "TailFile";
+
+ add_custom_processor(processor_name, on_trigger_callback);
+
+ proc = create_processor(processor_name);
+
+ set_standalone_property(proc, "file_path", file);
+ set_standalone_property(proc, "delimiter", delimiter);
+
+ set_offset(0);
+ while (!stopped) {
+ flow_file_record * new_ff = invoke(proc);
+ transmit_flow_files(instance);
+ free_flow_file_records();
+ free_flowfile(new_ff);
+ sleep(intrvl);
+ }
+
+ free_standalone_processor(proc);
+ free(instance);
+
+ return 0;
+}
diff --git a/nanofi/include/api/nanofi.h b/nanofi/include/api/nanofi.h
index 04e5712..6180952 100644
--- a/nanofi/include/api/nanofi.h
+++ b/nanofi/include/api/nanofi.h
@@ -40,6 +40,17 @@ extern "C" {
#define SUCCESS_RELATIONSHIP "success"
#define FAILURE_RELATIONSHIP "failure"
+#define NULL_CHECK(ret_val, ...) \
+ do { \
+ const void *_p[] = { __VA_ARGS__ }; \
+ int _i; \
+ for (_i = 0; _i < sizeof(_p)/sizeof(*_p); _i++) { \
+ if (_p[_i] == NULL) { \
+ return ret_val; \
+ } \
+ } \
+ } while(0)
+
/**
* Enables logging (disabled by default)
**/
@@ -299,6 +310,14 @@ flow_file_record* create_ff_object_na(const char *file, const size_t len, const
flow_file_record* create_ff_object_nc();
/**
+ * Adds content to the flow file record.
+ * @param instance the nifi instance
+ * @param proc the standalone processor
+ * @return a flow file record
+ */
+flow_file_record* generate_flow_file(nifi_instance * instance, standalone_processor * proc);
+
+/**
* Get incoming flow file. To be used in processor logic callbacks.
* @param session current processor session
* @param context current processor context
diff --git a/nanofi/include/core/cstructs.h b/nanofi/include/core/cstructs.h
index d775747..9ad2783 100644
--- a/nanofi/include/core/cstructs.h
+++ b/nanofi/include/core/cstructs.h
@@ -149,4 +149,15 @@ typedef struct cstream {
SOCKET socket_;
} cstream;
+/****
+ * ##################################################################
+ * STRING OPERATIONS
+ * ##################################################################
+ */
+
+typedef struct tokens {
+ char ** str_list;
+ uint64_t num_strings;
+ uint64_t total_bytes;
+} tokens;
#endif /* LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ */
diff --git a/nanofi/include/core/string_utils.h b/nanofi/include/core/string_utils.h
new file mode 100644
index 0000000..62aac99
--- /dev/null
+++ b/nanofi/include/core/string_utils.h
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#ifndef NIFI_MINIFI_CPP_STRING_UTILS_H
+#define NIFI_MINIFI_CPP_STRING_UTILS_H
+
+#include "cstructs.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
/** Selects how tokenize_string() treats text after the last delimiter. */
typedef enum TOKENIZER_MODE {
  /* Drop a trailing piece that has no terminating delimiter. */
  TAILFILE_MODE = 0,
  /* Keep a trailing piece that has no terminating delimiter. */
  DEFAULT_MODE = 1
} tokenizer_mode_t;
+
+/**
+ * Tokenizes a delimited string and returns a list of tokens
+ * @param str the string to be tokenized
+ * @param delim the delimiting character
+ * @param tokenizer_mode_t the enumeration value specified to include/exclude a non delimiting string in the result
+ * @return a list of strings wrapped inside tokens struct
+ */
+tokens tokenize_string(const char * str, char delim, tokenizer_mode_t);
+
+/**
+ * Free the dynamically allocated tokens
+ * @param tks the tokens to be freed
+ */
+void free_tokens(tokens * tks);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif //NIFI_MINIFI_CPP_STRING_UTILS_H
diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp
index 6b9fa8e..d37fc6b 100644
--- a/nanofi/src/api/nanofi.cpp
+++ b/nanofi/src/api/nanofi.cpp
@@ -34,17 +34,6 @@
#include "io/DataStream.h"
#include "core/cxxstructs.h"
-#define NULL_CHECK(ret_val, ...) \
- do { \
- const void *_p[] = { __VA_ARGS__ }; \
- int _i; \
- for (_i = 0; _i < sizeof(_p)/sizeof(*_p); _i++) { \
- if (_p[_i] == NULL) { \
- return ret_val; \
- } \
- } \
- } while(0)
-
using string_map = std::map<std::string, std::string>;
class API_INITIALIZER {
@@ -255,6 +244,32 @@ flow_file_record* create_ff_object_nc() {
new_ff->attributes = new string_map();
return new_ff;
}
+
+flow_file_record * generate_flow_file(nifi_instance * instance, standalone_processor * proc) {
+ if (!instance || !proc) {
+ return nullptr;
+ }
+ flow_file_record * ffr = create_ff_object_nc();
+
+ auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
+ auto content_repo = minifi_instance_ref->getContentRepository();
+
+ ffr->crp = static_cast<void*>(new std::shared_ptr<minifi::core::ContentRepository>(content_repo));
+ auto plan = ExecutionPlan::getPlan(proc->getUUIDStr());
+ if (!plan) {
+ return nullptr;
+ }
+ ffr->ffp = static_cast<void*>(new std::shared_ptr<core::FlowFile>(plan->getCurrentFlowFile()));
+ ffr->keepContent = 1;
+ auto ff_content_repo_ptr = (static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp));
+ auto claim = std::make_shared<minifi::ResourceClaim>(*ff_content_repo_ptr);
+ const char * full_path = claim->getContentFullPath().c_str();
+ int len = strlen(full_path);
+ ffr->contentLocation = (char *) malloc(sizeof(char) * (len + 1));
+ snprintf(ffr->contentLocation, len + 1, "%s", full_path);
+ return ffr;
+}
+
/**
* Reclaims memory associated with a flow file object
* @param ff flow file record.
diff --git a/nanofi/src/core/string_utils.c b/nanofi/src/core/string_utils.c
new file mode 100644
index 0000000..517da1f
--- /dev/null
+++ b/nanofi/src/core/string_utils.c
@@ -0,0 +1,86 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/cstructs.h"
+#include "core/string_utils.h"
+#include <string.h>
+#include <stdlib.h>
+
+tokens tokenize_string(const char * str, char delim, tokenizer_mode_t mode) {
+ tokens tks;
+ tks.num_strings = 0;
+ tks.total_bytes = 0;
+
+ if (!str) return tks;
+
+ char * begin = (char *)str;
+ char * end = NULL;
+ int num_strings = 0;
+ while ((end = strchr(begin, delim))) {
+ if (begin == end) {
+ begin++;
+ continue;
+ }
+ begin = (end+1);
+ num_strings++;
+ }
+
+ if (mode == DEFAULT_MODE && (*begin != '\0')) {
+ num_strings++;
+ }
+
+ tks.str_list = calloc(num_strings, sizeof(char *));
+ tks.num_strings = 0;
+ tks.total_bytes = 0;
+
+ begin = (char *)str;
+ end = NULL;
+ while ((end = strchr(begin, delim))) {
+ if (begin == end) {
+ begin++;
+ tks.total_bytes++;
+ continue;
+ }
+ int len = end - begin;
+ char * substr = (char *)malloc((len+1) * sizeof(char));
+ strncpy(substr, begin, len);
+ substr[len] = '\0';
+ tks.str_list[tks.num_strings++] = substr;
+ tks.total_bytes += (len+1);
+ begin = (end+1);
+ }
+
+ if (mode == DEFAULT_MODE && (*begin != '\0')) {
+ int len = strlen(begin);
+ char * substr = (char *)malloc((len+1) * sizeof(char));
+ strncpy(substr, begin, len);
+ substr[len] = '\0';
+ tks.str_list[tks.num_strings++] = substr;
+ tks.total_bytes += (len+1);
+ }
+ return tks;
+}
+
+void free_tokens(tokens * tks) {
+ if (tks) {
+ int i;
+ for (i = 0; i < tks->num_strings; ++i) {
+ free(tks->str_list[i]);
+ }
+ }
+}