You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/03/09 15:15:45 UTC
nifi-minifi-cpp git commit: MINIFI-231: Add Flow Persistent,
Using id instead of name to load the flow from YAML
Repository: nifi-minifi-cpp
Updated Branches:
refs/heads/master 60f02126e -> c4fea0cd9
MINIFI-231: Add Flow Persistent, Using id instead of name to load the flow from YAML
This closes #62.
Signed-off-by: Aldrin Piri <al...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/c4fea0cd
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/c4fea0cd
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/c4fea0cd
Branch: refs/heads/master
Commit: c4fea0cd94f9ea2ae94303ff5b574c0c00d6745a
Parents: 60f0212
Author: Bin Qiu <be...@gmail.com>
Authored: Wed Mar 1 20:45:16 2017 -0800
Committer: Aldrin Piri <al...@apache.org>
Committed: Thu Mar 9 10:12:39 2017 -0500
----------------------------------------------------------------------
README.md | 8 +-
conf/minifi.properties | 5 +
libminifi/include/Configure.h | 7 +-
libminifi/include/Connection.h | 7 +-
libminifi/include/FlowController.h | 14 +-
libminifi/include/FlowFileRecord.h | 18 +-
libminifi/include/FlowFileRepository.h | 204 ++++++++++++++++
libminifi/include/ProcessGroup.h | 2 +
libminifi/include/ProcessSession.h | 7 +-
libminifi/include/Provenance.h | 164 +------------
libminifi/include/Repository.h | 294 ++++++++++++++++++++++++
libminifi/include/ResourceClaim.h | 5 +
libminifi/src/Configure.cpp | 7 +-
libminifi/src/Connection.cpp | 42 ++++
libminifi/src/FlowController.cpp | 89 ++++---
libminifi/src/FlowFileRecord.cpp | 53 ++++-
libminifi/src/FlowFileRepository.cpp | 280 ++++++++++++++++++++++
libminifi/src/ProcessGroup.cpp | 12 +
libminifi/src/ProcessSession.cpp | 48 +++-
libminifi/src/Provenance.cpp | 72 +-----
libminifi/src/Repository.cpp | 138 +++++++++++
libminifi/src/ResourceClaim.cpp | 2 +-
libminifi/src/io/ClientSocket.cpp | 2 +-
libminifi/test/unit/ProcessorTests.cpp | 44 +++-
libminifi/test/unit/ProvenanceTestHelper.h | 61 ++++-
25 files changed, 1281 insertions(+), 304 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 64f2377..b254b32 100644
--- a/README.md
+++ b/README.md
@@ -224,10 +224,12 @@ Additionally, users can utilize the MiNiFi Toolkit Converter (version 0.0.1 - sc
Flow Controller:
+ id: 471deef6-2a6e-4a7d-912a-81cc17e3a205
name: MiNiFi Flow
Processors:
- name: GetFile
+ id: 471deef6-2a6e-4a7d-912a-81cc17e3a206
class: org.apache.nifi.processors.standard.GetFile
max concurrent tasks: 1
scheduling strategy: TIMER_DRIVEN
@@ -242,15 +244,17 @@ Additionally, users can utilize the MiNiFi Toolkit Converter (version 0.0.1 - sc
Connections:
- name: TransferFilesToRPG
- source name: GetFile
+ id: 471deef6-2a6e-4a7d-912a-81cc17e3a207
+ source id: 471deef6-2a6e-4a7d-912a-81cc17e3a206
source relationship name: success
- destination name: 471deef6-2a6e-4a7d-912a-81cc17e3a204
+ destination id: 471deef6-2a6e-4a7d-912a-81cc17e3a204
max work queue size: 0
max work queue data size: 1 MB
flowfile expiration: 60 sec
Remote Processing Groups:
- name: NiFi Flow
+ id: 471deef6-2a6e-4a7d-912a-81cc17e3a208
url: http://localhost:8080/nifi
timeout: 30 secs
yield period: 10 sec
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/conf/minifi.properties
----------------------------------------------------------------------
diff --git a/conf/minifi.properties b/conf/minifi.properties
index 62114dc..cfa2858 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -23,6 +23,11 @@ nifi.bored.yield.duration=10 millis
nifi.provenance.repository.directory.default=./provenance_repository
nifi.provenance.repository.max.storage.time=1 MIN
nifi.provenance.repository.max.storage.size=1 MB
+# FlowFileRepository #
+nifi.flowfile.repository.enable=true
+nifi.flowfile.repository.directory.default=./flowfile_repository
+nifi.flowfile.repository.max.storage.time=10 MIN
+nifi.flowfile.repository.max.storage.size=1 MB
# Security Related Properties #
# Enable tls ssl
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/include/Configure.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Configure.h b/libminifi/include/Configure.h
index 9bb7a0e..6f0d198 100644
--- a/libminifi/include/Configure.h
+++ b/libminifi/include/Configure.h
@@ -27,7 +27,6 @@
#include <errno.h>
#include <iostream>
#include <fstream>
-
#include "Logger.h"
class Configure {
@@ -46,12 +45,18 @@ public:
static const char *nifi_administrative_yield_duration;
static const char *nifi_bored_yield_duration;
static const char *nifi_graceful_shutdown_seconds;
+ static const char *nifi_log_level;
static const char *nifi_server_name;
static const char *nifi_server_port;
static const char *nifi_server_report_interval;
static const char *nifi_provenance_repository_max_storage_time;
static const char *nifi_provenance_repository_max_storage_size;
static const char *nifi_provenance_repository_directory_default;
+ static const char *nifi_provenance_repository_enable;
+ static const char *nifi_flowfile_repository_max_storage_time;
+ static const char *nifi_flowfile_repository_max_storage_size;
+ static const char *nifi_flowfile_repository_directory_default;
+ static const char *nifi_flowfile_repository_enable;
static const char *nifi_remote_input_secure;
static const char *nifi_security_need_ClientAuth;
static const char *nifi_security_client_certificate;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/include/Connection.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h
index 0868560..5af0d2f 100644
--- a/libminifi/include/Connection.h
+++ b/libminifi/include/Connection.h
@@ -84,6 +84,10 @@ public:
else
return false;
}
+ //! Get UUID Str: returns a copy of this connection's UUID string (_uuidStr)
+ std::string getUUIDStr() {
+ return _uuidStr;
+ }
//! Set Connection Source Processor
void setSourceProcessor(Processor *source) {
_srcProcessor = source;
@@ -180,7 +184,8 @@ protected:
std::atomic<uint64_t> _maxQueueDataSize;
//! Flow File Expiration Duration in= MilliSeconds
std::atomic<uint64_t> _expiredDuration;
-
+ //! UUID string
+ std::string _uuidStr;
private:
//! Mutex for protection
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/include/FlowController.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index d60f022..f63029e 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -47,6 +47,7 @@
#include "FlowControlProtocol.h"
#include "RemoteProcessorGroupPort.h"
#include "Provenance.h"
+#include "FlowFileRepository.h"
#include "GetFile.h"
#include "PutFile.h"
#include "TailFile.h"
@@ -64,6 +65,7 @@
#define CONFIG_YAML_PROCESSORS_KEY "Processors"
struct ProcessorConfig {
+ std::string id;
std::string name;
std::string javaClass;
std::string maxConcurrentTasks;
@@ -129,6 +131,10 @@ public:
virtual ProvenanceRepository *getProvenanceRepository() {
return this->_provenanceRepo;
}
+ //! Get the flowfile repository pointer (_flowfileRepo, which the constructor initializes to 0)
+ virtual FlowFileRepository *getFlowFileRepository() {
+ return this->_flowfileRepo;
+ }
//! Load flow xml from disk, after that, create the root process group and its children, initialize the flows
virtual void load() = 0;
@@ -172,7 +178,6 @@ public:
_protocol->setSerialNumber(number);
}
-
protected:
//! A global unique identifier
@@ -197,6 +202,8 @@ protected:
std::atomic<bool> _initialized;
//! Provenance Repo
ProvenanceRepository *_provenanceRepo;
+ //! FlowFile Repo
+ FlowFileRepository *_flowfileRepo;
//! Flow Engines
//! Flow Timer Scheduler
TimerDrivenSchedulingAgent _timerScheduler;
@@ -212,7 +219,7 @@ protected:
FlowController() :
_root(0), _maxTimerDrivenThreads(0), _maxEventDrivenThreads(0), _running(
- false), _initialized(false), _provenanceRepo(0), _protocol(
+ false), _initialized(false), _provenanceRepo(0), _flowfileRepo(0), _protocol(
0), logger_(Logger::getLogger()){
}
@@ -246,6 +253,8 @@ public:
void unload();
//! Load new xml
void reload(std::string yamlFile);
+ //! Load Flow File from persistent Flow Repo
+ void loadFlowRepo();
//! update property value
void updatePropertyValue(std::string processorName,
std::string propertyName, std::string propertyValue) {
@@ -283,6 +292,7 @@ private:
//! Logger
std::shared_ptr<Logger> logger_;
Configure *configure_;
+
//! Process Processor Node YAML
void parseProcessorNodeYaml(YAML::Node processorNode, ProcessGroup *parent);
//! Process Port YAML
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/include/FlowFileRecord.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowFileRecord.h b/libminifi/include/FlowFileRecord.h
index a3b87ee..ded0623 100644
--- a/libminifi/include/FlowFileRecord.h
+++ b/libminifi/include/FlowFileRecord.h
@@ -37,6 +37,7 @@
class ProcessSession;
class Connection;
+class FlowFileEventRecord;
#define DEFAULT_FLOWFILE_PATH "."
@@ -107,7 +108,11 @@ public:
/*!
* Create a new flow record
*/
- FlowFileRecord(std::map<std::string, std::string> attributes, ResourceClaim *claim = NULL);
+ explicit FlowFileRecord(std::map<std::string, std::string> attributes, ResourceClaim *claim = NULL);
+ /*!
+ * Create a new flow record from repo flow event
+ */
+ explicit FlowFileRecord(FlowFileEventRecord *event);
//! Destructor
virtual ~FlowFileRecord();
//! addAttribute key is enum
@@ -175,6 +180,15 @@ public:
{
return _lineageIdentifiers;
}
+ //! Check whether this flow file record is already flagged as stored to the repository DB (_isStoredToRepo)
+ bool isStoredToRepository()
+ {
+ return _isStoredToRepo;
+ }
+ void setStoredToRepository(bool value) // set the flag reported by isStoredToRepository()
+ {
+ _isStoredToRepo = value;
+ }
protected:
@@ -202,6 +216,8 @@ protected:
std::string _uuidStr;
//! UUID string for all parents
std::set<std::string> _lineageIdentifiers;
+ //! whether it is stored to DB
+ bool _isStoredToRepo;
//! duplicate the original flow file
void duplicate(FlowFileRecord *original);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/include/FlowFileRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowFileRepository.h b/libminifi/include/FlowFileRepository.h
new file mode 100644
index 0000000..50d2c41
--- /dev/null
+++ b/libminifi/include/FlowFileRepository.h
@@ -0,0 +1,204 @@
+/**
+ * @file FlowFileRepository
+ * Flow file repository class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __FLOWFILE_REPOSITORY_H__
+#define __FLOWFILE_REPOSITORY_H__
+
+#include <ftw.h>
+#include <uuid/uuid.h>
+#include <atomic>
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <map>
+#include <set>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "Configure.h"
+#include "Connection.h"
+#include "FlowFileRecord.h"
+#include "Logger.h"
+#include "Property.h"
+#include "ResourceClaim.h"
+#include "io/Serializable.h"
+#include "utils/TimeUtil.h"
+#include "Repository.h"
+
+#define FLOWFILE_REPOSITORY_DIRECTORY "./flowfile_repository"
+#define MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE (10*1024*1024) // 10M
+#define MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME (600000) // 10 minute
+#define FLOWFILE_REPOSITORY_PURGE_PERIOD (2500) // 2500 msec
+
+//! FlowFile Repository: the Repository specialization constructed with type Repository::FLOWFILE
+class FlowFileRepository : public Repository
+{
+public:
+ //! Constructor
+ /*!
+ * Create a new flowfile repository using the compiled-in FLOWFILE_REPOSITORY_* / MAX_FLOWFILE_REPOSITORY_* defaults
+ */
+ FlowFileRepository()
+ : Repository(Repository::FLOWFILE, FLOWFILE_REPOSITORY_DIRECTORY,
+ MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME, MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, FLOWFILE_REPOSITORY_PURGE_PERIOD)
+ {
+ }
+ //! Destructor
+ virtual ~FlowFileRepository() {
+ }
+ //! Load Repo to Connections (NOTE(review): connectionMap is presumably keyed by connection UUID string; the definition is not in this header - confirm)
+ void loadFlowFileToConnections(std::map<std::string, Connection *> *connectionMap);
+
+protected:
+
+private:
+
+ // Prevent default copy constructor and assignment operation
+ // Only support pass by reference or pointer
+ FlowFileRepository(const FlowFileRepository &parent);
+ FlowFileRepository &operator=(const FlowFileRepository &parent);
+};
+
+//! FlowFile Event Record: a copy of a flow file's metadata plus the uuid of a connection, with Serialize/DeSerialize against a FlowFileRepository
+class FlowFileEventRecord : protected Serializable
+{
+public:
+ //! Constructor
+ /*!
+ * Create a new flowfile event record: dates, size and offset start at 0 and the event time is taken from getTimeMillis()
+ */
+ FlowFileEventRecord()
+ : _entryDate(0), _lineageStartDate(0), _size(0), _offset(0)
+ {
+ _eventTime = getTimeMillis();
+ logger_ = Logger::getLogger();
+ }
+
+ //! Destructor
+ virtual ~FlowFileEventRecord() {
+ }
+ //! Get Attributes (returns a copy of the attribute map)
+ std::map<std::string, std::string> getAttributes() {
+ return _attributes;
+ }
+ //! Get Size
+ uint64_t getFileSize() {
+ return _size;
+ }
+ //! Get Offset
+ uint64_t getFileOffset() {
+ return _offset;
+ }
+ //! Get Entry Date
+ uint64_t getFlowFileEntryDate() {
+ return _entryDate;
+ }
+ //! Get Lineage Start Date
+ uint64_t getlineageStartDate() {
+ return _lineageStartDate;
+ }
+ //! Get Event Time
+ uint64_t getEventTime() {
+ return _eventTime;
+ }
+ //! Get FlowFileUuid
+ std::string getFlowFileUuid()
+ {
+ return _uuid;
+ }
+ //! Get ConnectionUuid
+ std::string getConnectionUuid()
+ {
+ return _uuidConnection;
+ }
+ //! Get content full path
+ std::string getContentFullPath()
+ {
+ return _contentFullPath;
+ }
+ //! Get LineageIdentifiers (returns a copy of the set)
+ std::set<std::string> getLineageIdentifiers()
+ {
+ return _lineageIdentifiers;
+ }
+ //! fromFlowFile: copy dates, lineage ids, uuid, attributes, size and offset from flow and remember uuidConnection; flow must not be NULL
+ void fromFlowFile(FlowFileRecord *flow, std::string uuidConnection)
+ {
+ _entryDate = flow->getEntryDate();
+ _lineageStartDate = flow->getlineageStartDate();
+ _lineageIdentifiers = flow->getlineageIdentifiers();
+ _uuid = flow->getUUIDStr();
+ _attributes = flow->getAttributes();
+ _size = flow->getSize();
+ _offset = flow->getOffset();
+ _uuidConnection = uuidConnection;
+ if (flow->getResourceClaim()) // content path is only copied when the flow file has a resource claim
+ {
+ _contentFullPath = flow->getResourceClaim()->getContentFullPath();
+ }
+ }
+ //! Serialize and persist to the repository
+ bool Serialize(FlowFileRepository *repo);
+ //! DeSerialize from a raw buffer of bufferSize bytes
+ bool DeSerialize(const uint8_t *buffer, const int bufferSize);
+ //! DeSerialize from a data stream (forwards the stream's buffer and size to the buffer overload)
+ bool DeSerialize(DataStream &stream)
+ {
+ return DeSerialize(stream.getBuffer(),stream.getSize());
+ }
+ //! DeSerialize from the repository (NOTE(review): presumably loads the entry stored under key in repo; the definition is not in this header - confirm)
+ bool DeSerialize(FlowFileRepository *repo, std::string key);
+
+protected:
+
+ //! Date at which the event was created (set from getTimeMillis() in the constructor)
+ uint64_t _eventTime;
+ //! Date at which the flow file entered the flow
+ uint64_t _entryDate;
+ //! Date at which the origin of this flow file entered the flow
+ uint64_t _lineageStartDate;
+ //! Size in bytes of the data corresponding to this flow file
+ uint64_t _size;
+ //! flow uuid
+ std::string _uuid;
+ //! connection uuid
+ std::string _uuidConnection;
+ //! Offset to the content
+ uint64_t _offset;
+ //! Full path to the content
+ std::string _contentFullPath;
+ //! Attributes key/values pairs for the flow record
+ std::map<std::string, std::string> _attributes;
+ //! UUID string for all parents
+ std::set<std::string> _lineageIdentifiers;
+
+private:
+
+ //! Logger
+ std::shared_ptr<Logger> logger_;
+
+ // Prevent default copy constructor and assignment operation
+ // Only support pass by reference or pointer
+ FlowFileEventRecord(const FlowFileEventRecord &parent);
+ FlowFileEventRecord &operator=(const FlowFileEventRecord &parent);
+
+};
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/include/ProcessGroup.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ProcessGroup.h b/libminifi/include/ProcessGroup.h
index dbff7b0..dfec6c5 100644
--- a/libminifi/include/ProcessGroup.h
+++ b/libminifi/include/ProcessGroup.h
@@ -147,6 +147,8 @@ public:
void removeConnection(Connection *connection);
//! update property value
void updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue);
+ //! get connections under the process group
+ void getConnections(std::map<std::string, Connection*> *connectionMap);
protected:
//! A global unique identifier
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/include/ProcessSession.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ProcessSession.h b/libminifi/include/ProcessSession.h
index 6f01506..4e26758 100644
--- a/libminifi/include/ProcessSession.h
+++ b/libminifi/include/ProcessSession.h
@@ -44,12 +44,7 @@ public:
/*!
* Create a new process session
*/
- ProcessSession(ProcessContext *processContext = NULL) : _processContext(processContext) {
- logger_ = Logger::getLogger();
- logger_->log_trace("ProcessSession created for %s", _processContext->getProcessor()->getName().c_str());
- _provenanceReport = new ProvenanceReporter(_processContext->getProcessor()->getUUIDStr(),
- _processContext->getProcessor()->getName());
- }
+ ProcessSession(ProcessContext *processContext = NULL);
//! Destructor
virtual ~ProcessSession() {
if (_provenanceReport)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/include/Provenance.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Provenance.h b/libminifi/include/Provenance.h
index 52489fc..3ba9792 100644
--- a/libminifi/include/Provenance.h
+++ b/libminifi/include/Provenance.h
@@ -32,10 +32,6 @@
#include <thread>
#include <vector>
-#include "leveldb/db.h"
-#include "leveldb/options.h"
-#include "leveldb/slice.h"
-#include "leveldb/status.h"
#include "Configure.h"
#include "Connection.h"
#include "FlowFileRecord.h"
@@ -44,9 +40,7 @@
#include "ResourceClaim.h"
#include "io/Serializable.h"
#include "utils/TimeUtil.h"
-
-// Provenance Event Record Serialization Seg Size
-#define PROVENANCE_EVENT_RECORD_SEG_SIZE 2048
+#include "Repository.h"
class ProvenanceRepository;
@@ -569,107 +563,23 @@ private:
#define PROVENANCE_PURGE_PERIOD (2500) // 2500 msec
//! Provenance Repository
-class ProvenanceRepository
+class ProvenanceRepository : public Repository
{
public:
//! Constructor
/*!
* Create a new provenance repository
*/
- ProvenanceRepository() {
- logger_ = Logger::getLogger();
- configure_ = Configure::getConfigure();
- _directory = PROVENANCE_DIRECTORY;
- _maxPartitionMillis = MAX_PROVENANCE_ENTRY_LIFE_TIME;
- _purgePeriod = PROVENANCE_PURGE_PERIOD;
- _maxPartitionBytes = MAX_PROVENANCE_STORAGE_SIZE;
- _db = NULL;
- _thread = NULL;
- _running = false;
- _repoFull = false;
+ ProvenanceRepository()
+ : Repository(Repository::PROVENANCE, PROVENANCE_DIRECTORY,
+ MAX_PROVENANCE_ENTRY_LIFE_TIME, MAX_PROVENANCE_STORAGE_SIZE, PROVENANCE_PURGE_PERIOD)
+ {
}
//! Destructor
virtual ~ProvenanceRepository() {
- stop();
- if (this->_thread)
- delete this->_thread;
- destroy();
}
- //! initialize
- virtual bool initialize()
- {
- std::string value;
- if (configure_->get(Configure::nifi_provenance_repository_directory_default, value))
- {
- _directory = value;
- }
- logger_->log_info("NiFi Provenance Repository Directory %s", _directory.c_str());
- if (configure_->get(Configure::nifi_provenance_repository_max_storage_size, value))
- {
- Property::StringToInt(value, _maxPartitionBytes);
- }
- logger_->log_info("NiFi Provenance Max Partition Bytes %d", _maxPartitionBytes);
- if (configure_->get(Configure::nifi_provenance_repository_max_storage_time, value))
- {
- TimeUnit unit;
- if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
- Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
- {
- }
- }
- logger_->log_info("NiFi Provenance Max Storage Time: [%d] ms", _maxPartitionMillis);
- leveldb::Options options;
- options.create_if_missing = true;
- leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
- if (status.ok())
- {
- logger_->log_info("NiFi Provenance Repository database open %s success", _directory.c_str());
- }
- else
- {
- logger_->log_error("NiFi Provenance Repository database open %s fail", _directory.c_str());
- return false;
- }
-
- // start the monitor thread
- start();
- return true;
- }
- //! Put
- virtual bool Put(std::string key, uint8_t *buf, int bufLen)
- {
-
- // persistent to the DB
- leveldb::Slice value((const char *) buf, bufLen);
- leveldb::Status status;
- status = _db->Put(leveldb::WriteOptions(), key, value);
- if (status.ok())
- return true;
- else
- return false;
- }
- //! Delete
- virtual bool Delete(std::string key)
- {
- leveldb::Status status;
- status = _db->Delete(leveldb::WriteOptions(), key);
- if (status.ok())
- return true;
- else
- return false;
- }
- //! Get
- virtual bool Get(std::string key, std::string &value)
- {
- leveldb::Status status;
- status = _db->Get(leveldb::ReadOptions(), key, &value);
- if (status.ok())
- return true;
- else
- return false;
- }
//! Persistent event
void registerEvent(ProvenanceEventRecord *event)
{
@@ -680,77 +590,15 @@ public:
{
Delete(event->getEventId());
}
- //! destroy
- void destroy()
- {
- if (_db)
- {
- delete _db;
- _db = NULL;
- }
- }
- //! Run function for the thread
- static void run(ProvenanceRepository *repo);
- //! Start the repository monitor thread
- virtual void start();
- //! Stop the repository monitor thread
- virtual void stop();
- //! whether the repo is full
- virtual bool isFull()
- {
- return _repoFull;
- }
protected:
private:
- //! Mutex for protection
- std::mutex _mtx;
- //! repository directory
- std::string _directory;
- //! Logger
- std::shared_ptr<Logger> logger_;
- //! Configure
- //! max db entry life time
- Configure *configure_;
- int64_t _maxPartitionMillis;
- //! max db size
- int64_t _maxPartitionBytes;
- //! purge period
- uint64_t _purgePeriod;
- //! level DB database
- leveldb::DB* _db;
- //! thread
- std::thread *_thread;
- //! whether it is running
- bool _running;
- //! whether stop accepting provenace event
- std::atomic<bool> _repoFull;
- //! size of the directory
- static uint64_t _repoSize;
- //! call back for directory size
- static int repoSum(const char *fpath, const struct stat *sb, int typeflag)
- {
- _repoSize += sb->st_size;
- return 0;
- }
- //! repoSize
- uint64_t repoSize()
- {
- _repoSize = 0;
- if (ftw(_directory.c_str(), repoSum, 1) != 0)
- _repoSize = 0;
-
- return _repoSize;
- }
// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
ProvenanceRepository(const ProvenanceRepository &parent);
ProvenanceRepository &operator=(const ProvenanceRepository &parent);
};
-
-
-
#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/include/Repository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Repository.h b/libminifi/include/Repository.h
new file mode 100644
index 0000000..8b1c3ec
--- /dev/null
+++ b/libminifi/include/Repository.h
@@ -0,0 +1,294 @@
+/**
+ * @file Repository
+ * Repository class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __REPOSITORY_H__
+#define __REPOSITORY_H__
+
+#include <ftw.h>
+#include <uuid/uuid.h>
+#include <atomic>
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <map>
+#include <set>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "leveldb/db.h"
+#include "leveldb/options.h"
+#include "leveldb/slice.h"
+#include "leveldb/status.h"
+#include "Configure.h"
+#include "Connection.h"
+#include "FlowFileRecord.h"
+#include "Logger.h"
+#include "Property.h"
+#include "ResourceClaim.h"
+#include "io/Serializable.h"
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+
+//! Repository: base class of the provenance and flowfile repositories; owns the leveldb database handle and the monitor thread
+class Repository
+{
+public:
+ enum RepositoryType {
+ //! Provenance Repo Type
+ PROVENANCE,
+ //! FlowFile Repo Type
+ FLOWFILE,
+ MAX_REPO_TYPE
+ };
+ static const char *RepositoryTypeStr[MAX_REPO_TYPE];
+ //! Constructor
+ /*!
+ * Create a new repository of the given type; directory, life time and size are defaults that initialize() may replace from the configuration
+ */
+ Repository(RepositoryType type, std::string directory,
+ int64_t maxPartitionMillis, int64_t maxPartitionBytes, uint64_t purgePeriod) {
+ _type = type;
+ _directory = directory;
+ _maxPartitionMillis = maxPartitionMillis;
+ _maxPartitionBytes = maxPartitionBytes;
+ _purgePeriod = purgePeriod;
+ logger_ = Logger::getLogger();
+ configure_ = Configure::getConfigure();
+ _db = NULL;
+ _thread = NULL;
+ _running = false;
+ _repoFull = false;
+ _enable = true;
+ }
+
+ //! Destructor: calls stop(), deletes the thread object, then releases the database handle
+ virtual ~Repository() {
+ stop();
+ if (this->_thread)
+ delete this->_thread;
+ destroy();
+ }
+
+ //! initialize: apply this repo type's configuration and open the database; returns false if the repo is disabled or the open fails; does not call start()
+ virtual bool initialize()
+ {
+ std::string value;
+
+ if (_type == PROVENANCE)
+ {
+ if (!(configure_->get(Configure::nifi_provenance_repository_enable, value)
+ && StringUtils::StringToBool(value, _enable))) {
+ _enable = true; // get() or StringToBool() returned false: fall back to enabled
+ }
+ if (!_enable)
+ return false;
+ if (configure_->get(Configure::nifi_provenance_repository_directory_default, value))
+ {
+ _directory = value;
+ }
+ logger_->log_info("NiFi Provenance Repository Directory %s", _directory.c_str());
+ if (configure_->get(Configure::nifi_provenance_repository_max_storage_size, value))
+ {
+ Property::StringToInt(value, _maxPartitionBytes);
+ }
+ logger_->log_info("NiFi Provenance Max Partition Bytes %lld", static_cast<long long>(_maxPartitionBytes)); // int64_t must not be passed to %d
+ if (configure_->get(Configure::nifi_provenance_repository_max_storage_time, value))
+ {
+ TimeUnit unit;
+ if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
+ Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
+ {
+ }
+ }
+ logger_->log_info("NiFi Provenance Max Storage Time: [%lld] ms", static_cast<long long>(_maxPartitionMillis));
+ leveldb::Options options;
+ options.create_if_missing = true;
+ leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
+ if (status.ok())
+ {
+ logger_->log_info("NiFi Provenance Repository database open %s success", _directory.c_str());
+ }
+ else
+ {
+ logger_->log_error("NiFi Provenance Repository database open %s fail", _directory.c_str());
+ return false;
+ }
+ }
+
+ if (_type == FLOWFILE)
+ {
+ if (!(configure_->get(Configure::nifi_flowfile_repository_enable, value)
+ && StringUtils::StringToBool(value, _enable))) {
+ _enable = true; // get() or StringToBool() returned false: fall back to enabled
+ }
+ if (!_enable)
+ return false;
+ if (configure_->get(Configure::nifi_flowfile_repository_directory_default, value))
+ {
+ _directory = value;
+ }
+ logger_->log_info("NiFi FlowFile Repository Directory %s", _directory.c_str());
+ if (configure_->get(Configure::nifi_flowfile_repository_max_storage_size, value))
+ {
+ Property::StringToInt(value, _maxPartitionBytes);
+ }
+ logger_->log_info("NiFi FlowFile Max Partition Bytes %lld", static_cast<long long>(_maxPartitionBytes)); // int64_t must not be passed to %d
+ if (configure_->get(Configure::nifi_flowfile_repository_max_storage_time, value))
+ {
+ TimeUnit unit;
+ if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
+ Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
+ {
+ }
+ }
+ logger_->log_info("NiFi FlowFile Max Storage Time: [%lld] ms", static_cast<long long>(_maxPartitionMillis));
+ leveldb::Options options;
+ options.create_if_missing = true;
+ leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
+ if (status.ok())
+ {
+ logger_->log_info("NiFi FlowFile Repository database open %s success", _directory.c_str());
+ }
+ else
+ {
+ logger_->log_error("NiFi FlowFile Repository database open %s fail", _directory.c_str());
+ return false;
+ }
+ }
+
+ return true;
+ }
+ //! Put: write bufLen bytes from buf under key; returns false if the repo is disabled, the database is not open, or the write fails
+ virtual bool Put(std::string key, uint8_t *buf, int bufLen)
+ {
+ if (!_enable || !_db) // _db stays NULL until initialize() has opened it successfully
+ return false;
+
+ // persistent to the DB
+ leveldb::Slice value((const char *) buf, bufLen);
+ leveldb::Status status;
+ status = _db->Put(leveldb::WriteOptions(), key, value);
+ if (status.ok())
+ return true;
+ else
+ return false;
+ }
+ //! Delete: remove key; returns false if the repo is disabled, the database is not open, or the delete fails
+ virtual bool Delete(std::string key)
+ {
+ if (!_enable || !_db) // _db stays NULL until initialize() has opened it successfully
+ return false;
+ leveldb::Status status;
+ status = _db->Delete(leveldb::WriteOptions(), key);
+ if (status.ok())
+ return true;
+ else
+ return false;
+ }
+ //! Get: read the entry stored under key into value; returns false if the repo is disabled, the database is not open, or the read fails
+ virtual bool Get(std::string key, std::string &value)
+ {
+ if (!_enable || !_db) // _db stays NULL until initialize() has opened it successfully
+ return false;
+ leveldb::Status status;
+ status = _db->Get(leveldb::ReadOptions(), key, &value);
+ if (status.ok())
+ return true;
+ else
+ return false;
+ }
+ //! Run function for the thread
+ static void run(Repository *repo);
+ //! Start the repository monitor thread
+ virtual void start();
+ //! Stop the repository monitor thread
+ virtual void stop();
+ //! whether the repo is full
+ virtual bool isFull()
+ {
+ return _repoFull;
+ }
+ //! whether the repo is enabled
+ virtual bool isEnable()
+ {
+ return _enable;
+ }
+
+protected:
+ //! Repo Type
+ RepositoryType _type;
+ //! Mutex for protection
+ std::mutex _mtx;
+ //! repository directory
+ std::string _directory;
+ //! Logger
+ std::shared_ptr<Logger> logger_;
+ //! Configure
+ Configure *configure_;
+ //! max db entry life time
+ int64_t _maxPartitionMillis;
+ //! max db size
+ int64_t _maxPartitionBytes;
+ //! purge period
+ uint64_t _purgePeriod;
+ //! level DB database
+ leveldb::DB* _db;
+ //! thread
+ std::thread *_thread;
+ //! whether the monitoring thread is running for the repo while it was enabled
+ bool _running;
+ //! whether it is enabled by minifi property for the repo
+ bool _enable;
+ //! whether the repo is full (the value returned by isFull())
+ std::atomic<bool> _repoFull;
+ //! repoSize
+ uint64_t repoSize();
+ //! size of the directory, one slot per repository type
+ static uint64_t _repoSize[MAX_REPO_TYPE];
+ //! call back for directory size: adds the entry's st_size to the provenance slot
+ static int repoSumProvenance(const char *fpath, const struct stat *sb, int typeflag)
+ {
+ _repoSize[PROVENANCE] += sb->st_size;
+ return 0;
+ }
+ //! call back for directory size: adds the entry's st_size to the flowfile slot
+ static int repoSumFlowFile(const char *fpath, const struct stat *sb, int typeflag)
+ {
+ _repoSize[FLOWFILE] += sb->st_size;
+ return 0;
+ }
+
+private:
+ //! destroy: delete the database handle if one is open and reset it to NULL
+ void destroy()
+ {
+ if (_db)
+ {
+ delete _db;
+ _db = NULL;
+ }
+ }
+ // Prevent default copy constructor and assignment operation
+ // Only support pass by reference or pointer
+ Repository(const Repository &parent);
+ Repository &operator=(const Repository &parent);
+};
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/include/ResourceClaim.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ResourceClaim.h b/libminifi/include/ResourceClaim.h
index 1dd2704..7ca79a3 100644
--- a/libminifi/include/ResourceClaim.h
+++ b/libminifi/include/ResourceClaim.h
@@ -67,6 +67,11 @@ public:
{
return _contentFullPath;
}
+ //! Set the content full path
+ void setContentFullPath(std::string path)
+ {
+     // path is received by value; its contents are copied into the member
+     _contentFullPath.assign(path);
+ }
protected:
//! A global unique identifier
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/src/Configure.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index 94e33a1..6f5c08d 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -25,12 +25,18 @@ const char *Configure::nifi_flow_configuration_file = "nifi.flow.configuration.f
const char *Configure::nifi_administrative_yield_duration = "nifi.administrative.yield.duration";
const char *Configure::nifi_bored_yield_duration = "nifi.bored.yield.duration";
const char *Configure::nifi_graceful_shutdown_seconds = "nifi.graceful.shutdown.seconds";
+const char *Configure::nifi_log_level = "nifi.log.level";
const char *Configure::nifi_server_name = "nifi.server.name";
const char *Configure::nifi_server_port = "nifi.server.port";
const char *Configure::nifi_server_report_interval= "nifi.server.report.interval";
const char *Configure::nifi_provenance_repository_max_storage_size = "nifi.provenance.repository.max.storage.size";
const char *Configure::nifi_provenance_repository_max_storage_time = "nifi.provenance.repository.max.storage.time";
const char *Configure::nifi_provenance_repository_directory_default = "nifi.provenance.repository.directory.default";
+const char *Configure::nifi_provenance_repository_enable = "nifi.provenance.repository.enable";
+const char *Configure::nifi_flowfile_repository_max_storage_size = "nifi.flowfile.repository.max.storage.size";
+const char *Configure::nifi_flowfile_repository_max_storage_time = "nifi.flowfile.repository.max.storage.time";
+const char *Configure::nifi_flowfile_repository_directory_default = "nifi.flowfile.repository.directory.default";
+const char *Configure::nifi_flowfile_repository_enable = "nifi.flowfile.repository.enable";
const char *Configure::nifi_remote_input_secure = "nifi.remote.input.secure";
const char *Configure::nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth";
const char *Configure::nifi_security_client_certificate = "nifi.security.client.certificate";
@@ -38,7 +44,6 @@ const char *Configure::nifi_security_client_private_key = "nifi.security.client.
const char *Configure::nifi_security_client_pass_phrase = "nifi.security.client.pass.phrase";
const char *Configure::nifi_security_client_ca_certificate = "nifi.security.client.ca.certificate";
-
//! Get the config value
bool Configure::get(std::string key, std::string &value)
{
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/src/Connection.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
index 791fa63..42dbfe4 100644
--- a/libminifi/src/Connection.cpp
+++ b/libminifi/src/Connection.cpp
@@ -29,6 +29,8 @@
#include "Connection.h"
#include "Processor.h"
+#include "FlowFileRepository.h"
+#include "FlowController.h"
Connection::Connection(std::string name, uuid_t uuid, uuid_t srcUUID, uuid_t destUUID)
: _name(name)
@@ -53,6 +55,10 @@ Connection::Connection(std::string name, uuid_t uuid, uuid_t srcUUID, uuid_t des
logger_ = Logger::getLogger();
+ char uuidStr[37];
+ uuid_unparse_lower(_uuid, uuidStr);
+ _uuidStr = uuidStr;
+
logger_->log_info("Connection %s created", _name.c_str());
}
@@ -93,6 +99,21 @@ void Connection::put(FlowFileRecord *flow)
flow->getUUIDStr().c_str(), _name.c_str());
}
+
+ if (FlowControllerFactory::getFlowController()->getFlowFileRepository() &&
+ FlowControllerFactory::getFlowController()->getFlowFileRepository()->isEnable() &&
+ !flow->isStoredToRepository())
+ {
+ // Save to the flowfile repo
+ FlowFileEventRecord event;
+ event.fromFlowFile(flow, this->_uuidStr);
+ if (event.Serialize(
+ FlowControllerFactory::getFlowController()->getFlowFileRepository()))
+ {
+ flow->setStoredToRepository(true);
+ }
+ }
+
// Notify receiving processor that work may be available
if(_destProcessor)
{
@@ -117,6 +138,13 @@ FlowFileRecord* Connection::poll(std::set<FlowFileRecord *> &expiredFlowRecords)
{
// Flow record expired
expiredFlowRecords.insert(item);
+ if (FlowControllerFactory::getFlowController()->getFlowFileRepository() &&
+ FlowControllerFactory::getFlowController()->getFlowFileRepository()->isEnable())
+ {
+ // delete from the flowfile repo
+ FlowControllerFactory::getFlowController()->getFlowFileRepository()->Delete(item->getUUIDStr());
+ item->setStoredToRepository(false);
+ }
}
else
{
@@ -131,6 +159,13 @@ FlowFileRecord* Connection::poll(std::set<FlowFileRecord *> &expiredFlowRecords)
item->setOriginalConnection(this);
logger_->log_debug("Dequeue flow file UUID %s from connection %s",
item->getUUIDStr().c_str(), _name.c_str());
+ if (FlowControllerFactory::getFlowController()->getFlowFileRepository() &&
+ FlowControllerFactory::getFlowController()->getFlowFileRepository()->isEnable())
+ {
+ // delete from the flowfile repo
+ FlowControllerFactory::getFlowController()->getFlowFileRepository()->Delete(item->getUUIDStr());
+ item->setStoredToRepository(false);
+ }
return item;
}
}
@@ -147,6 +182,13 @@ FlowFileRecord* Connection::poll(std::set<FlowFileRecord *> &expiredFlowRecords)
item->setOriginalConnection(this);
logger_->log_debug("Dequeue flow file UUID %s from connection %s",
item->getUUIDStr().c_str(), _name.c_str());
+ if (FlowControllerFactory::getFlowController()->getFlowFileRepository() &&
+ FlowControllerFactory::getFlowController()->getFlowFileRepository()->isEnable())
+ {
+ // delete from the flowfile repo
+ FlowControllerFactory::getFlowController()->getFlowFileRepository()->Delete(item->getUUIDStr());
+ item->setStoredToRepository(false);
+ }
return item;
}
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 4f5faf8..28e35b7 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -109,6 +109,8 @@ FlowControllerImpl::FlowControllerImpl(std::string name) {
// Create repos for flow record and provenance
+ _flowfileRepo = new FlowFileRepository();
+ _flowfileRepo->initialize();
_provenanceRepo = new ProvenanceRepository();
_provenanceRepo->initialize();
}
@@ -121,6 +123,8 @@ FlowControllerImpl::~FlowControllerImpl() {
delete _protocol;
if (NULL != _provenanceRepo)
delete _provenanceRepo;
+ if (NULL != _flowfileRepo)
+ delete _flowfileRepo;
}
@@ -133,6 +137,8 @@ void FlowControllerImpl::stop(bool force) {
logger_->log_info("Stop Flow Controller");
this->_timerScheduler.stop();
this->_eventScheduler.stop();
+ this->_flowfileRepo->stop();
+ this->_provenanceRepo->stop();
// Wait for sometime for thread stop
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
if (this->_root)
@@ -238,14 +244,12 @@ void FlowControllerImpl::parseRootProcessGroupYaml(YAML::Node rootFlowNode) {
uuid_t uuid;
ProcessGroup *group = NULL;
- // generate the random UIID
- uuid_generate(uuid);
-
std::string flowName = rootFlowNode["name"].as<std::string>();
+ std::string id = rootFlowNode["id"].as<std::string>();
+
+ uuid_parse(id.c_str(), uuid);
- char uuidStr[37];
- uuid_unparse_lower(_uuid, uuidStr);
- logger_->log_debug("parseRootProcessGroup: id => [%s]", uuidStr);
+ logger_->log_debug("parseRootProcessGroup: id => [%s]", id.c_str());
logger_->log_debug("parseRootProcessGroup: name => [%s]", flowName.c_str());
group = this->createRootProcessGroup(flowName, uuid);
this->_root = group;
@@ -278,17 +282,14 @@ void FlowControllerImpl::parseProcessorNodeYaml(YAML::Node processorsNode,
YAML::Node procNode = iter->as<YAML::Node>();
procCfg.name = procNode["name"].as<std::string>();
- logger_->log_debug("parseProcessorNode: name => [%s]",
- procCfg.name.c_str());
+ procCfg.id = procNode["id"].as<std::string>();
+ logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]",
+ procCfg.name.c_str(), procCfg.id.c_str());
procCfg.javaClass = procNode["class"].as<std::string>();
logger_->log_debug("parseProcessorNode: class => [%s]",
procCfg.javaClass.c_str());
- char uuidStr[37];
- uuid_unparse_lower(_uuid, uuidStr);
-
- // generate the random UUID
- uuid_generate(uuid);
+ uuid_parse(procCfg.id.c_str(), uuid);
// Determine the processor name only from the Java class
int lastOfIdx = procCfg.javaClass.find_last_of(".");
@@ -303,7 +304,7 @@ void FlowControllerImpl::parseProcessorNodeYaml(YAML::Node processorsNode,
if (!processor) {
logger_->log_error(
"Could not create a processor %s with name %s",
- procCfg.name.c_str(), uuidStr);
+ procCfg.name.c_str(), procCfg.id.c_str());
throw std::invalid_argument(
"Could not create processor " + procCfg.name);
}
@@ -468,8 +469,10 @@ void FlowControllerImpl::parseRemoteProcessGroupYaml(YAML::Node *rpgNode,
YAML::Node rpgNode = iter->as<YAML::Node>();
auto name = rpgNode["name"].as<std::string>();
- logger_->log_debug("parseRemoteProcessGroupYaml: name => [%s]",
- name.c_str());
+ auto id = rpgNode["id"].as<std::string>();
+
+ logger_->log_debug("parseRemoteProcessGroupYaml: name => [%s], id => [%s]",
+ name.c_str(), id.c_str());
std::string url = rpgNode["url"].as<std::string>();
logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]",
@@ -491,11 +494,7 @@ void FlowControllerImpl::parseRemoteProcessGroupYaml(YAML::Node *rpgNode,
rpgNode["Output Ports"].as<YAML::Node>();
ProcessGroup *group = NULL;
- // generate the random UUID
- uuid_generate(uuid);
-
- char uuidStr[37];
- uuid_unparse_lower(_uuid, uuidStr);
+ uuid_parse(id.c_str(), uuid);
int64_t timeoutValue = -1;
int64_t yieldPeriodValue = -1;
@@ -568,20 +567,18 @@ void FlowControllerImpl::parseConnectionYaml(YAML::Node *connectionsNode,
if (connectionsNode->IsSequence()) {
for (YAML::const_iterator iter = connectionsNode->begin();
iter != connectionsNode->end(); ++iter) {
- // generate the random UUID
- uuid_generate(uuid);
YAML::Node connectionNode = iter->as<YAML::Node>();
std::string name = connectionNode["name"].as<std::string>();
- std::string destName = connectionNode["destination name"].as<
+ std::string id = connectionNode["id"].as<std::string>();
+ std::string destId = connectionNode["destination id"].as<
std::string>();
- char uuidStr[37];
- uuid_unparse_lower(_uuid, uuidStr);
+ uuid_parse(id.c_str(), uuid);
logger_->log_debug(
- "Created connection with UUID %s and name %s", uuidStr,
+ "Created connection with UUID %s and name %s", id.c_str(),
name.c_str());
connection = this->createConnection(name, uuid);
auto rawRelationship =
@@ -592,26 +589,30 @@ void FlowControllerImpl::parseConnectionYaml(YAML::Node *connectionsNode,
rawRelationship.c_str());
if (connection)
connection->setRelationship(relationship);
- std::string connectionSrcProcName =
- connectionNode["source name"].as<std::string>();
+ std::string connectionSrcProcId =
+ connectionNode["source id"].as<std::string>();
+ uuid_t srcUUID;
+ uuid_parse(connectionSrcProcId.c_str(), srcUUID);
Processor *srcProcessor = this->_root->findProcessor(
- connectionSrcProcName);
+ srcUUID);
if (!srcProcessor) {
logger_->log_error(
- "Could not locate a source with name %s to create a connection",
- connectionSrcProcName.c_str());
+ "Could not locate a source with id %s to create a connection",
+ connectionSrcProcId.c_str());
throw std::invalid_argument(
- "Could not locate a source with name %s to create a connection "
- + connectionSrcProcName);
+ "Could not locate a source with id %s to create a connection "
+ + connectionSrcProcId);
}
- Processor *destProcessor = this->_root->findProcessor(destName);
+ uuid_t destUUID;
+ uuid_parse(destId.c_str(), destUUID);
+ Processor *destProcessor = this->_root->findProcessor(destUUID);
// If we could not find name, try by UUID
if (!destProcessor) {
uuid_t destUuid;
- uuid_parse(destName.c_str(), destUuid);
+ uuid_parse(destId.c_str(), destUuid);
destProcessor = this->_root->findProcessor(destUuid);
}
if (destProcessor) {
@@ -728,11 +729,23 @@ void FlowControllerImpl::load() {
parseRemoteProcessGroupYaml(&remoteProcessingGroupNode, this->_root);
parseConnectionYaml(&connectionsNode, this->_root);
- _initialized = true;
+ // Load Flow File from Repo
+ loadFlowRepo();
+ _initialized = true;
}
}
+//! Re-queue flow files persisted in the flowfile repository.
+//! Does nothing when there is no flowfile repository or it is disabled.
+void FlowControllerImpl::loadFlowRepo()
+{
+    if (!this->_flowfileRepo || !this->_flowfileRepo->isEnable())
+        return;
+
+    // Collect the connections reachable from the root process group, then
+    // hand the map to the repository for loading.
+    std::map<std::string, Connection *> connectionsByUuid;
+    this->_root->getConnections(&connectionsByUuid);
+    this->_flowfileRepo->loadFlowFileToConnections(&connectionsByUuid);
+}
+
void FlowControllerImpl::reload(std::string yamlFile)
{
logger_->log_info("Starting to reload Flow Controller with yaml %s", yamlFile.c_str());
@@ -769,6 +782,8 @@ bool FlowControllerImpl::start() {
&this->_eventScheduler);
_running = true;
this->_protocol->start();
+ this->_provenanceRepo->start();
+ this->_flowfileRepo->start();
logger_->log_info("Started Flow Controller");
}
return true;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/src/FlowFileRecord.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index 362602e..a2f2323 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -27,9 +27,10 @@
#include <cstdio>
#include "FlowFileRecord.h"
-
-#include "Logger.h"
#include "Relationship.h"
+#include "Logger.h"
+#include "FlowController.h"
+#include "FlowFileRepository.h"
std::atomic<uint64_t> FlowFileRecord::_localFlowSeqNumber(0);
@@ -39,6 +40,7 @@ FlowFileRecord::FlowFileRecord(std::map<std::string, std::string> attributes, Re
_offset(0),
_penaltyExpirationMs(0),
_claim(claim),
+ _isStoredToRepo(false),
_markedDelete(false),
_connection(NULL),
_orginalConnection(NULL)
@@ -74,6 +76,43 @@ FlowFileRecord::FlowFileRecord(std::map<std::string, std::string> attributes, Re
logger_ = Logger::getLogger();
}
+//! Rebuild a flow file record from an event read back from the flowfile repository.
+//! \param event persisted event supplying dates, size/offset, lineage, attributes,
+//!        the flow file UUID and the content path
+FlowFileRecord::FlowFileRecord(FlowFileEventRecord *event)
+: _size(0),
+  // Take the id and advance the shared counter in one atomic step, so two
+  // records constructed concurrently can never observe the same value.
+  _id(_localFlowSeqNumber.fetch_add(1)),
+  _offset(0),
+  _penaltyExpirationMs(0),
+  _claim(NULL),
+  _isStoredToRepo(false),
+  _markedDelete(false),
+  _connection(NULL),
+  _orginalConnection(NULL)
+{
+    _entryDate = event->getFlowFileEntryDate();
+    _lineageStartDate = event->getlineageStartDate();
+    _size = event->getFileSize();
+    _offset = event->getFileOffset();
+    _lineageIdentifiers = event->getLineageIdentifiers();
+    _attributes = event->getAttributes();
+    _snapshot = false;
+    // Keep the persisted UUID instead of generating a new one
+    _uuidStr = event->getFlowFileUuid();
+    uuid_parse(_uuidStr.c_str(), _uuid);
+
+    // Only a record with content gets a resource claim; it points at the
+    // content path recorded in the event.
+    if (_size > 0)
+    {
+        _claim = new ResourceClaim();
+        _claim->setContentFullPath(event->getContentFullPath());
+        // Increase the flow file record owned count for the resource claim
+        _claim->increaseFlowFileRecordOwnedCount();
+    }
+    logger_ = Logger::getLogger();
+}
+
+
FlowFileRecord::~FlowFileRecord()
{
if (!_snapshot)
@@ -87,7 +126,15 @@ FlowFileRecord::~FlowFileRecord()
if (_claim->getFlowFileRecordOwnedCount() <= 0)
{
logger_->log_debug("Delete Resource Claim %s", _claim->getContentFullPath().c_str());
- std::remove(_claim->getContentFullPath().c_str());
+ std::string value;
+ if (!FlowControllerFactory::getFlowController()->getFlowFileRepository() ||
+ !FlowControllerFactory::getFlowController()->getFlowFileRepository()->isEnable() ||
+ !this->_isStoredToRepo ||
+ !FlowControllerFactory::getFlowController()->getFlowFileRepository()->Get(_uuidStr, value))
+ {
+ // if it is persistent to DB already while it is in the queue, we keep the content
+ std::remove(_claim->getContentFullPath().c_str());
+ }
delete _claim;
}
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/src/FlowFileRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowFileRepository.cpp b/libminifi/src/FlowFileRepository.cpp
new file mode 100644
index 0000000..ade8dce
--- /dev/null
+++ b/libminifi/src/FlowFileRepository.cpp
@@ -0,0 +1,280 @@
+/**
+ * @file FlowFileRepository.cpp
+ * FlowFile implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <cstdint>
+#include <vector>
+#include <arpa/inet.h>
+#include "io/DataStream.h"
+#include "io/Serializable.h"
+#include "FlowFileRecord.h"
+#include "Relationship.h"
+#include "Logger.h"
+#include "FlowController.h"
+#include "FlowFileRepository.h"
+
+//! DeSerialize
+//! Look up the record stored under key in repo and decode it into this event.
+//! \return false when the key is absent or the stored bytes cannot be decoded
+bool FlowFileEventRecord::DeSerialize(FlowFileRepository *repo,
+        std::string key) {
+    std::string value;
+
+    if (!repo->Get(key, value)) {
+        logger_->log_error("NiFi FlowFile Store event %s can not found",
+                key.c_str());
+        return false;
+    }
+    // length() is a size_t; convert explicitly so it matches the %d specifier
+    logger_->log_debug("NiFi FlowFile Read event %s length %d",
+            key.c_str(), static_cast<int>(value.length()));
+
+    DataStream stream((const uint8_t*)value.data(), value.length());
+
+    bool ret = DeSerialize(stream);
+
+    if (ret) {
+        logger_->log_debug(
+                "NiFi FlowFile retrieve uuid %s size %d connection %s success",
+                _uuid.c_str(), stream.getSize(), _uuidConnection.c_str());
+    } else {
+        // the connection UUID is a C string, so it must be printed with %s
+        logger_->log_debug(
+                "NiFi FlowFile retrieve uuid %s size %d connection %s fail",
+                _uuid.c_str(), stream.getSize(), _uuidConnection.c_str());
+    }
+
+    return ret;
+}
+
+//! Encode this event into an in-memory stream and store it in repo under the
+//! flow file UUID.
+//! Field order: event time, entry date, lineage start date, flow file UUID,
+//! connection UUID, attribute count, attribute key/value pairs, content path,
+//! size, offset. DeSerialize reads them back in the same order.
+//! \return true only when every field was written and the repository Put succeeded
+bool FlowFileEventRecord::Serialize(FlowFileRepository *repo) {
+    DataStream outStream;
+    int ret;
+
+    // The three timestamps: each write must report 8
+    ret = write(this->_eventTime, &outStream);
+    if (ret != 8)
+        return false;
+
+    ret = write(this->_entryDate, &outStream);
+    if (ret != 8)
+        return false;
+
+    ret = write(this->_lineageStartDate, &outStream);
+    if (ret != 8)
+        return false;
+
+    // Flow file UUID and the UUID of the connection holding it
+    ret = writeUTF(this->_uuid, &outStream);
+    if (ret <= 0)
+        return false;
+
+    ret = writeUTF(this->_uuidConnection, &outStream);
+    if (ret <= 0)
+        return false;
+
+    // write flow attributes: a 4-byte count followed by key/value pairs
+    uint32_t numAttributes = this->_attributes.size();
+    ret = write(numAttributes, &outStream);
+    if (ret != 4)
+        return false;
+
+    // iterate by const reference so each pair is not copied
+    for (const auto &itAttribute : _attributes) {
+        ret = writeUTF(itAttribute.first, &outStream, true);
+        if (ret <= 0)
+            return false;
+
+        ret = writeUTF(itAttribute.second, &outStream, true);
+        if (ret <= 0)
+            return false;
+    }
+
+    // Content location, then its size and offset
+    ret = writeUTF(this->_contentFullPath, &outStream);
+    if (ret <= 0)
+        return false;
+
+    ret = write(this->_size, &outStream);
+    if (ret != 8)
+        return false;
+
+    ret = write(this->_offset, &outStream);
+    if (ret != 8)
+        return false;
+
+    // Persistent to the DB
+    if (repo->Put(_uuid, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) {
+        logger_->log_debug("NiFi FlowFile Store event %s size %d success",
+                _uuid.c_str(), outStream.getSize());
+        return true;
+    }
+
+    logger_->log_error("NiFi FlowFile Store event %s size %d fail",
+            _uuid.c_str(), outStream.getSize());
+    return false;
+}
+
+//! Decode an event from a raw byte buffer.
+//! Fields are consumed in the order Serialize writes them; decoding stops and
+//! false is returned at the first field whose read does not report the
+//! expected result.
+bool FlowFileEventRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) {
+    DataStream inStream(buffer, bufferSize);
+    int status;
+
+    // timestamps
+    status = read(this->_eventTime, &inStream);
+    if (status != 8) return false;
+
+    status = read(this->_entryDate, &inStream);
+    if (status != 8) return false;
+
+    status = read(this->_lineageStartDate, &inStream);
+    if (status != 8) return false;
+
+    // flow file UUID followed by the owning connection UUID
+    status = readUTF(this->_uuid, &inStream);
+    if (status <= 0) return false;
+
+    status = readUTF(this->_uuidConnection, &inStream);
+    if (status <= 0) return false;
+
+    // attribute count, then that many key/value pairs
+    uint32_t attributeCount = 0;
+    status = read(attributeCount, &inStream);
+    if (status != 4) return false;
+
+    uint32_t remaining = attributeCount;
+    while (remaining > 0) {
+        std::string attrName;
+        status = readUTF(attrName, &inStream, true);
+        if (status <= 0) return false;
+
+        std::string attrValue;
+        status = readUTF(attrValue, &inStream, true);
+        if (status <= 0) return false;
+
+        this->_attributes[attrName] = attrValue;
+        --remaining;
+    }
+
+    // content path, size and offset
+    status = readUTF(this->_contentFullPath, &inStream);
+    if (status <= 0) return false;
+
+    status = read(this->_size, &inStream);
+    if (status != 8) return false;
+
+    status = read(this->_offset, &inStream);
+    if (status != 8) return false;
+
+    return true;
+}
+
+//! Re-queue every persisted flow file onto the connection it was stored for.
+//! \param connectionMap connections keyed by connection UUID string
+//! Records that cannot be decoded, or whose connection is not in the map, are
+//! purged from the repository; for the latter the content file is removed too.
+void FlowFileRepository::loadFlowFileToConnections(std::map<std::string, Connection *> *connectionMap)
+{
+    // Nothing to do when the repo is disabled, no database handle exists, or
+    // the caller supplied no map.
+    if (!_enable || !_db || !connectionMap)
+        return;
+
+    std::vector<std::string> purgeList;
+    leveldb::Iterator* it = _db->NewIterator(
+            leveldb::ReadOptions());
+
+    for (it->SeekToFirst(); it->Valid(); it->Next())
+    {
+        FlowFileEventRecord eventRead;
+        const std::string key = it->key().ToString();
+        // DeSerialize takes a const buffer, so keep the stored bytes const
+        const bool decoded = eventRead.DeSerialize(
+                reinterpret_cast<const uint8_t *>(it->value().data()),
+                static_cast<int>(it->value().size()));
+        if (!decoded)
+        {
+            // undecodable record: schedule it for removal
+            purgeList.push_back(key);
+            continue;
+        }
+
+        auto search = connectionMap->find(eventRead.getConnectionUuid());
+        if (search != connectionMap->end())
+        {
+            // we find the connection for the persistent flowfile, create the flowfile and enqueue that
+            FlowFileRecord *record = new FlowFileRecord(&eventRead);
+            // mark it as already stored so that enqueueing does not persist it again
+            record->setStoredToRepository(true);
+            search->second->put(record);
+        }
+        else
+        {
+            // the connection no longer exists: drop the content and the record
+            if (eventRead.getContentFullPath().length() > 0)
+            {
+                std::remove(eventRead.getContentFullPath().c_str());
+            }
+            purgeList.push_back(key);
+        }
+    }
+
+    // Release the iterator before deleting; the deletes were deferred until
+    // iteration had finished.
+    delete it;
+    for (const std::string &eventId : purgeList)
+    {
+        logger_->log_info("Repository Repo %s Purge %s",
+                RepositoryTypeStr[_type],
+                eventId.c_str());
+        Delete(eventId);
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/src/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ProcessGroup.cpp b/libminifi/src/ProcessGroup.cpp
index f451838..7e3527e 100644
--- a/libminifi/src/ProcessGroup.cpp
+++ b/libminifi/src/ProcessGroup.cpp
@@ -66,6 +66,18 @@ ProcessGroup::~ProcessGroup() {
}
}
+//! Add this group's connections, and recursively those of every child
+//! process group, to connectionMap keyed by connection UUID string.
+void ProcessGroup::getConnections(std::map<std::string, Connection*> *connectionMap)
+{
+    std::map<std::string, Connection*> &collected = *connectionMap;
+
+    // connections owned directly by this group
+    for (auto conn : connections_)
+        collected[conn->getUUIDStr()] = conn;
+
+    // descend into the nested groups
+    for (auto childGroup : child_process_groups_)
+        childGroup->getConnections(connectionMap);
+}
+
bool ProcessGroup::isRootProcessGroup() {
std::lock_guard<std::mutex> lock(mtx_);
return (type_ == ROOT_PROCESS_GROUP);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/src/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ProcessSession.cpp b/libminifi/src/ProcessSession.cpp
index c67abcc..3b3eb64 100644
--- a/libminifi/src/ProcessSession.cpp
+++ b/libminifi/src/ProcessSession.cpp
@@ -28,6 +28,18 @@
#include <iostream>
#include "ProcessSession.h"
+#include "FlowController.h"
+
+//! Create a session for the given process context.
+//! A provenance reporter is allocated only when a provenance repository exists
+//! and is enabled; otherwise _provenanceReport stays NULL.
+ProcessSession::ProcessSession(ProcessContext *processContext) : _processContext(processContext) {
+    logger_ = Logger::getLogger();
+    logger_->log_trace("ProcessSession created for %s", _processContext->getProcessor()->getName().c_str());
+    _provenanceReport = NULL;
+    // Check the repository pointer before using it, the same way the flowfile
+    // repository is checked before use elsewhere.
+    auto provenanceRepo = FlowControllerFactory::getFlowController()->getProvenanceRepository();
+    if (provenanceRepo && provenanceRepo->isEnable())
+    {
+        _provenanceReport = new ProvenanceReporter(_processContext->getProcessor()->getUUIDStr(),
+                _processContext->getProcessor()->getName());
+    }
+}
FlowFileRecord* ProcessSession::create()
{
@@ -39,7 +51,8 @@ FlowFileRecord* ProcessSession::create()
_addedFlowFiles[record->getUUIDStr()] = record;
logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr().c_str());
std::string details = _processContext->getProcessor()->getName() + " creates flow record " + record->getUUIDStr();
- _provenanceReport->create(record, details);
+ if (_provenanceReport)
+ _provenanceReport->create(record, details);
}
return record;
@@ -91,7 +104,8 @@ FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent)
record->_size = parent->_size;
record->_claim->increaseFlowFileRecordOwnedCount();
}
- _provenanceReport->clone(parent, record);
+ if (_provenanceReport)
+ _provenanceReport->clone(parent, record);
}
return record;
}
@@ -129,7 +143,8 @@ FlowFileRecord* ProcessSession::cloneDuringTransfer(FlowFileRecord *parent)
record->_size = parent->_size;
record->_claim->increaseFlowFileRecordOwnedCount();
}
- _provenanceReport->clone(parent, record);
+ if (_provenanceReport)
+ _provenanceReport->clone(parent, record);
}
return record;
@@ -161,7 +176,8 @@ FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent, long offset, long
record->_claim = parent->_claim;
record->_claim->increaseFlowFileRecordOwnedCount();
}
- _provenanceReport->clone(parent, record);
+ if (_provenanceReport)
+ _provenanceReport->clone(parent, record);
}
return record;
}
@@ -171,7 +187,8 @@ void ProcessSession::remove(FlowFileRecord *flow)
flow->_markedDelete = true;
_deletedFlowFiles[flow->getUUIDStr()] = flow;
std::string reason = _processContext->getProcessor()->getName() + " drop flow record " + flow->getUUIDStr();
- _provenanceReport->drop(flow, reason);
+ if (_provenanceReport)
+ _provenanceReport->drop(flow, reason);
}
void ProcessSession::putAttribute(FlowFileRecord *flow, std::string key, std::string value)
@@ -179,7 +196,8 @@ void ProcessSession::putAttribute(FlowFileRecord *flow, std::string key, std::st
flow->setAttribute(key, value);
std::string details = _processContext->getProcessor()->getName() + " modify flow record " + flow->getUUIDStr() +
" attribute " + key + ":" + value;
- _provenanceReport->modifyAttributes(flow, details);
+ if (_provenanceReport)
+ _provenanceReport->modifyAttributes(flow, details);
}
void ProcessSession::removeAttribute(FlowFileRecord *flow, std::string key)
@@ -187,7 +205,8 @@ void ProcessSession::removeAttribute(FlowFileRecord *flow, std::string key)
flow->removeAttribute(key);
std::string details = _processContext->getProcessor()->getName() + " remove flow record " + flow->getUUIDStr() +
" attribute " + key;
- _provenanceReport->modifyAttributes(flow, details);
+ if (_provenanceReport)
+ _provenanceReport->modifyAttributes(flow, details);
}
void ProcessSession::penalize(FlowFileRecord *flow)
@@ -233,7 +252,8 @@ void ProcessSession::write(FlowFileRecord *flow, OutputStreamCallback *callback)
fs.close();
std::string details = _processContext->getProcessor()->getName() + " modify flow record content " + flow->getUUIDStr();
uint64_t endTime = getTimeMillis();
- _provenanceReport->modifyContent(flow, details, endTime - startTime);
+ if (_provenanceReport)
+ _provenanceReport->modifyContent(flow, details, endTime - startTime);
}
else
{
@@ -304,7 +324,8 @@ void ProcessSession::append(FlowFileRecord *flow, OutputStreamCallback *callback
fs.close();
std::string details = _processContext->getProcessor()->getName() + " modify flow record content " + flow->getUUIDStr();
uint64_t endTime = getTimeMillis();
- _provenanceReport->modifyContent(flow, details, endTime - startTime);
+ if (_provenanceReport)
+ _provenanceReport->modifyContent(flow, details, endTime - startTime);
}
else
{
@@ -430,7 +451,8 @@ void ProcessSession::import(std::string source, FlowFileRecord *flow, bool keepS
std::remove(source.c_str());
std::string details = _processContext->getProcessor()->getName() + " modify flow record content " + flow->getUUIDStr();
uint64_t endTime = getTimeMillis();
- _provenanceReport->modifyContent(flow, details, endTime - startTime);
+ if (_provenanceReport)
+ _provenanceReport->modifyContent(flow, details, endTime - startTime);
}
else
{
@@ -653,7 +675,8 @@ void ProcessSession::commit()
_deletedFlowFiles.clear();
_originalFlowFiles.clear();
// persistent the provenance report
- this->_provenanceReport->commit();
+ if (this->_provenanceReport)
+ this->_provenanceReport->commit();
logger_->log_trace("ProcessSession committed for %s", _processContext->getProcessor()->getName().c_str());
}
catch (std::exception &exception)
@@ -740,7 +763,8 @@ FlowFileRecord *ProcessSession::get()
{
FlowFileRecord *record = *it;
std::string details = _processContext->getProcessor()->getName() + " expire flow record " + record->getUUIDStr();
- _provenanceReport->expire(record, details);
+ if (_provenanceReport)
+ _provenanceReport->expire(record, details);
delete (record);
}
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/src/Provenance.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Provenance.cpp b/libminifi/src/Provenance.cpp
index ff4d8f5..58cf730 100644
--- a/libminifi/src/Provenance.cpp
+++ b/libminifi/src/Provenance.cpp
@@ -23,9 +23,8 @@
#include "io/DataStream.h"
#include "io/Serializable.h"
#include "Provenance.h"
-
-#include "Logger.h"
#include "Relationship.h"
+#include "Logger.h"
#include "FlowController.h"
//! DeSerialize
@@ -227,9 +226,11 @@ bool ProvenanceEventRecord::Serialize(ProvenanceRepository *repo) {
if (repo->Put(_eventIdStr, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) {
logger_->log_debug("NiFi Provenance Store event %s size %d success",
_eventIdStr.c_str(), outStream.getSize());
+ return true;
} else {
logger_->log_error("NiFi Provenance Store event %s size %d fail",
_eventIdStr.c_str(), outStream.getSize());
+ return false;
}
// cleanup
@@ -391,6 +392,8 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const int bufferS
}
void ProvenanceReporter::commit() {
+ if (!FlowControllerFactory::getFlowController()->getProvenanceRepository()->isEnable())
+ return;
for (auto event : _events) {
if (!FlowControllerFactory::getFlowController()->getProvenanceRepository()->isFull()) {
event->Serialize(
@@ -561,68 +564,3 @@ void ProvenanceReporter::fetch(FlowFileRecord *flow, std::string transitUri,
}
}
-uint64_t ProvenanceRepository::_repoSize = 0;
-
-void ProvenanceRepository::start() {
- if (this->_purgePeriod <= 0)
- return;
- if (_running)
- return;
- _running = true;
- logger_->log_info("ProvenanceRepository Monitor Thread Start");
- _thread = new std::thread(run, this);
- _thread->detach();
-}
-
-void ProvenanceRepository::stop() {
- if (!_running)
- return;
- _running = false;
- logger_->log_info("ProvenanceRepository Monitor Thread Stop");
-}
-
-void ProvenanceRepository::run(ProvenanceRepository *repo) {
- // threshold for purge
- uint64_t purgeThreshold = repo->_maxPartitionBytes * 3 / 4;
- while (repo->_running) {
- std::this_thread::sleep_for(
- std::chrono::milliseconds(repo->_purgePeriod));
- uint64_t curTime = getTimeMillis();
- uint64_t size = repo->repoSize();
- if (size >= purgeThreshold) {
- std::vector<std::string> purgeList;
- leveldb::Iterator* it = repo->_db->NewIterator(
- leveldb::ReadOptions());
- for (it->SeekToFirst(); it->Valid(); it->Next()) {
- ProvenanceEventRecord eventRead;
- std::string key = it->key().ToString();
- if (eventRead.DeSerialize((uint8_t *) it->value().data(),
- (int) it->value().size())) {
- if ((curTime - eventRead.getEventTime())
- > repo->_maxPartitionMillis)
- purgeList.push_back(key);
- } else {
- repo->logger_->log_debug(
- "NiFi Provenance retrieve event %s fail",
- key.c_str());
- purgeList.push_back(key);
- }
- }
- delete it;
- std::vector<std::string>::iterator itPurge;
- for (itPurge = purgeList.begin(); itPurge != purgeList.end();
- itPurge++) {
- std::string eventId = *itPurge;
- repo->logger_->log_info("ProvenanceRepository Repo Purge %s",
- eventId.c_str());
- repo->Delete(eventId);
- }
- }
- if (size > repo->_maxPartitionBytes)
- repo->_repoFull = true;
- else
- repo->_repoFull = false;
- }
- return;
-}
-
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/src/Repository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Repository.cpp b/libminifi/src/Repository.cpp
new file mode 100644
index 0000000..ffbd953
--- /dev/null
+++ b/libminifi/src/Repository.cpp
@@ -0,0 +1,138 @@
+/**
+ * @file Repository.cpp
+ * Repository implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <cstdint>
+#include <vector>
+#include <arpa/inet.h>
+#include "io/DataStream.h"
+#include "io/Serializable.h"
+#include "Relationship.h"
+#include "Logger.h"
+#include "FlowController.h"
+#include "Repository.h"
+#include "Provenance.h"
+#include "FlowFileRepository.h"
+
+const char *Repository::RepositoryTypeStr[MAX_REPO_TYPE] = {"Provenace Repository", "FlowFile Repository"}; // Display names used in log messages, indexed by the repository's _type. NOTE(review): "Provenace" looks like a typo for "Provenance", and the start()/stop() format strings already append " Repository" after this name.
+uint64_t Repository::_repoSize[MAX_REPO_TYPE] = {0, 0}; // Per-type size slot that repoSize() zeroes and then returns; presumably accumulated by the ftw callbacks -- TODO confirm in Repository.h.
+
+void Repository::start() { // Launches the background monitor thread (Repository::run) unless the repository is disabled, has no purge period, or is already running.
+ if (!_enable) // repository disabled: never start a monitor
+ return;
+ if (this->_purgePeriod <= 0) // a non-positive purge period means there is nothing to schedule
+ return;
+ if (_running) // already started: keep the call idempotent
+ return;
+ _running = true; // must be set before the thread starts, because run() loops on this flag
+ logger_->log_info("%s Repository Monitor Thread Start", RepositoryTypeStr[_type]);
+ _thread = new std::thread(run, this); // NOTE(review): heap-allocated thread object; no matching delete is visible in this file
+ _thread->detach(); // detached, so stop() cannot join it; the thread ends when run() returns
+}
+
+void Repository::stop() { // Asks the monitor thread to exit by clearing _running; does not wait for the thread to finish.
+ if (!_running) // not running: nothing to stop
+ return;
+ _running = false; // run() re-checks this flag only after its sleep, so the thread may linger for up to one purge period
+ logger_->log_info("%s Repository Monitor Thread Stop", RepositoryTypeStr[_type]);
+}
+
+void Repository::run(Repository *repo) { // Monitor-thread body: while repo->_running is set, periodically purges expired or unreadable records and refreshes repo->_repoFull.
+ // Purging begins once the repository size reaches 3/4 of _maxPartitionBytes; computed once, before the loop.
+ uint64_t purgeThreshold = repo->_maxPartitionBytes * 3 / 4;
+ while (repo->_running) {
+ std::this_thread::sleep_for( // sleep first, so a full purge period elapses before every check
+ std::chrono::milliseconds(repo->_purgePeriod));
+ uint64_t curTime = getTimeMillis(); // presumably milliseconds, since it is compared against _maxPartitionMillis below
+ uint64_t size = repo->repoSize();
+ if (size >= purgeThreshold) {
+ std::vector<std::string> purgeList; // keys are collected here and deleted only after the iterator has been released
+ leveldb::Iterator* it = repo->_db->NewIterator(
+ leveldb::ReadOptions());
+ if (repo->_type == PROVENANCE) // scan every stored record as a ProvenanceEventRecord
+ {
+ for (it->SeekToFirst(); it->Valid(); it->Next()) {
+ ProvenanceEventRecord eventRead;
+ std::string key = it->key().ToString();
+ if (eventRead.DeSerialize((uint8_t *) it->value().data(),
+ (int) it->value().size())) {
+ if ((curTime - eventRead.getEventTime()) // record age; curTime is unsigned, so an event time later than curTime would presumably wrap to a huge age
+ > repo->_maxPartitionMillis)
+ purgeList.push_back(key);
+ } else {
+ repo->logger_->log_debug(
+ "NiFi %s retrieve event %s fail",
+ RepositoryTypeStr[repo->_type],
+ key.c_str());
+ purgeList.push_back(key); // records that fail to deserialize are purged as well
+ }
+ }
+ }
+ if (repo->_type == FLOWFILE) // same scan as above, decoding records as FlowFileEventRecord
+ {
+ for (it->SeekToFirst(); it->Valid(); it->Next()) {
+ FlowFileEventRecord eventRead;
+ std::string key = it->key().ToString();
+ if (eventRead.DeSerialize((uint8_t *) it->value().data(),
+ (int) it->value().size())) {
+ if ((curTime - eventRead.getEventTime()) // record age, same caveat about unsigned arithmetic as in the provenance branch
+ > repo->_maxPartitionMillis)
+ purgeList.push_back(key);
+ } else {
+ repo->logger_->log_debug(
+ "NiFi %s retrieve event %s fail",
+ RepositoryTypeStr[repo->_type],
+ key.c_str());
+ purgeList.push_back(key); // records that fail to deserialize are purged as well
+ }
+ }
+ }
+ delete it; // the iterator is owned by this function and released before any Delete call
+ for (auto eventId : purgeList) // each key is copied per iteration
+ {
+ repo->logger_->log_info("Repository Repo %s Purge %s",
+ RepositoryTypeStr[repo->_type],
+ eventId.c_str());
+ repo->Delete(eventId);
+ }
+ }
+ if (size > repo->_maxPartitionBytes) // compares the size measured before this pass's purge, not the size after it
+ repo->_repoFull = true;
+ else
+ repo->_repoFull = false;
+ }
+ return;
+}
+
+//! repoSize: resets this repository type's slot in the static _repoSize array, walks _directory with ftw(), and returns the slot's value afterwards.
+uint64_t Repository::repoSize()
+{
+ _repoSize[_type] = 0; // start from zero; presumably the ftw callbacks below add each file's size to this slot -- TODO confirm in Repository.h
+ if (_type == PROVENANCE)
+ {
+ if (ftw(_directory.c_str(), repoSumProvenance, 1) != 0) // third ftw argument is the maximum number of directory descriptors it may keep open
+ _repoSize[_type] = 0; // a failed or aborted walk is reported as size 0
+ }
+ if (_type == FLOWFILE)
+ {
+ if (ftw(_directory.c_str(), repoSumFlowFile, 1) != 0) // same walk, with the flow-file callback
+ _repoSize[_type] = 0; // a failed or aborted walk is reported as size 0
+ }
+ return _repoSize[_type];
+}
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/src/ResourceClaim.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ResourceClaim.cpp b/libminifi/src/ResourceClaim.cpp
index ce1f248..be52b49 100644
--- a/libminifi/src/ResourceClaim.cpp
+++ b/libminifi/src/ResourceClaim.cpp
@@ -45,5 +45,5 @@ ResourceClaim::ResourceClaim(const std::string contentDirectory)
configure_ = Configure::getConfigure();
logger_ = Logger::getLogger();
- logger_->log_debug("Resource Claim created %s", _contentFullPath.c_str());
+ logger_->log_debug("Resource Claim created %s", uuidStr);
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/src/io/ClientSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp
index ebc2e44..39b71b4 100644
--- a/libminifi/src/io/ClientSocket.cpp
+++ b/libminifi/src/io/ClientSocket.cpp
@@ -331,7 +331,7 @@ int Socket::writeData(uint8_t *value, int size) {
}
if (ret)
- logger_->log_debug("Send data size %d over socket %d", size,
+ logger_->log_trace("Send data size %d over socket %d", size,
socket_file_descriptor_);
return bytes;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/test/unit/ProcessorTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProcessorTests.cpp b/libminifi/test/unit/ProcessorTests.cpp
index 955d125..ae5f7e5 100644
--- a/libminifi/test/unit/ProcessorTests.cpp
+++ b/libminifi/test/unit/ProcessorTests.cpp
@@ -20,6 +20,8 @@
#include "FlowController.h"
#include "ProvenanceTestHelper.h"
#include "../TestBase.h"
+#include <memory>
+#include "../../include/LogAppenders.h"
#include "GetFile.h"
@@ -33,10 +35,18 @@ TEST_CASE("Test Find file", "[getfileCreate2]"){
TestController testController;
- testController.enableDebug();
+ Configure *config = Configure::getConfigure();
- ProvenanceTestRepository repo;
- TestFlowController controller(repo);
+ config->set(BaseLogger::nifi_log_appender,"rollingappender");
+ config->set(OutputStreamAppender::nifi_log_output_stream_error_stderr,"true");
+ std::shared_ptr<Logger> logger = Logger::getLogger();
+ std::unique_ptr<BaseLogger> newLogger =LogInstance::getConfiguredLogger(config);
+ logger->updateLogger(std::move(newLogger));
+ logger->setLogLevel("debug");
+
+ ProvenanceTestRepository provenanceRepo;
+ FlowTestRepository flowRepo;
+ TestFlowController controller(provenanceRepo, flowRepo);
FlowControllerFactory::getFlowController( dynamic_cast<FlowController*>(&controller));
GetFile processor("getfileCreate2");
@@ -54,7 +64,7 @@ TEST_CASE("Test Find file", "[getfileCreate2]"){
// link the connections so that we can test results at the end for this
connection.setSourceProcessor(&processor);
-
+ connection.setDestinationProcessor(&processor);
connection.setSourceProcessorUUID(processoruuid);
connection.setDestinationProcessorUUID(processoruuid);
@@ -66,7 +76,6 @@ TEST_CASE("Test Find file", "[getfileCreate2]"){
context.setProperty(GetFile::Directory,dir);
ProcessSession session(&context);
-
REQUIRE( processor.getName() == "getfileCreate2");
FlowFileRecord *record;
@@ -82,13 +91,16 @@ TEST_CASE("Test Find file", "[getfileCreate2]"){
std::fstream file;
std::stringstream ss;
- ss << dir << "/" << "tstFile.ext";
+ std::string fileName("tstFile.ext");
+ ss << dir << "/" << fileName;
file.open(ss.str(),std::ios::out);
file << "tempFile";
+ int64_t fileSize = file.tellp();
file.close();
processor.incrementActiveTasks();
processor.setScheduledState(ScheduledState::RUNNING);
+
processor.onTrigger(&context,&session);
unlink(ss.str().c_str());
rmdir(dir);
@@ -103,6 +115,20 @@ TEST_CASE("Test Find file", "[getfileCreate2]"){
}
session.commit();
+ // verify flow file repo
+ REQUIRE( 1 == flowRepo.getRepoMap().size() );
+
+ for(auto entry: flowRepo.getRepoMap())
+ {
+ FlowFileEventRecord newRecord;
+ newRecord.DeSerialize((uint8_t*)entry.second.data(),entry.second.length());
+ REQUIRE (fileSize == newRecord.getFileSize());
+ REQUIRE (0 == newRecord.getFileOffset());
+ std::map<std::string, std::string> attrs = newRecord.getAttributes();
+ std::string key = FlowAttributeKey(FILENAME);
+ REQUIRE (attrs[key] == fileName);
+ }
+
FlowFileRecord *ffr = session.get();
ffr->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
@@ -111,9 +137,9 @@ TEST_CASE("Test Find file", "[getfileCreate2]"){
std::set<FlowFileRecord*> expiredFlows;
- REQUIRE( 2 == repo.getRepoMap().size() );
+ REQUIRE( 2 == provenanceRepo.getRepoMap().size() );
- for(auto entry: repo.getRepoMap())
+ for(auto entry: provenanceRepo.getRepoMap())
{
ProvenanceEventRecord newRecord;
newRecord.DeSerialize((uint8_t*)entry.second.data(),entry.second.length());
@@ -134,9 +160,9 @@ TEST_CASE("Test Find file", "[getfileCreate2]"){
}
if (!found)
throw std::runtime_error("Did not find record");
+ }
- }
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c4fea0cd/libminifi/test/unit/ProvenanceTestHelper.h
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h
index f67a826..1ee6a4c 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -20,6 +20,62 @@
#include "Provenance.h"
#include "FlowController.h"
+#include "FlowFileRepository.h"
+
+/**
+ * In-memory FlowFileRepository for unit tests: Put/Delete/Get operate on a std::map (presumably overriding the base-class versions -- confirm they are virtual).
+ */
+class FlowTestRepository : public FlowFileRepository
+{
+public:
+ FlowTestRepository()
+{
+}
+ //! initialize: does no setup and always reports success
+ bool initialize()
+ {
+ return true;
+ }
+
+ //! Destructor
+ virtual ~FlowTestRepository() {
+
+ }
+
+ bool Put(std::string key, uint8_t *buf, int bufLen) // Stores a copy of the first bufLen bytes of buf under key; always returns true.
+ {
+ repositoryResults.insert(std::pair<std::string,std::string>(key,std::string((const char*)buf,bufLen))); // std::map::insert keeps the existing value if key is already present
+ return true;
+ }
+ //! Delete: removes key from the map; returns true whether or not the key existed
+ bool Delete(std::string key)
+ {
+ repositoryResults.erase(key);
+ return true;
+ }
+ //! Get: copies the stored bytes for key into value and returns true; returns false and leaves value untouched when key is absent
+ bool Get(std::string key, std::string &value)
+ {
+ auto result = repositoryResults.find(key);
+ if (result != repositoryResults.end())
+ {
+ value = result->second;
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ const std::map<std::string,std::string> &getRepoMap() const // Read-only view of everything stored so far, so tests can inspect the contents.
+ {
+ return repositoryResults;
+ }
+
+protected:
+ std::map<std::string,std::string> repositoryResults; // key -> serialized record bytes, as passed to Put
+};
/**
* Test repository
@@ -81,9 +137,10 @@ class TestFlowController : public FlowController
{
public:
- TestFlowController(ProvenanceTestRepository &repo) : ::FlowController()
+ TestFlowController(ProvenanceTestRepository &provenanceRepo, FlowTestRepository &flowRepo) : ::FlowController()
{
- _provenanceRepo = dynamic_cast<ProvenanceRepository*>(&repo);
+ _provenanceRepo = dynamic_cast<ProvenanceRepository*>(&provenanceRepo);
+ _flowfileRepo = dynamic_cast<FlowFileRepository*>(&flowRepo);
}
~TestFlowController()
{