Posted to issues@nifi.apache.org by benqiu2016 <gi...@git.apache.org> on 2017/03/02 04:46:59 UTC

[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

GitHub user benqiu2016 opened a pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62

    MINIFI-231: Add Flow Persistent, Using id instead of name to load the flow from YAML

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/benqiu2016/nifi-minifi-cpp FlowFile_Persistent

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi-minifi-cpp/pull/62.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #62
    
----
commit 4acbe65dc7a73bb720e01b91add18fc7d14897e6
Author: Bin Qiu <be...@gmail.com>
Date:   2017-03-02T04:45:16Z

    MINIFI-231: Add Flow Persistent, Using id instead of name to load the flow from YAML

----
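For context, the commit persists flow files and reloads the flow by id. The event record in the diffs below keeps both the flow file UUID and the owning connection UUID, so a restart can reattach each persisted record to its connection by id. The sketch below is hypothetical and is not the PR's code: a std::map stands in for LevelDB, and PersistedFlowFile, encode, decode, persist and restoreByConnectionId are illustrative names.

```cpp
#include <cassert>
#include <map>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical stand-in for the LevelDB store: key = flow file uuid.
using KeyValueStore = std::map<std::string, std::string>;

// Minimal persisted record (an illustrative subset of FlowFileEventRecord).
struct PersistedFlowFile {
  std::string uuid;
  std::string connection_uuid;
  unsigned long long size;
};

// Naive space-separated encoding; assumes the uuids contain no spaces.
std::string encode(const PersistedFlowFile &r) {
  std::ostringstream out;
  out << r.uuid << ' ' << r.connection_uuid << ' ' << r.size;
  return out.str();
}

bool decode(const std::string &s, PersistedFlowFile &r) {
  std::istringstream in(s);
  return static_cast<bool>(in >> r.uuid >> r.connection_uuid >> r.size);
}

void persist(KeyValueStore &store, const PersistedFlowFile &r) {
  store[r.uuid] = encode(r);
}

// On restart, group records by connection uuid. One plausible reason to key
// by id rather than by name: connection names need not be unique.
std::map<std::string, std::vector<std::string>>
restoreByConnectionId(const KeyValueStore &store) {
  std::map<std::string, std::vector<std::string>> queues;
  for (const auto &kv : store) {
    PersistedFlowFile r{};
    if (decode(kv.second, r)) queues[r.connection_uuid].push_back(r.uuid);
  }
  return queues;
}
```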


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104687778
  
    --- Diff: libminifi/test/unit/ProvenanceTestHelper.h ---
    @@ -20,6 +20,62 @@
     
     #include "Provenance.h"
     #include "FlowController.h"
    +#include "FlowFileRepository.h"
    +
    +/**
    + * Test repository
    + */
    +class FlowTestRepository : public FlowFileRepository
    --- End diff --
    
    These are nearly identical; why not just make one that inherits from Repository?
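A minimal sketch of that suggestion, under simplifying assumptions: RepositorySketch is a hypothetical abstract base with Put/Get/Delete taking std::string values (the Repository in the diff takes a uint8_t buffer plus a length and is LevelDB-backed). A single in-memory test double can then serve both the provenance tests and the flow file tests.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Hypothetical, simplified base: same Put/Get/Delete verbs as Repository in
// the diff, but with std::string values instead of (uint8_t *, int).
class RepositorySketch {
 public:
  virtual ~RepositorySketch() = default;
  virtual bool Put(const std::string &key, const std::string &value) = 0;
  virtual bool Get(const std::string &key, std::string &value) = 0;
  virtual bool Delete(const std::string &key) = 0;
};

// One in-memory test double shared by all repository tests, instead of
// two near-identical subclasses.
class TestRepository : public RepositorySketch {
 public:
  bool Put(const std::string &key, const std::string &value) override {
    entries_[key] = value;
    return true;
  }
  bool Get(const std::string &key, std::string &value) override {
    auto it = entries_.find(key);
    if (it == entries_.end()) return false;
    value = it->second;
    return true;
  }
  bool Delete(const std::string &key) override {
    return entries_.erase(key) > 0;
  }
  std::size_t size() const { return entries_.size(); }

 private:
  std::map<std::string, std::string> entries_;
};
```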


[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r103955949
  
    --- Diff: libminifi/include/FlowFileRecord.h ---
    @@ -202,6 +216,8 @@ class FlowFileRecord
     	std::string _uuidStr;
     	//! UUID string for all parents
     	std::set<std::string> _lineageIdentifiers;
    +	//! whether it is stored to DB
    +	bool _isStoredToRepo;
    --- End diff --
    
    Same with these variable names.


[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r103957363
  
    --- Diff: libminifi/include/FlowFileRepository.h ---
    @@ -0,0 +1,208 @@
    +/**
    + * @file FlowFileRepository 
    + * Flow file repository class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __FLOWFILE_REPOSITORY_H__
    +#define __FLOWFILE_REPOSITORY_H__
    +
    +#include <ftw.h>
    +#include <uuid/uuid.h>
    +#include <atomic>
    +#include <cstdint>
    +#include <cstring>
    +#include <iostream>
    +#include <map>
    +#include <set>
    +#include <string>
    +#include <thread>
    +#include <vector>
    +
    +#include "Configure.h"
    +#include "Connection.h"
    +#include "FlowFileRecord.h"
    +#include "Logger.h"
    +#include "Property.h"
    +#include "ResourceClaim.h"
    +#include "io/Serializable.h"
    +#include "utils/TimeUtil.h"
    +#include "Repository.h"
    +
    +class FlowFileRepository;
    +
    +//! FlowFile Event Record
    +class FlowFileEventRecord : protected Serializable
    +{
    +public:
    +	friend class ProcessSession;
    +public:
    +	//! Constructor
    +	/*!
    +	 * Create a new provenance event record
    +	 */
    +	FlowFileEventRecord()
    +	: _entryDate(0), _lineageStartDate(0), _size(0), _offset(0)  
    +	{
    +		_eventTime = getTimeMillis();
    +		logger_ = Logger::getLogger();
    +	}
    +
    +	//! Destructor
    +	virtual ~FlowFileEventRecord() {
    +	}
    +	//! Get Attributes
    +	std::map<std::string, std::string> getAttributes() {
    +		return _attributes;
    +	}
    +	//! Get Size
    +	uint64_t getFileSize() {
    +		return _size;
    +	}
    +	// ! Get Offset
    +	uint64_t getFileOffset() {
    +		return _offset;
    +	}
    +	// ! Get Entry Date
    +	uint64_t getFlowFileEntryDate() {
    +		return _entryDate;
    +	}
    +	// ! Get Lineage Start Date
    +	uint64_t getlineageStartDate() {
    +		return _lineageStartDate;
    +	}
    +	// ! Get Event Time
    +	uint64_t getEventTime() {
    +		return _eventTime;
    +	}
    +	//! Get FlowFileUuid
    +	std::string getFlowFileUuid()
    +	{
    +		return _uuid;
    +	}
    +	//! Get ConnectionUuid
    +	std::string getConnectionUuid()
    +	{
    +		return _uuidConnection;
    +	}
    +	//! Get content full path
    +	std::string getContentFullPath()
    +	{
    +		return _contentFullPath;
    +	}
    +	//! Get LineageIdentifiers
    +	std::set<std::string> getLineageIdentifiers()
    +	{
    +		return _lineageIdentifiers;
    +	}
    +	//! fromFlowFile
    +	void fromFlowFile(FlowFileRecord *flow, std::string uuidConnection)
    +	{
    +		_entryDate = flow->getEntryDate();
    +		_lineageStartDate = flow->getlineageStartDate();
    +		_lineageIdentifiers = flow->getlineageIdentifiers();
    +		_uuid = flow->getUUIDStr();
    +		_attributes = flow->getAttributes();
    +		_size = flow->getSize();
    +		_offset = flow->getOffset();
    +		_uuidConnection = uuidConnection;
    +		if (flow->getResourceClaim())
    +		{
    +			_contentFullPath = flow->getResourceClaim()->getContentFullPath();
    +		}
    +	}
    +	//! Serialize and Persistent to the repository
    +	bool Serialize(FlowFileRepository *repo);
    +	//! DeSerialize
    +	bool DeSerialize(const uint8_t *buffer, const int bufferSize);
    +	//! DeSerialize
    +	bool DeSerialize(DataStream &stream)
    +	{
    +		return DeSerialize(stream.getBuffer(),stream.getSize());
    +	}
    +	//! DeSerialize
    +	bool DeSerialize(FlowFileRepository *repo, std::string key);
    +
    +protected:
    +
    +	//! Date at which the event was created
    +	uint64_t _eventTime;
    +	//! Date at which the flow file entered the flow
    +	uint64_t _entryDate;
    +	//! Date at which the origin of this flow file entered the flow
    +	uint64_t _lineageStartDate;
    +	//! Size in bytes of the data corresponding to this flow file
    +	uint64_t _size;
    +	//! flow uuid
    +	std::string _uuid;
    +	//! connection uuid
    +	std::string _uuidConnection;
    +	//! Offset to the content
    +	uint64_t _offset;
    +	//! Full path to the content
    +	std::string _contentFullPath;
    +	//! Attributes key/values pairs for the flow record
    +	std::map<std::string, std::string> _attributes;
    +	//! UUID string for all parents
    +	std::set<std::string> _lineageIdentifiers;
    +
    +private:
    +
    +	//! Logger
    +	std::shared_ptr<Logger> logger_;
    +	
    +	// Prevent default copy constructor and assignment operation
    +	// Only support pass by reference or pointer
    +	FlowFileEventRecord(const FlowFileEventRecord &parent);
    +	FlowFileEventRecord &operator=(const FlowFileEventRecord &parent);
    +
    +};
    +
    +#define FLOWFILE_REPOSITORY_DIRECTORY "./flowfile_repository"
    +#define MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE (10*1024*1024) // 10M
    +#define MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME (600000) // 10 minute
    +#define FLOWFILE_REPOSITORY_PURGE_PERIOD (2500) // 2500 msec
    +
    +//! FlowFile Repository
    +class FlowFileRepository : public Repository
    +{
    +public:
    +	//! Constructor
    +	/*!
    +	 * Create a new provenance repository
    +	 */
    +	FlowFileRepository()
    --- End diff --
    
    Why are arguments not passed into an explicit constructor?
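One way to address this, sketched with hypothetical names (RepositorySketch, FlowFileRepositorySketch): give the subclass an explicit constructor whose parameters default to the FLOWFILE_REPOSITORY_* values from the diff and forward them to the base, so a test can point the repository at a scratch directory without editing the macros.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>

// Hypothetical base mirroring the Repository constructor parameters in the
// diff (type, directory, max entry age, max size, purge period).
class RepositorySketch {
 public:
  enum RepositoryType { PROVENANCE, FLOWFILE };
  RepositorySketch(RepositoryType type, std::string directory,
                   std::int64_t maxPartitionMillis,
                   std::int64_t maxPartitionBytes, std::uint64_t purgePeriod)
      : type_(type),
        directory_(std::move(directory)),
        max_partition_millis_(maxPartitionMillis),
        max_partition_bytes_(maxPartitionBytes),
        purge_period_(purgePeriod) {}
  virtual ~RepositorySketch() = default;

  RepositoryType type() const { return type_; }
  const std::string &directory() const { return directory_; }
  std::int64_t maxPartitionMillis() const { return max_partition_millis_; }
  std::int64_t maxPartitionBytes() const { return max_partition_bytes_; }
  std::uint64_t purgePeriod() const { return purge_period_; }

 private:
  RepositoryType type_;
  std::string directory_;
  std::int64_t max_partition_millis_;
  std::int64_t max_partition_bytes_;
  std::uint64_t purge_period_;
};

// Defaults copied from the FLOWFILE_REPOSITORY_* macros in the diff; callers
// (for example unit tests) can override any of them.
class FlowFileRepositorySketch : public RepositorySketch {
 public:
  explicit FlowFileRepositorySketch(
      std::string directory = "./flowfile_repository",
      std::int64_t maxPartitionMillis = 600000,           // 10 minutes
      std::int64_t maxPartitionBytes = 10 * 1024 * 1024,  // 10 MB
      std::uint64_t purgePeriod = 2500)                   // milliseconds
      : RepositorySketch(FLOWFILE, std::move(directory), maxPartitionMillis,
                         maxPartitionBytes, purgePeriod) {}
};
```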


[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r103955742
  
    --- Diff: libminifi/include/Connection.h ---
    @@ -180,7 +184,8 @@ class Connection
     	std::atomic<uint64_t> _maxQueueDataSize;
     	//! Flow File Expiration Duration in= MilliSeconds
     	std::atomic<uint64_t> _expiredDuration;
    -
    +	//! UUID string
    +	std::string _uuidStr;
    --- End diff --
    
    As per Google's C++ style guide, this would be uuid_str_.


[GitHub] nifi-minifi-cpp issue #62: MINIFI-231: Add Flow Persistent, Using id instead...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on the issue:

    https://github.com/apache/nifi-minifi-cpp/pull/62
  
    @phrocker Added unit tests; please review and approve. I would like to get this merged before your big namespace change. Thanks a lot.


[GitHub] nifi-minifi-cpp issue #62: MINIFI-231: Add Flow Persistent, Using id instead...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the issue:

    https://github.com/apache/nifi-minifi-cpp/pull/62
  
    No substantial comments beyond those already covered. Looks good, and I verified the functionality. Will merge, thanks for the work, @benqiu2016!


[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r103956804
  
    --- Diff: libminifi/include/FlowFileRecord.h ---
    @@ -108,6 +109,10 @@ class FlowFileRecord
     	 * Create a new flow record
     	 */
     	FlowFileRecord(std::map<std::string, std::string> attributes, ResourceClaim *claim = NULL);
    --- End diff --
    
    Why not explicit?


[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104301026
  
    --- Diff: libminifi/include/FlowFileRecord.h ---
    @@ -108,6 +109,10 @@ class FlowFileRecord
     	 * Create a new flow record
     	 */
     	FlowFileRecord(std::map<std::string, std::string> attributes, ResourceClaim *claim = NULL);
    +	/*!
    +	 * Create a new flow record from repo flow event
    +	 */
    +	FlowFileRecord(FlowFileEventRecord *event);
    --- End diff --
    
    Will use explicit. The reason I am not using a smart pointer here is that, where we construct a FlowFileRecord from a FlowFileEventRecord, I am using a FlowFileEventRecord on the stack too. I want to avoid creating a new FlowFileEventRecord every time.
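A sketch of both points, using hypothetical trimmed-down types (EventRecordSketch, FlowFileRecordSketch, restoreAll): the converting constructor is marked explicit, and it takes a non-owning pointer and copies the fields out, so one stack-allocated event can be reused for every record being restored. Because nothing is retained, a const reference parameter would work equally well; that is a design alternative, not what the diff does.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Hypothetical stand-in for FlowFileEventRecord (only two fields kept).
struct EventRecordSketch {
  std::string uuid;
  std::map<std::string, std::string> attributes;
};

class FlowFileRecordSketch {
 public:
  // explicit: an EventRecordSketch* is never silently converted into a
  // record, e.g. "FlowFileRecordSketch r = &event;" does not compile.
  // The pointer is non-owning: fields are copied and nothing is retained.
  explicit FlowFileRecordSketch(const EventRecordSketch *event)
      : uuid_(event->uuid), attributes_(event->attributes) {}
  const std::string &uuid() const { return uuid_; }
  std::size_t attributeCount() const { return attributes_.size(); }

 private:
  std::string uuid_;
  std::map<std::string, std::string> attributes_;
};

// Restores one record per key while reusing a single stack-allocated event.
std::size_t restoreAll(const std::map<std::string, std::string> &keys) {
  EventRecordSketch event;  // lives on the stack, reused for every key
  std::size_t restored = 0;
  for (const auto &kv : keys) {
    event.uuid = kv.first;
    event.attributes = {{"filename", kv.second}};
    FlowFileRecordSketch record(&event);  // copies out of the event
    if (record.uuid() == kv.first) ++restored;
  }
  return restored;
}
```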


[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104301956
  
    --- Diff: libminifi/include/FlowFileRepository.h ---
    @@ -0,0 +1,208 @@
    +/**
    + * @file FlowFileRepository 
    + * Flow file repository class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __FLOWFILE_REPOSITORY_H__
    +#define __FLOWFILE_REPOSITORY_H__
    +
    +#include <ftw.h>
    +#include <uuid/uuid.h>
    +#include <atomic>
    +#include <cstdint>
    +#include <cstring>
    +#include <iostream>
    +#include <map>
    +#include <set>
    +#include <string>
    +#include <thread>
    +#include <vector>
    +
    +#include "Configure.h"
    +#include "Connection.h"
    +#include "FlowFileRecord.h"
    +#include "Logger.h"
    +#include "Property.h"
    +#include "ResourceClaim.h"
    +#include "io/Serializable.h"
    +#include "utils/TimeUtil.h"
    +#include "Repository.h"
    +
    +class FlowFileRepository;
    +
    +//! FlowFile Event Record
    +class FlowFileEventRecord : protected Serializable
    +{
    +public:
    +	friend class ProcessSession;
    +public:
    +	//! Constructor
    +	/*!
    +	 * Create a new provenance event record
    +	 */
    +	FlowFileEventRecord()
    +	: _entryDate(0), _lineageStartDate(0), _size(0), _offset(0)  
    +	{
    +		_eventTime = getTimeMillis();
    +		logger_ = Logger::getLogger();
    +	}
    +
    +	//! Destructor
    +	virtual ~FlowFileEventRecord() {
    +	}
    +	//! Get Attributes
    +	std::map<std::string, std::string> getAttributes() {
    --- End diff --
    
    If we call getAttributes and the event holding those attributes has somehow been deleted, we will have trouble.
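To illustrate the concern with a hypothetical EventSketch type: getAttributes in the diff returns the map by value, so the caller's copy outlives the event. A const reference accessor would avoid the copy but would dangle once the event is destroyed; it appears below only for contrast.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <utility>

using Attributes = std::map<std::string, std::string>;

// Hypothetical record type; only the attribute map matters here.
class EventSketch {
 public:
  explicit EventSketch(Attributes attrs) : attributes_(std::move(attrs)) {}
  // Returns a copy, like getAttributes() in the diff: the caller's map stays
  // valid even after the event is destroyed.
  Attributes getAttributes() const { return attributes_; }
  // Avoids the copy, but the reference dangles once the event is gone.
  // Shown for contrast only; not used below.
  const Attributes &attributesRef() const { return attributes_; }

 private:
  Attributes attributes_;
};

Attributes copyThenDestroy() {
  std::unique_ptr<EventSketch> event(
      new EventSketch(Attributes{{"filename", "a.txt"}}));
  Attributes copy = event->getAttributes();
  event.reset();  // the event and its own map are destroyed here
  return copy;    // still safe: 'copy' owns its data
}
```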


[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104319949
  
    --- Diff: libminifi/include/Repository.h ---
    @@ -0,0 +1,294 @@
    +/**
    + * @file Repository 
    + * Repository class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __REPOSITORY_H__
    +#define __REPOSITORY_H__
    +
    +#include <ftw.h>
    +#include <uuid/uuid.h>
    +#include <atomic>
    +#include <cstdint>
    +#include <cstring>
    +#include <iostream>
    +#include <map>
    +#include <set>
    +#include <string>
    +#include <thread>
    +#include <vector>
    +
    +#include "leveldb/db.h"
    +#include "leveldb/options.h"
    +#include "leveldb/slice.h"
    +#include "leveldb/status.h"
    +#include "Configure.h"
    +#include "Connection.h"
    +#include "FlowFileRecord.h"
    +#include "Logger.h"
    +#include "Property.h"
    +#include "ResourceClaim.h"
    +#include "io/Serializable.h"
    +#include "utils/TimeUtil.h"
    +#include "utils/StringUtils.h"
    +
    +//! Repository
    +class Repository
    +{
    +public:
    +	enum RepositoryType {
    +		//! Provenance Repo Type
    +		PROVENANCE,
    +		//! FlowFile Repo Type
    +		FLOWFILE,
    +		MAX_REPO_TYPE
    +	};
    +	static const char *RepositoryTypeStr[MAX_REPO_TYPE];
    +	//! Constructor
    +	/*!
    +	 * Create a new provenance repository
    +	 */
    +	Repository(RepositoryType type, std::string directory, 
    +		int64_t maxPartitionMillis, int64_t maxPartitionBytes, uint64_t purgePeriod) {
    +		_type = type;
    +		_directory = directory;
    +		_maxPartitionMillis = maxPartitionMillis;
    +		_maxPartitionBytes = maxPartitionBytes;
    +		_purgePeriod = purgePeriod;
    +		logger_ = Logger::getLogger();
    +		configure_ = Configure::getConfigure();
    +		_db = NULL;
    +		_thread = NULL;
    +		_running = false;
    +		_repoFull = false;
    +		_enable = true;
    +	}
    +
    +	//! Destructor
    +	virtual ~Repository() {
    +		stop();
    +		if (this->_thread)
    +			delete this->_thread;
    +		destroy();
    +	}
    +
    +	//! initialize
    +	virtual bool initialize()
    +	{
    +		std::string value;
    +
    +		if (_type == PROVENANCE)
    +		{
    +			if (!(configure_->get(Configure::nifi_provenance_repository_enable, value)
    +					&& StringUtils::StringToBool(value, _enable))) {
    +				_enable = true;
    +			}
    +			if (!_enable)
    +				return false;
    +			if (configure_->get(Configure::nifi_provenance_repository_directory_default, value))
    +			{
    +				_directory = value;
    +			}
    +			logger_->log_info("NiFi Provenance Repository Directory %s", _directory.c_str());
    +			if (configure_->get(Configure::nifi_provenance_repository_max_storage_size, value))
    +			{
    +				Property::StringToInt(value, _maxPartitionBytes);
    +			}
    +			logger_->log_info("NiFi Provenance Max Partition Bytes %d", _maxPartitionBytes);
    +			if (configure_->get(Configure::nifi_provenance_repository_max_storage_time, value))
    +			{
    +				TimeUnit unit;
    +				if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
    +							Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
    +				{
    +				}
    +			}
    +			logger_->log_info("NiFi Provenance Max Storage Time: [%d] ms", _maxPartitionMillis);
    +			leveldb::Options options;
    +			options.create_if_missing = true;
    +			leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
    +			if (status.ok())
    +			{
    +				logger_->log_info("NiFi Provenance Repository database open %s success", _directory.c_str());
    +			}
    +			else
    +			{
    +				logger_->log_error("NiFi Provenance Repository database open %s fail", _directory.c_str());
    +				return false;
    +			}
    +		}
    +
    +		if (_type == FLOWFILE)
    +		{
    +			if (!(configure_->get(Configure::nifi_flowfile_repository_enable, value)
    +					&& StringUtils::StringToBool(value, _enable))) {
    +				_enable = true;
    +			}
    +			if (!_enable)
    +				return false;
    +			if (configure_->get(Configure::nifi_flowfile_repository_directory_default, value))
    +			{
    +				_directory = value;
    +			}
    +			logger_->log_info("NiFi FlowFile Repository Directory %s", _directory.c_str());
    +			if (configure_->get(Configure::nifi_flowfile_repository_max_storage_size, value))
    +			{
    +				Property::StringToInt(value, _maxPartitionBytes);
    +			}
    +			logger_->log_info("NiFi FlowFile Max Partition Bytes %d", _maxPartitionBytes);
    +			if (configure_->get(Configure::nifi_flowfile_repository_max_storage_time, value))
    +			{
    +				TimeUnit unit;
    +				if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
    +							Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
    +				{
    +				}
    +			}
    +			logger_->log_info("NiFi FlowFile Max Storage Time: [%d] ms", _maxPartitionMillis);
    +			leveldb::Options options;
    +			options.create_if_missing = true;
    +			leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
    +			if (status.ok())
    +			{
    +				logger_->log_info("NiFi FlowFile Repository database open %s success", _directory.c_str());
    +			}
    +			else
    +			{
    +				logger_->log_error("NiFi FlowFile Repository database open %s fail", _directory.c_str());
    +				return false;
    +			}
    +		}
    +
    +		return true;
    +	}
    +	//! Put
    +	virtual bool Put(std::string key, uint8_t *buf, int bufLen)
    +	{
    +		if (!_enable)
    +			return false;
    +			
    +		// persistent to the DB
    +		leveldb::Slice value((const char *) buf, bufLen);
    +		leveldb::Status status;
    +		status = _db->Put(leveldb::WriteOptions(), key, value);
    +		if (status.ok())
    +			return true;
    +		else
    +			return false;
    +	}
    +	//! Delete
    +	virtual bool Delete(std::string key)
    +	{
    +		if (!_enable)
    +			return false;
    +		leveldb::Status status;
    +		status = _db->Delete(leveldb::WriteOptions(), key);
    +		if (status.ok())
    +			return true;
    +		else
    +			return false;
    +	}
    +	//! Get
    +	virtual bool Get(std::string key, std::string &value)
    +	{
    +		if (!_enable)
    +			return false;
    +		leveldb::Status status;
    +		status = _db->Get(leveldb::ReadOptions(), key, &value);
    +		if (status.ok())
    +			return true;
    +		else
    +			return false;
    +	}
    +	//! destroy
    +	void destroy()
    +	{
    +		if (_db)
    +		{
    +			delete _db;
    +			_db = NULL;
    +		}
    +	}
    +	//! Run function for the thread
    +	static void run(Repository *repo);
    +	//! Start the repository monitor thread
    +	virtual void start();
    +	//! Stop the repository monitor thread
    +	virtual void stop();
    +	//! whether the repo is full
    +	virtual bool isFull()
    +	{
    +		return _repoFull;
    +	}
    +	//! whether the repo is enable 
    +	virtual bool isEnable()
    +	{
    +		return _enable;
    +	}
    +
    +protected:
    +	//! Repo Type
    +	RepositoryType _type;
    +	//! Mutex for protection
    +	std::mutex _mtx;
    +	//! repository directory
    +	std::string _directory;
    +	//! Logger
    +	std::shared_ptr<Logger> logger_;
    +	//! Configure
    +	//! max db entry life time
    +	Configure *configure_;
    +	int64_t _maxPartitionMillis;
    +	//! max db size
    +	int64_t _maxPartitionBytes;
    +	//! purge period
    +	uint64_t _purgePeriod;
    +	//! level DB database
    +	leveldb::DB* _db;
    +	//! thread
    +	std::thread *_thread;
    --- End diff --
    
    Just to keep the code consistent with the way std::thread is handled in ThreadSchedulingAgent, and to make sure that we do not start the thread in the constructor.
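A minimal sketch of that lifecycle, assuming a hypothetical PurgeMonitorSketch class rather than the PR's Repository: the constructor only stores configuration, start() launches the thread, and stop() clears the running flag and joins. It holds std::thread by value instead of through the std::thread* in the diff, which removes the manual delete; the ThreadSchedulingAgent code itself is not reproduced here.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical monitor: construction does no work, start() launches the
// thread, stop() signals it and joins.
class PurgeMonitorSketch {
 public:
  explicit PurgeMonitorSketch(std::chrono::milliseconds period)
      : period_(period) {}
  ~PurgeMonitorSketch() { stop(); }
  PurgeMonitorSketch(const PurgeMonitorSketch &) = delete;
  PurgeMonitorSketch &operator=(const PurgeMonitorSketch &) = delete;

  void start() {
    if (running_.exchange(true)) return;  // already running
    thread_ = std::thread([this] {
      while (running_) {
        ++passes_;  // a real repository would purge expired entries here
        std::this_thread::sleep_for(period_);
      }
    });
  }

  void stop() {
    running_ = false;
    if (thread_.joinable()) thread_.join();
  }

  bool isRunning() const { return running_; }
  int passes() const { return passes_; }

 private:
  std::chrono::milliseconds period_;
  std::atomic<bool> running_{false};
  std::atomic<int> passes_{0};
  std::thread thread_;  // value member instead of std::thread*
};
```

A period such as FLOWFILE_REPOSITORY_PURGE_PERIOD (2500 ms in the diff) would be passed in at construction; nothing runs until start() is called.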


[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r103960265
  
    --- Diff: libminifi/include/Repository.h ---
    @@ -0,0 +1,294 @@
    +/**
    + * @file Repository 
    + * Repository class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __REPOSITORY_H__
    +#define __REPOSITORY_H__
    +
    +#include <ftw.h>
    +#include <uuid/uuid.h>
    +#include <atomic>
    +#include <cstdint>
    +#include <cstring>
    +#include <iostream>
    +#include <map>
    +#include <set>
    +#include <string>
    +#include <thread>
    +#include <vector>
    +
    +#include "leveldb/db.h"
    +#include "leveldb/options.h"
    +#include "leveldb/slice.h"
    +#include "leveldb/status.h"
    +#include "Configure.h"
    +#include "Connection.h"
    +#include "FlowFileRecord.h"
    +#include "Logger.h"
    +#include "Property.h"
    +#include "ResourceClaim.h"
    +#include "io/Serializable.h"
    +#include "utils/TimeUtil.h"
    +#include "utils/StringUtils.h"
    +
    +//! Repository
    +class Repository
    +{
    +public:
    +	enum RepositoryType {
    +		//! Provenance Repo Type
    +		PROVENANCE,
    +		//! FlowFile Repo Type
    +		FLOWFILE,
    +		MAX_REPO_TYPE
    +	};
    +	static const char *RepositoryTypeStr[MAX_REPO_TYPE];
    +	//! Constructor
    +	/*!
    +	 * Create a new provenance repository
    +	 */
    +	Repository(RepositoryType type, std::string directory, 
    +		int64_t maxPartitionMillis, int64_t maxPartitionBytes, uint64_t purgePeriod) {
    +		_type = type;
    +		_directory = directory;
    +		_maxPartitionMillis = maxPartitionMillis;
    +		_maxPartitionBytes = maxPartitionBytes;
    +		_purgePeriod = purgePeriod;
    +		logger_ = Logger::getLogger();
    +		configure_ = Configure::getConfigure();
    +		_db = NULL;
    +		_thread = NULL;
    +		_running = false;
    +		_repoFull = false;
    +		_enable = true;
    +	}
    +
    +	//! Destructor
    +	virtual ~Repository() {
    +		stop();
    +		if (this->_thread)
    +			delete this->_thread;
    +		destroy();
    +	}
    +
    +	//! initialize
    +	virtual bool initialize()
    +	{
    +		std::string value;
    +
    +		if (_type == PROVENANCE)
    +		{
    +			if (!(configure_->get(Configure::nifi_provenance_repository_enable, value)
    +					&& StringUtils::StringToBool(value, _enable))) {
    +				_enable = true;
    +			}
    +			if (!_enable)
    +				return false;
    +			if (configure_->get(Configure::nifi_provenance_repository_directory_default, value))
    +			{
    +				_directory = value;
    +			}
    +			logger_->log_info("NiFi Provenance Repository Directory %s", _directory.c_str());
    +			if (configure_->get(Configure::nifi_provenance_repository_max_storage_size, value))
    +			{
    +				Property::StringToInt(value, _maxPartitionBytes);
    +			}
    +			logger_->log_info("NiFi Provenance Max Partition Bytes %d", _maxPartitionBytes);
    +			if (configure_->get(Configure::nifi_provenance_repository_max_storage_time, value))
    +			{
    +				TimeUnit unit;
    +				if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
    +							Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
    +				{
    +				}
    +			}
    +			logger_->log_info("NiFi Provenance Max Storage Time: [%d] ms", _maxPartitionMillis);
    +			leveldb::Options options;
    +			options.create_if_missing = true;
    +			leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
    +			if (status.ok())
    +			{
    +				logger_->log_info("NiFi Provenance Repository database open %s success", _directory.c_str());
    +			}
    +			else
    +			{
    +				logger_->log_error("NiFi Provenance Repository database open %s fail", _directory.c_str());
    +				return false;
    +			}
    +		}
    +
    +		if (_type == FLOWFILE)
    +		{
    +			if (!(configure_->get(Configure::nifi_flowfile_repository_enable, value)
    +					&& StringUtils::StringToBool(value, _enable))) {
    +				_enable = true;
    +			}
    +			if (!_enable)
    +				return false;
    +			if (configure_->get(Configure::nifi_flowfile_repository_directory_default, value))
    +			{
    +				_directory = value;
    +			}
    +			logger_->log_info("NiFi FlowFile Repository Directory %s", _directory.c_str());
    +			if (configure_->get(Configure::nifi_flowfile_repository_max_storage_size, value))
    +			{
    +				Property::StringToInt(value, _maxPartitionBytes);
    +			}
    +			logger_->log_info("NiFi FlowFile Max Partition Bytes %d", _maxPartitionBytes);
    +			if (configure_->get(Configure::nifi_flowfile_repository_max_storage_time, value))
    +			{
    +				TimeUnit unit;
    +				if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
    +							Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
    +				{
    +				}
    +			}
    +			logger_->log_info("NiFi FlowFile Max Storage Time: [%d] ms", _maxPartitionMillis);
    +			leveldb::Options options;
    +			options.create_if_missing = true;
    +			leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
    +			if (status.ok())
    +			{
    +				logger_->log_info("NiFi FlowFile Repository database open %s success", _directory.c_str());
    +			}
    +			else
    +			{
    +				logger_->log_error("NiFi FlowFile Repository database open %s fail", _directory.c_str());
    +				return false;
    +			}
    +		}
    +
    +		return true;
    +	}
    +	//! Put
    +	virtual bool Put(std::string key, uint8_t *buf, int bufLen)
    +	{
    +		if (!_enable)
    +			return false;
    +			
    +		// persistent to the DB
    +		leveldb::Slice value((const char *) buf, bufLen);
    +		leveldb::Status status;
    +		status = _db->Put(leveldb::WriteOptions(), key, value);
    +		if (status.ok())
    +			return true;
    +		else
    +			return false;
    +	}
    +	//! Delete
    +	virtual bool Delete(std::string key)
    +	{
    +		if (!_enable)
    +			return false;
    +		leveldb::Status status;
    +		status = _db->Delete(leveldb::WriteOptions(), key);
    +		if (status.ok())
    +			return true;
    +		else
    +			return false;
    +	}
    +	//! Get
    +	virtual bool Get(std::string key, std::string &value)
    +	{
    +		if (!_enable)
    +			return false;
    +		leveldb::Status status;
    +		status = _db->Get(leveldb::ReadOptions(), key, &value);
    +		if (status.ok())
    +			return true;
    +		else
    +			return false;
    +	}
    +	//! destroy
    +	void destroy()
    +	{
    +		if (_db)
    +		{
    +			delete _db;
    +			_db = NULL;
    +		}
    +	}
    +	//! Run function for the thread
    +	static void run(Repository *repo);
    +	//! Start the repository monitor thread
    +	virtual void start();
    +	//! Stop the repository monitor thread
    +	virtual void stop();
    +	//! whether the repo is full
    +	virtual bool isFull()
    +	{
    +		return _repoFull;
    +	}
    +	//! whether the repo is enable 
    +	virtual bool isEnable()
    +	{
    +		return _enable;
    +	}
    +
    +protected:
    +	//! Repo Type
    +	RepositoryType _type;
    +	//! Mutex for protection
    +	std::mutex _mtx;
    +	//! repository directory
    +	std::string _directory;
    +	//! Logger
    +	std::shared_ptr<Logger> logger_;
    +	//! Configure
    +	//! max db entry life time
    +	Configure *configure_;
    +	int64_t _maxPartitionMillis;
    +	//! max db size
    +	int64_t _maxPartitionBytes;
    +	//! purge period
    +	uint64_t _purgePeriod;
    +	//! level DB database
    +	leveldb::DB* _db;
    +	//! thread
    +	std::thread *_thread;
    +	//! whether it is running
    +	bool _running;
    +	//! whether it is enable 
    +	bool _enable;
    +	//! whether stop accepting provenace event
    +	std::atomic<bool> _repoFull;
    +	//! repoSize
    +	uint64_t repoSize();
    +	//! size of the directory
    +	static uint64_t _repoSize[MAX_REPO_TYPE];
    +	//! call back for directory size
    +	static int repoSumProvenance(const char *fpath, const struct stat *sb, int typeflag)
    +	{
    +		_repoSize[PROVENANCE] += sb->st_size;
    --- End diff --
    
    Is this state that could be maintained in the repository's metadata rather than derived from the directory?
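One way to read this suggestion is to keep a running byte count that `Put` and `Delete` update, instead of walking the directory with `ftw`. This is a minimal sketch under stated assumptions. `TrackedRepo` is a hypothetical class, and a `std::map` stands in for the LevelDB handle so the example is self-contained. It counts only logical key and value bytes, not the files LevelDB writes to disk.

```cpp
#include <cstdint>
#include <map>
#include <mutex>
#include <string>

// Hypothetical repository that keeps its size as metadata instead of
// computing it from the directory. A std::map stands in for LevelDB.
class TrackedRepo {
 public:
  // Insert or overwrite a record, adjusting the byte count by the delta.
  bool Put(const std::string &key, const std::string &value) {
    std::lock_guard<std::mutex> lock(mtx_);
    auto it = store_.find(key);
    if (it != store_.end()) {
      bytes_ -= key.size() + it->second.size();  // drop the old record's size
      it->second = value;
    } else {
      store_.emplace(key, value);
    }
    bytes_ += key.size() + value.size();
    return true;
  }

  // Remove a record and subtract its size; returns false if the key is absent.
  bool Delete(const std::string &key) {
    std::lock_guard<std::mutex> lock(mtx_);
    auto it = store_.find(key);
    if (it == store_.end()) return false;
    bytes_ -= key.size() + it->second.size();
    store_.erase(it);
    return true;
  }

  // Logical payload size (keys + values), not the on-disk footprint.
  uint64_t size() const {
    std::lock_guard<std::mutex> lock(mtx_);
    return bytes_;
  }

 private:
  mutable std::mutex mtx_;
  std::map<std::string, std::string> store_;
  uint64_t bytes_ = 0;
};
```

The trade-off is that such a counter is cheap to read but does not see storage-engine overhead such as write-ahead logs or not-yet-compacted files.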



[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r103956152
  
    --- Diff: libminifi/include/FlowFileRepository.h ---
    @@ -0,0 +1,208 @@
    +/**
    + * @file FlowFileRepository 
    + * Flow file repository class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __FLOWFILE_REPOSITORY_H__
    +#define __FLOWFILE_REPOSITORY_H__
    +
    +#include <ftw.h>
    +#include <uuid/uuid.h>
    +#include <atomic>
    +#include <cstdint>
    +#include <cstring>
    +#include <iostream>
    +#include <map>
    +#include <set>
    +#include <string>
    +#include <thread>
    +#include <vector>
    +
    +#include "Configure.h"
    +#include "Connection.h"
    +#include "FlowFileRecord.h"
    +#include "Logger.h"
    +#include "Property.h"
    +#include "ResourceClaim.h"
    +#include "io/Serializable.h"
    +#include "utils/TimeUtil.h"
    +#include "Repository.h"
    +
    +class FlowFileRepository;
    +
    +//! FlowFile Event Record
    +class FlowFileEventRecord : protected Serializable
    +{
    +public:
    +	friend class ProcessSession;
    --- End diff --
    
    These classes don't appear to be related; why does ProcessSession need access to FlowFileEventRecord's private members?
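A sketch of the alternative implied by this question: give the record a narrow public interface so that an unrelated class such as a session does not need to be declared a friend. `EventRecord`, `setAttribute`, `getAttribute` and `tagRecord` are hypothetical names chosen for illustration; they are not the PR's API.

```cpp
#include <map>
#include <string>
#include <utility>

// Hypothetical record exposing a narrow public interface, so unrelated
// classes can use it without being friends.
class EventRecord {
 public:
  explicit EventRecord(std::string uuid) : uuid_(std::move(uuid)) {}

  const std::string &uuid() const { return uuid_; }

  void setAttribute(const std::string &key, const std::string &value) {
    attributes_[key] = value;
  }

  // Returns true and fills `value` if the attribute exists.
  bool getAttribute(const std::string &key, std::string &value) const {
    auto it = attributes_.find(key);
    if (it == attributes_.end()) return false;
    value = it->second;
    return true;
  }

 private:
  std::string uuid_;
  std::map<std::string, std::string> attributes_;
};

// A caller such as a session works through the public API only.
void tagRecord(EventRecord &record, const std::string &relationship) {
  record.setAttribute("relationship", relationship);
}
```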



[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104183246
  
    --- Diff: libminifi/include/Connection.h ---
    @@ -180,7 +184,8 @@ class Connection
     	std::atomic<uint64_t> _maxQueueDataSize;
     	//! Flow File Expiration Duration in= MilliSeconds
     	std::atomic<uint64_t> _expiredDuration;
    -
    +	//! UUID string
    +	std::string _uuidStr;
    --- End diff --
    
    As I was going through it again, I wanted to make clear that this comment is informational and not a big deal.



[GitHub] nifi-minifi-cpp issue #62: MINIFI-231: Add Flow Persistent, Using id instead...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on the issue:

    https://github.com/apache/nifi-minifi-cpp/pull/62
  
    I'll continue to go through it; so far I have only taken a very cursory look. Good changes overall!



[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r103956083
  
    --- Diff: libminifi/include/FlowFileRepository.h ---
    @@ -0,0 +1,208 @@
    +/**
    + * @file FlowFileRepository 
    + * Flow file repository class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __FLOWFILE_REPOSITORY_H__
    +#define __FLOWFILE_REPOSITORY_H__
    +
    +#include <ftw.h>
    +#include <uuid/uuid.h>
    +#include <atomic>
    +#include <cstdint>
    +#include <cstring>
    +#include <iostream>
    +#include <map>
    +#include <set>
    +#include <string>
    +#include <thread>
    +#include <vector>
    +
    +#include "Configure.h"
    +#include "Connection.h"
    +#include "FlowFileRecord.h"
    +#include "Logger.h"
    +#include "Property.h"
    +#include "ResourceClaim.h"
    +#include "io/Serializable.h"
    +#include "utils/TimeUtil.h"
    +#include "Repository.h"
    +
    +class FlowFileRepository;
    --- End diff --
    
    Avoid forward declarations, per the Google C++ style guide. They will cause issues with namespaces.
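A minimal illustration of the namespace problem, assuming the class is later moved into a namespace (here a hypothetical `minifi`). The global forward declaration then names a different type that is never defined, and only including the defining header and using the qualified name refers to the real class.

```cpp
#include <type_traits>

// Forward declaration at global scope: this declares ::FlowFileRepository.
class FlowFileRepository;

namespace minifi {  // hypothetical namespace for illustration
// The real class, once moved into a namespace, is a distinct type.
class FlowFileRepository {
 public:
  int id() const { return 7; }
};
}  // namespace minifi

// The two names do not refer to the same type.
static_assert(!std::is_same<FlowFileRepository, minifi::FlowFileRepository>::value,
              "global forward declaration does not match the namespaced class");

// Code written against the forward declaration can only pass pointers to an
// incomplete type; calling repo->id() here would not compile.
void takesGlobal(FlowFileRepository *repo) { (void)repo; }

// Including the defining header and naming the class fully avoids the mismatch.
int useNamespaced(const minifi::FlowFileRepository &repo) { return repo.id(); }
```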



[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104318207
  
    --- Diff: libminifi/include/Repository.h ---
    @@ -0,0 +1,294 @@
    +/**
    + * @file Repository 
    + * Repository class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __REPOSITORY_H__
    +#define __REPOSITORY_H__
    +
    +#include <ftw.h>
    +#include <uuid/uuid.h>
    +#include <atomic>
    +#include <cstdint>
    +#include <cstring>
    +#include <iostream>
    +#include <map>
    +#include <set>
    +#include <string>
    +#include <thread>
    +#include <vector>
    +
    +#include "leveldb/db.h"
    +#include "leveldb/options.h"
    +#include "leveldb/slice.h"
    +#include "leveldb/status.h"
    +#include "Configure.h"
    +#include "Connection.h"
    +#include "FlowFileRecord.h"
    +#include "Logger.h"
    +#include "Property.h"
    +#include "ResourceClaim.h"
    +#include "io/Serializable.h"
    +#include "utils/TimeUtil.h"
    +#include "utils/StringUtils.h"
    +
    +//! Repository
    +class Repository
    +{
    +public:
    +	enum RepositoryType {
    +		//! Provenance Repo Type
    +		PROVENANCE,
    +		//! FlowFile Repo Type
    +		FLOWFILE,
    +		MAX_REPO_TYPE
    +	};
    +	static const char *RepositoryTypeStr[MAX_REPO_TYPE];
    +	//! Constructor
    +	/*!
    +	 * Create a new provenance repository
    +	 */
    +	Repository(RepositoryType type, std::string directory, 
    +		int64_t maxPartitionMillis, int64_t maxPartitionBytes, uint64_t purgePeriod) {
    +		_type = type;
    +		_directory = directory;
    +		_maxPartitionMillis = maxPartitionMillis;
    +		_maxPartitionBytes = maxPartitionBytes;
    +		_purgePeriod = purgePeriod;
    +		logger_ = Logger::getLogger();
    +		configure_ = Configure::getConfigure();
    +		_db = NULL;
    +		_thread = NULL;
    +		_running = false;
    +		_repoFull = false;
    +		_enable = true;
    +	}
    +
    +	//! Destructor
    +	virtual ~Repository() {
    +		stop();
    +		if (this->_thread)
    +			delete this->_thread;
    +		destroy();
    +	}
    +
    +	//! initialize
    +	virtual bool initialize()
    +	{
    +		std::string value;
    +
    +		if (_type == PROVENANCE)
    +		{
    +			if (!(configure_->get(Configure::nifi_provenance_repository_enable, value)
    +					&& StringUtils::StringToBool(value, _enable))) {
    +				_enable = true;
    +			}
    +			if (!_enable)
    +				return false;
    +			if (configure_->get(Configure::nifi_provenance_repository_directory_default, value))
    +			{
    +				_directory = value;
    +			}
    +			logger_->log_info("NiFi Provenance Repository Directory %s", _directory.c_str());
    +			if (configure_->get(Configure::nifi_provenance_repository_max_storage_size, value))
    +			{
    +				Property::StringToInt(value, _maxPartitionBytes);
    +			}
    +			logger_->log_info("NiFi Provenance Max Partition Bytes %d", _maxPartitionBytes);
    +			if (configure_->get(Configure::nifi_provenance_repository_max_storage_time, value))
    +			{
    +				TimeUnit unit;
    +				if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
    +							Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
    +				{
    +				}
    +			}
    +			logger_->log_info("NiFi Provenance Max Storage Time: [%d] ms", _maxPartitionMillis);
    +			leveldb::Options options;
    +			options.create_if_missing = true;
    +			leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
    +			if (status.ok())
    +			{
    +				logger_->log_info("NiFi Provenance Repository database open %s success", _directory.c_str());
    +			}
    +			else
    +			{
    +				logger_->log_error("NiFi Provenance Repository database open %s fail", _directory.c_str());
    +				return false;
    +			}
    +		}
    +
    +		if (_type == FLOWFILE)
    +		{
    +			if (!(configure_->get(Configure::nifi_flowfile_repository_enable, value)
    +					&& StringUtils::StringToBool(value, _enable))) {
    +				_enable = true;
    +			}
    +			if (!_enable)
    +				return false;
    +			if (configure_->get(Configure::nifi_flowfile_repository_directory_default, value))
    +			{
    +				_directory = value;
    +			}
    +			logger_->log_info("NiFi FlowFile Repository Directory %s", _directory.c_str());
    +			if (configure_->get(Configure::nifi_flowfile_repository_max_storage_size, value))
    +			{
    +				Property::StringToInt(value, _maxPartitionBytes);
    +			}
    +			logger_->log_info("NiFi FlowFile Max Partition Bytes %d", _maxPartitionBytes);
    +			if (configure_->get(Configure::nifi_flowfile_repository_max_storage_time, value))
    +			{
    +				TimeUnit unit;
    +				if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
    +							Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
    +				{
    +				}
    +			}
    +			logger_->log_info("NiFi FlowFile Max Storage Time: [%d] ms", _maxPartitionMillis);
    +			leveldb::Options options;
    +			options.create_if_missing = true;
    +			leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
    +			if (status.ok())
    +			{
    +				logger_->log_info("NiFi FlowFile Repository database open %s success", _directory.c_str());
    +			}
    +			else
    +			{
    +				logger_->log_error("NiFi FlowFile Repository database open %s fail", _directory.c_str());
    +				return false;
    +			}
    +		}
    +
    +		return true;
    +	}
    +	//! Put
    +	virtual bool Put(std::string key, uint8_t *buf, int bufLen)
    +	{
    +		if (!_enable)
    +			return false;
    +			
    +		// persistent to the DB
    +		leveldb::Slice value((const char *) buf, bufLen);
    +		leveldb::Status status;
    +		status = _db->Put(leveldb::WriteOptions(), key, value);
    +		if (status.ok())
    +			return true;
    +		else
    +			return false;
    +	}
    +	//! Delete
    +	virtual bool Delete(std::string key)
    --- End diff --
    
    Sure.



[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104433184
  
    --- Diff: libminifi/src/Repository.cpp ---
    @@ -0,0 +1,140 @@
    +/**
    + * @file Repository.cpp
    + * Repository implemenatation 
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#include <cstdint>
    +#include <vector>
    +#include <arpa/inet.h>
    +#include "io/DataStream.h"
    +#include "io/Serializable.h"
    +#include "Relationship.h"
    +#include "Logger.h"
    +#include "FlowController.h"
    +#include "Repository.h"
    +#include "Provenance.h"
    +#include "FlowFileRepository.h"
    +
    +const char *Repository::RepositoryTypeStr[MAX_REPO_TYPE] = {"Provenace Repository", "FlowFile Repository"};
    +uint64_t Repository::_repoSize[MAX_REPO_TYPE] = {0, 0}; 
    +
    +void Repository::start() {
    +	if (!_enable)
    +		return;
    +	if (this->_purgePeriod <= 0)
    +		return;
    +	if (_running)
    +		return;
    +	_running = true;
    +	logger_->log_info("%s Repository Monitor Thread Start", RepositoryTypeStr[_type]);
    +	_thread = new std::thread(run, this);
    +	_thread->detach();
    +}
    +
    +void Repository::stop() {
    +	if (!_running)
    +		return;
    +	_running = false;
    +	logger_->log_info("%s Repository Monitor Thread Stop", RepositoryTypeStr[_type]);
    +}
    +
    +void Repository::run(Repository *repo) {
    +	// threshold for purge
    --- End diff --
    
    I think a comment should be added here to reflect this.



[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104827593
  
    --- Diff: libminifi/test/unit/ProvenanceTestHelper.h ---
    @@ -20,6 +20,62 @@
     
     #include "Provenance.h"
     #include "FlowController.h"
    +#include "FlowFileRepository.h"
    +
    +/**
    + * Test repository
    + */
    +class FlowTestRepository : public FlowFileRepository
    --- End diff --
    
    The FlowFileRepository constructor populates the default repository type and parameters for the repo:
    /*!
    	 * Create a new provenance repository
    	 */
    	FlowFileRepository()
    	 : Repository(Repository::FLOWFILE, FLOWFILE_REPOSITORY_DIRECTORY,
    			MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME, MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, FLOWFILE_REPOSITORY_PURGE_PERIOD)
    	{
    	}
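The pattern described here, where the derived default constructor forwards default values to the base constructor so that a test subclass inherits them, can be sketched as follows. `BaseRepo`, `FlowRepo`, `TestRepo` and the default values are hypothetical stand-ins; in the PR the defaults come from `FLOWFILE_REPOSITORY_DIRECTORY` and the related constants.

```cpp
#include <cstdint>
#include <string>
#include <utility>

// Simplified stand-in for the PR's Repository base class.
class BaseRepo {
 public:
  enum Type { PROVENANCE, FLOWFILE };
  BaseRepo(Type type, std::string directory, int64_t maxMillis,
           int64_t maxBytes, uint64_t purgePeriod)
      : type_(type), directory_(std::move(directory)), maxMillis_(maxMillis),
        maxBytes_(maxBytes), purgePeriod_(purgePeriod) {}
  virtual ~BaseRepo() = default;

  Type type() const { return type_; }
  const std::string &directory() const { return directory_; }
  int64_t maxMillis() const { return maxMillis_; }
  int64_t maxBytes() const { return maxBytes_; }
  uint64_t purgePeriod() const { return purgePeriod_; }

 private:
  Type type_;
  std::string directory_;
  int64_t maxMillis_;
  int64_t maxBytes_;
  uint64_t purgePeriod_;
};

// Hypothetical default values, for illustration only.
const char *const kFlowDir = "./flowfile_repository";
const int64_t kFlowMaxMillis = 10 * 60 * 1000;   // 10 minutes
const int64_t kFlowMaxBytes = 10 * 1024 * 1024;  // 10 MiB
const uint64_t kFlowPurgePeriod = 2500;          // milliseconds

// The default constructor supplies the repository type and defaults.
class FlowRepo : public BaseRepo {
 public:
  FlowRepo()
      : BaseRepo(FLOWFILE, kFlowDir, kFlowMaxMillis, kFlowMaxBytes,
                 kFlowPurgePeriod) {}
};

// A test repository gets the same defaults without repeating them.
class TestRepo : public FlowRepo {};
```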



[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104301097
  
    --- Diff: libminifi/include/FlowFileRepository.h ---
    @@ -0,0 +1,208 @@
    +/**
    + * @file FlowFileRepository 
    + * Flow file repository class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __FLOWFILE_REPOSITORY_H__
    +#define __FLOWFILE_REPOSITORY_H__
    +
    +#include <ftw.h>
    +#include <uuid/uuid.h>
    +#include <atomic>
    +#include <cstdint>
    +#include <cstring>
    +#include <iostream>
    +#include <map>
    +#include <set>
    +#include <string>
    +#include <thread>
    +#include <vector>
    +
    +#include "Configure.h"
    +#include "Connection.h"
    +#include "FlowFileRecord.h"
    +#include "Logger.h"
    +#include "Property.h"
    +#include "ResourceClaim.h"
    +#include "io/Serializable.h"
    +#include "utils/TimeUtil.h"
    +#include "Repository.h"
    +
    +class FlowFileRepository;
    --- End diff --
    
    Will do.



[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104320062
  
    --- Diff: libminifi/include/Repository.h ---
    @@ -0,0 +1,294 @@
    +/**
    + * @file Repository 
    + * Repository class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __REPOSITORY_H__
    +#define __REPOSITORY_H__
    +
    +#include <ftw.h>
    +#include <uuid/uuid.h>
    +#include <atomic>
    +#include <cstdint>
    +#include <cstring>
    +#include <iostream>
    +#include <map>
    +#include <set>
    +#include <string>
    +#include <thread>
    +#include <vector>
    +
    +#include "leveldb/db.h"
    +#include "leveldb/options.h"
    +#include "leveldb/slice.h"
    +#include "leveldb/status.h"
    +#include "Configure.h"
    +#include "Connection.h"
    +#include "FlowFileRecord.h"
    +#include "Logger.h"
    +#include "Property.h"
    +#include "ResourceClaim.h"
    +#include "io/Serializable.h"
    +#include "utils/TimeUtil.h"
    +#include "utils/StringUtils.h"
    +
    +//! Repository
    +class Repository
    +{
    +public:
    +	enum RepositoryType {
    +		//! Provenance Repo Type
    +		PROVENANCE,
    +		//! FlowFile Repo Type
    +		FLOWFILE,
    +		MAX_REPO_TYPE
    +	};
    +	static const char *RepositoryTypeStr[MAX_REPO_TYPE];
    +	//! Constructor
    +	/*!
    +	 * Create a new provenance repository
    +	 */
    +	Repository(RepositoryType type, std::string directory, 
    +		int64_t maxPartitionMillis, int64_t maxPartitionBytes, uint64_t purgePeriod) {
    +		_type = type;
    +		_directory = directory;
    +		_maxPartitionMillis = maxPartitionMillis;
    +		_maxPartitionBytes = maxPartitionBytes;
    +		_purgePeriod = purgePeriod;
    +		logger_ = Logger::getLogger();
    +		configure_ = Configure::getConfigure();
    +		_db = NULL;
    +		_thread = NULL;
    +		_running = false;
    +		_repoFull = false;
    +		_enable = true;
    +	}
    +
    +	//! Destructor
    +	virtual ~Repository() {
    +		stop();
    +		if (this->_thread)
    +			delete this->_thread;
    +		destroy();
    +	}
    +
    +	//! initialize
    +	virtual bool initialize()
    +	{
    +		std::string value;
    +
    +		if (_type == PROVENANCE)
    +		{
    +			if (!(configure_->get(Configure::nifi_provenance_repository_enable, value)
    +					&& StringUtils::StringToBool(value, _enable))) {
    +				_enable = true;
    +			}
    +			if (!_enable)
    +				return false;
    +			if (configure_->get(Configure::nifi_provenance_repository_directory_default, value))
    +			{
    +				_directory = value;
    +			}
    +			logger_->log_info("NiFi Provenance Repository Directory %s", _directory.c_str());
    +			if (configure_->get(Configure::nifi_provenance_repository_max_storage_size, value))
    +			{
    +				Property::StringToInt(value, _maxPartitionBytes);
    +			}
    +			logger_->log_info("NiFi Provenance Max Partition Bytes %d", _maxPartitionBytes);
    +			if (configure_->get(Configure::nifi_provenance_repository_max_storage_time, value))
    +			{
    +				TimeUnit unit;
    +				if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
    +							Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
    +				{
    +				}
    +			}
    +			logger_->log_info("NiFi Provenance Max Storage Time: [%d] ms", _maxPartitionMillis);
    +			leveldb::Options options;
    +			options.create_if_missing = true;
    +			leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
    +			if (status.ok())
    +			{
    +				logger_->log_info("NiFi Provenance Repository database open %s success", _directory.c_str());
    +			}
    +			else
    +			{
    +				logger_->log_error("NiFi Provenance Repository database open %s fail", _directory.c_str());
    +				return false;
    +			}
    +		}
    +
    +		if (_type == FLOWFILE)
    +		{
    +			if (!(configure_->get(Configure::nifi_flowfile_repository_enable, value)
    +					&& StringUtils::StringToBool(value, _enable))) {
    +				_enable = true;
    +			}
    +			if (!_enable)
    +				return false;
    +			if (configure_->get(Configure::nifi_flowfile_repository_directory_default, value))
    +			{
    +				_directory = value;
    +			}
    +			logger_->log_info("NiFi FlowFile Repository Directory %s", _directory.c_str());
    +			if (configure_->get(Configure::nifi_flowfile_repository_max_storage_size, value))
    +			{
    +				Property::StringToInt(value, _maxPartitionBytes);
    +			}
    +			logger_->log_info("NiFi FlowFile Max Partition Bytes %d", _maxPartitionBytes);
    +			if (configure_->get(Configure::nifi_flowfile_repository_max_storage_time, value))
    +			{
    +				TimeUnit unit;
    +				if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
    +							Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
    +				{
    +				}
    +			}
    +			logger_->log_info("NiFi FlowFile Max Storage Time: [%d] ms", _maxPartitionMillis);
    +			leveldb::Options options;
    +			options.create_if_missing = true;
    +			leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
    +			if (status.ok())
    +			{
    +				logger_->log_info("NiFi FlowFile Repository database open %s success", _directory.c_str());
    +			}
    +			else
    +			{
    +				logger_->log_error("NiFi FlowFile Repository database open %s fail", _directory.c_str());
    +				return false;
    +			}
    +		}
    +
    +		return true;
    +	}
    +	//! Put
    +	virtual bool Put(std::string key, uint8_t *buf, int bufLen)
    +	{
    +		if (!_enable)
    +			return false;
    +			
    +		// persistent to the DB
    +		leveldb::Slice value((const char *) buf, bufLen);
    +		leveldb::Status status;
    +		status = _db->Put(leveldb::WriteOptions(), key, value);
    +		if (status.ok())
    +			return true;
    +		else
    +			return false;
    +	}
    +	//! Delete
    +	virtual bool Delete(std::string key)
    +	{
    +		if (!_enable)
    +			return false;
    +		leveldb::Status status;
    +		status = _db->Delete(leveldb::WriteOptions(), key);
    +		if (status.ok())
    +			return true;
    +		else
    +			return false;
    +	}
    +	//! Get
    +	virtual bool Get(std::string key, std::string &value)
    +	{
    +		if (!_enable)
    +			return false;
    +		leveldb::Status status;
    +		status = _db->Get(leveldb::ReadOptions(), key, &value);
    +		if (status.ok())
    +			return true;
    +		else
    +			return false;
    +	}
    +	//! destroy
    +	void destroy()
    +	{
    +		if (_db)
    +		{
    +			delete _db;
    +			_db = NULL;
    +		}
    +	}
    +	//! Run function for the thread
    +	static void run(Repository *repo);
    +	//! Start the repository monitor thread
    +	virtual void start();
    +	//! Stop the repository monitor thread
    +	virtual void stop();
    +	//! whether the repo is full
    +	virtual bool isFull()
    +	{
    +		return _repoFull;
    +	}
    +	//! whether the repo is enable 
    +	virtual bool isEnable()
    +	{
    +		return _enable;
    +	}
    +
    +protected:
    +	//! Repo Type
    +	RepositoryType _type;
    +	//! Mutex for protection
    +	std::mutex _mtx;
    +	//! repository directory
    +	std::string _directory;
    +	//! Logger
    +	std::shared_ptr<Logger> logger_;
    +	//! Configure
    +	//! max db entry life time
    +	Configure *configure_;
    +	int64_t _maxPartitionMillis;
    +	//! max db size
    +	int64_t _maxPartitionBytes;
    +	//! purge period
    +	uint64_t _purgePeriod;
    +	//! level DB database
    +	leveldb::DB* _db;
    +	//! thread
    +	std::thread *_thread;
    +	//! whether it is running
    +	bool _running;
    +	//! whether it is enable 
    +	bool _enable;
    +	//! whether stop accepting provenace event
    +	std::atomic<bool> _repoFull;
    +	//! repoSize
    +	uint64_t repoSize();
    +	//! size of the directory
    +	static uint64_t _repoSize[MAX_REPO_TYPE];
    +	//! call back for directory size
    +	static int repoSumProvenance(const char *fpath, const struct stat *sb, int typeflag)
    +	{
    +		_repoSize[PROVENANCE] += sb->st_size;
    --- End diff --
    
    LevelDB does not expose an API to check the database size. LevelDB also writes log and other auxiliary files when we commit a record, so it is better to check the size of the whole directory to get the total size.
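
    A minimal sketch of the directory-size approach described above. It assumes a POSIX system that provides nftw() from <ftw.h>; the diff includes <ftw.h> and uses an ftw-style callback, repoSumProvenance. The helper name directorySize and the file-local accumulator are hypothetical, not the PR's code. The walk counts every regular file, so LevelDB's table files, log files, and MANIFEST are all included.

```cpp
#include <ftw.h>
#include <sys/stat.h>
#include <cassert>
#include <cstdint>
#include <string>

namespace {
// nftw() callbacks cannot capture state, so the running total lives in a
// file-local variable (the PR uses a static array for the same reason).
uint64_t g_walk_total = 0;

int addRegularFileSize(const char * /*fpath*/, const struct stat *sb,
                       int typeflag, struct FTW * /*ftwbuf*/) {
  assert(sb != nullptr);
  if (typeflag == FTW_F) {  // count regular files only, not directories
    g_walk_total += static_cast<uint64_t>(sb->st_size);
  }
  return 0;  // returning non-zero would stop the walk
}
}  // namespace

// Hypothetical helper (not part of the PR): stores in `total` the number of
// bytes used by all regular files under `directory` and returns true, or
// returns false if the walk fails (e.g. the directory does not exist).
// Not thread-safe, because of the shared accumulator.
bool directorySize(const std::string &directory, uint64_t &total) {
  g_walk_total = 0;
  // FTW_PHYS: do not follow symbolic links; 16 = max open file descriptors.
  if (nftw(directory.c_str(), addRegularFileSize, 16, FTW_PHYS) != 0) {
    return false;
  }
  total = g_walk_total;
  return true;
}
```

    Because nftw() callbacks cannot carry user state, the running total has to live outside the callback. That is why this sketch, like the PR's static _repoSize array, is not safe to call from two threads at once.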



[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r103957227
  
    --- Diff: libminifi/include/FlowFileRepository.h ---
    @@ -0,0 +1,208 @@
    +/**
    + * @file FlowFileRepository 
    + * Flow file repository class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __FLOWFILE_REPOSITORY_H__
    +#define __FLOWFILE_REPOSITORY_H__
    +
    +#include <ftw.h>
    +#include <uuid/uuid.h>
    +#include <atomic>
    +#include <cstdint>
    +#include <cstring>
    +#include <iostream>
    +#include <map>
    +#include <set>
    +#include <string>
    +#include <thread>
    +#include <vector>
    +
    +#include "Configure.h"
    +#include "Connection.h"
    +#include "FlowFileRecord.h"
    +#include "Logger.h"
    +#include "Property.h"
    +#include "ResourceClaim.h"
    +#include "io/Serializable.h"
    +#include "utils/TimeUtil.h"
    +#include "Repository.h"
    +
    +class FlowFileRepository;
    +
    +//! FlowFile Event Record
    +class FlowFileEventRecord : protected Serializable
    +{
    +public:
    +	friend class ProcessSession;
    +public:
    +	//! Constructor
    +	/*!
    +	 * Create a new provenance event record
    +	 */
    +	FlowFileEventRecord()
    +	: _entryDate(0), _lineageStartDate(0), _size(0), _offset(0)  
    +	{
    +		_eventTime = getTimeMillis();
    +		logger_ = Logger::getLogger();
    +	}
    +
    +	//! Destructor
    +	virtual ~FlowFileEventRecord() {
    +	}
    +	//! Get Attributes
    +	std::map<std::string, std::string> getAttributes() {
    +		return _attributes;
    +	}
    +	//! Get Size
    +	uint64_t getFileSize() {
    +		return _size;
    +	}
    +	// ! Get Offset
    +	uint64_t getFileOffset() {
    +		return _offset;
    +	}
    +	// ! Get Entry Date
    +	uint64_t getFlowFileEntryDate() {
    +		return _entryDate;
    +	}
    +	// ! Get Lineage Start Date
    +	uint64_t getlineageStartDate() {
    +		return _lineageStartDate;
    +	}
    +	// ! Get Event Time
    +	uint64_t getEventTime() {
    +		return _eventTime;
    +	}
    +	//! Get FlowFileUuid
    +	std::string getFlowFileUuid()
    +	{
    +		return _uuid;
    +	}
    +	//! Get ConnectionUuid
    +	std::string getConnectionUuid()
    +	{
    +		return _uuidConnection;
    +	}
    +	//! Get content full path
    +	std::string getContentFullPath()
    +	{
    +		return _contentFullPath;
    +	}
    +	//! Get LineageIdentifiers
    +	std::set<std::string> getLineageIdentifiers()
    +	{
    +		return _lineageIdentifiers;
    +	}
    +	//! fromFlowFile
    +	void fromFlowFile(FlowFileRecord *flow, std::string uuidConnection)
    +	{
    +		_entryDate = flow->getEntryDate();
    +		_lineageStartDate = flow->getlineageStartDate();
    +		_lineageIdentifiers = flow->getlineageIdentifiers();
    +		_uuid = flow->getUUIDStr();
    +		_attributes = flow->getAttributes();
    +		_size = flow->getSize();
    +		_offset = flow->getOffset();
    +		_uuidConnection = uuidConnection;
    +		if (flow->getResourceClaim())
    +		{
    +			_contentFullPath = flow->getResourceClaim()->getContentFullPath();
    +		}
    +	}
    +	//! Serialize and Persistent to the repository
    +	bool Serialize(FlowFileRepository *repo);
    +	//! DeSerialize
    +	bool DeSerialize(const uint8_t *buffer, const int bufferSize);
    +	//! DeSerialize
    +	bool DeSerialize(DataStream &stream)
    +	{
    +		return DeSerialize(stream.getBuffer(),stream.getSize());
    +	}
    +	//! DeSerialize
    +	bool DeSerialize(FlowFileRepository *repo, std::string key);
    +
    +protected:
    +
    +	//! Date at which the event was created
    --- End diff --
    
    Member variables should be named like event_time_ (rather than _eventTime), per Google's C++ style guide.
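
    For illustration, here is what the requested naming looks like on two of the FlowFileEventRecord members from the diff. The class name FlowFileEventRecordSketch is hypothetical and the class is trimmed to two fields.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>

// Hypothetical, trimmed-down version of FlowFileEventRecord showing the
// Google C++ style naming for data members: lower_snake_case with a
// trailing underscore. The accessor names from the diff are kept as-is.
class FlowFileEventRecordSketch {
 public:
  FlowFileEventRecordSketch(uint64_t event_time, std::string uuid)
      : event_time_(event_time), uuid_(std::move(uuid)) {}

  uint64_t getEventTime() const { return event_time_; }
  const std::string &getFlowFileUuid() const { return uuid_; }

 private:
  uint64_t event_time_;  // was _eventTime
  std::string uuid_;     // was _uuid
};
```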



[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104320103
  
    --- Diff: libminifi/src/FlowFileRecord.cpp ---
    @@ -74,6 +76,43 @@ FlowFileRecord::FlowFileRecord(std::map<std::string, std::string> attributes, Re
     	logger_ = Logger::getLogger();
     }
     
    +FlowFileRecord::FlowFileRecord(FlowFileEventRecord *event)
    +: _size(0),
    +  _id(_localFlowSeqNumber.load()),
    +  _offset(0),
    +  _penaltyExpirationMs(0),
    +  _claim(NULL),
    +  _isStoredToRepo(false),
    +  _markedDelete(false),
    +  _connection(NULL),
    +  _orginalConnection(NULL)
    +{
    +	_entryDate = event->getFlowFileEntryDate();
    +	_lineageStartDate = event->getlineageStartDate();
    +	_size = event->getFileSize();
    +	_offset = event->getFileOffset();
    +	_lineageIdentifiers = event->getLineageIdentifiers();
    +	_attributes = event->getAttributes();
    +    _snapshot = false;
    +    _uuidStr = event->getFlowFileUuid();
    +    uuid_parse(_uuidStr.c_str(), _uuid);
    +
    +    if (_size > 0)
    +    {
    +    	_claim = new ResourceClaim();
    --- End diff --
    
    The claim is passed around when the flow file is duplicated, cloned, etc.
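
    The point is that one claim can be referenced by several flow files at once. A minimal sketch of that sharing, assuming std::shared_ptr in place of the raw ResourceClaim * used in the diff. ClaimSketch and FlowFileSketch are hypothetical stand-ins, not the PR's classes.

```cpp
#include <cassert>
#include <memory>
#include <string>

// Hypothetical stand-in for ResourceClaim: identifies where content lives.
struct ClaimSketch {
  std::string content_path;
};

// Hypothetical stand-in for FlowFileRecord holding a shared claim.
struct FlowFileSketch {
  std::shared_ptr<ClaimSketch> claim;

  // A clone points at the same content, so it shares the claim instead of
  // copying it; the claim is released only after the last holder drops it.
  FlowFileSketch clone() const { return FlowFileSketch{claim}; }
};
```

    With raw new/delete, every duplicate or clone has to track ownership by hand. A reference count, which std::shared_ptr provides, ensures the claim outlives all of the flow files that refer to it.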



[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104433421
  
    --- Diff: libminifi/include/Connection.h ---
    @@ -180,7 +184,8 @@ class Connection
     	std::atomic<uint64_t> _maxQueueDataSize;
     	//! Flow File Expiration Duration in= MilliSeconds
     	std::atomic<uint64_t> _expiredDuration;
    -
    +	//! UUID string
    +	std::string _uuidStr;
    --- End diff --
    
    Again, this isn't a big deal, but it will be changed.



[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r103960165
  
    --- Diff: libminifi/include/Repository.h ---
    @@ -0,0 +1,294 @@
    +/**
    + * @file Repository 
    + * Repository class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __REPOSITORY_H__
    +#define __REPOSITORY_H__
    +
    +#include <ftw.h>
    +#include <uuid/uuid.h>
    +#include <atomic>
    +#include <cstdint>
    +#include <cstring>
    +#include <iostream>
    +#include <map>
    +#include <set>
    +#include <string>
    +#include <thread>
    +#include <vector>
    +
    +#include "leveldb/db.h"
    +#include "leveldb/options.h"
    +#include "leveldb/slice.h"
    +#include "leveldb/status.h"
    +#include "Configure.h"
    +#include "Connection.h"
    +#include "FlowFileRecord.h"
    +#include "Logger.h"
    +#include "Property.h"
    +#include "ResourceClaim.h"
    +#include "io/Serializable.h"
    +#include "utils/TimeUtil.h"
    +#include "utils/StringUtils.h"
    +
    +//! Repository
    +class Repository
    +{
    +public:
    +	enum RepositoryType {
    +		//! Provenance Repo Type
    +		PROVENANCE,
    +		//! FlowFile Repo Type
    +		FLOWFILE,
    +		MAX_REPO_TYPE
    +	};
    +	static const char *RepositoryTypeStr[MAX_REPO_TYPE];
    +	//! Constructor
    +	/*!
    +	 * Create a new provenance repository
    +	 */
    +	Repository(RepositoryType type, std::string directory, 
    +		int64_t maxPartitionMillis, int64_t maxPartitionBytes, uint64_t purgePeriod) {
    +		_type = type;
    +		_directory = directory;
    +		_maxPartitionMillis = maxPartitionMillis;
    +		_maxPartitionBytes = maxPartitionBytes;
    +		_purgePeriod = purgePeriod;
    +		logger_ = Logger::getLogger();
    +		configure_ = Configure::getConfigure();
    +		_db = NULL;
    +		_thread = NULL;
    +		_running = false;
    +		_repoFull = false;
    +		_enable = true;
    +	}
    +
    +	//! Destructor
    +	virtual ~Repository() {
    +		stop();
    +		if (this->_thread)
    +			delete this->_thread;
    +		destroy();
    +	}
    +
    +	//! initialize
    +	virtual bool initialize()
    +	{
    +		std::string value;
    +
    +		if (_type == PROVENANCE)
    +		{
    +			if (!(configure_->get(Configure::nifi_provenance_repository_enable, value)
    +					&& StringUtils::StringToBool(value, _enable))) {
    +				_enable = true;
    +			}
    +			if (!_enable)
    +				return false;
    +			if (configure_->get(Configure::nifi_provenance_repository_directory_default, value))
    +			{
    +				_directory = value;
    +			}
    +			logger_->log_info("NiFi Provenance Repository Directory %s", _directory.c_str());
    +			if (configure_->get(Configure::nifi_provenance_repository_max_storage_size, value))
    +			{
    +				Property::StringToInt(value, _maxPartitionBytes);
    +			}
    +			logger_->log_info("NiFi Provenance Max Partition Bytes %d", _maxPartitionBytes);
    +			if (configure_->get(Configure::nifi_provenance_repository_max_storage_time, value))
    +			{
    +				TimeUnit unit;
    +				if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
    +							Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
    +				{
    +				}
    +			}
    +			logger_->log_info("NiFi Provenance Max Storage Time: [%d] ms", _maxPartitionMillis);
    +			leveldb::Options options;
    +			options.create_if_missing = true;
    +			leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
    +			if (status.ok())
    +			{
    +				logger_->log_info("NiFi Provenance Repository database open %s success", _directory.c_str());
    +			}
    +			else
    +			{
    +				logger_->log_error("NiFi Provenance Repository database open %s fail", _directory.c_str());
    +				return false;
    +			}
    +		}
    +
    +		if (_type == FLOWFILE)
    +		{
    +			if (!(configure_->get(Configure::nifi_flowfile_repository_enable, value)
    +					&& StringUtils::StringToBool(value, _enable))) {
    +				_enable = true;
    +			}
    +			if (!_enable)
    +				return false;
    +			if (configure_->get(Configure::nifi_flowfile_repository_directory_default, value))
    +			{
    +				_directory = value;
    +			}
    +			logger_->log_info("NiFi FlowFile Repository Directory %s", _directory.c_str());
    +			if (configure_->get(Configure::nifi_flowfile_repository_max_storage_size, value))
    +			{
    +				Property::StringToInt(value, _maxPartitionBytes);
    +			}
    +			logger_->log_info("NiFi FlowFile Max Partition Bytes %d", _maxPartitionBytes);
    +			if (configure_->get(Configure::nifi_flowfile_repository_max_storage_time, value))
    +			{
    +				TimeUnit unit;
    +				if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
    +							Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
    +				{
    +				}
    +			}
    +			logger_->log_info("NiFi FlowFile Max Storage Time: [%d] ms", _maxPartitionMillis);
    +			leveldb::Options options;
    +			options.create_if_missing = true;
    +			leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
    +			if (status.ok())
    +			{
    +				logger_->log_info("NiFi FlowFile Repository database open %s success", _directory.c_str());
    +			}
    +			else
    +			{
    +				logger_->log_error("NiFi FlowFile Repository database open %s fail", _directory.c_str());
    +				return false;
    +			}
    +		}
    +
    +		return true;
    +	}
    +	//! Put
    +	virtual bool Put(std::string key, uint8_t *buf, int bufLen)
    +	{
    +		if (!_enable)
    +			return false;
    +			
    +		// persistent to the DB
    +		leveldb::Slice value((const char *) buf, bufLen);
    +		leveldb::Status status;
    +		status = _db->Put(leveldb::WriteOptions(), key, value);
    +		if (status.ok())
    +			return true;
    +		else
    +			return false;
    +	}
    +	//! Delete
    +	virtual bool Delete(std::string key)
    +	{
    +		if (!_enable)
    +			return false;
    +		leveldb::Status status;
    +		status = _db->Delete(leveldb::WriteOptions(), key);
    +		if (status.ok())
    +			return true;
    +		else
    +			return false;
    +	}
    +	//! Get
    +	virtual bool Get(std::string key, std::string &value)
    +	{
    +		if (!_enable)
    +			return false;
    +		leveldb::Status status;
    +		status = _db->Get(leveldb::ReadOptions(), key, &value);
    +		if (status.ok())
    +			return true;
    +		else
    +			return false;
    +	}
    +	//! destroy
    +	void destroy()
    +	{
    +		if (_db)
    +		{
    +			delete _db;
    +			_db = NULL;
    +		}
    +	}
    +	//! Run function for the thread
    +	static void run(Repository *repo);
    +	//! Start the repository monitor thread
    +	virtual void start();
    +	//! Stop the repository monitor thread
    +	virtual void stop();
    +	//! whether the repo is full
    +	virtual bool isFull()
    +	{
    +		return _repoFull;
    +	}
    +	//! whether the repo is enable 
    +	virtual bool isEnable()
    +	{
    +		return _enable;
    +	}
    +
    +protected:
    +	//! Repo Type
    +	RepositoryType _type;
    +	//! Mutex for protection
    +	std::mutex _mtx;
    +	//! repository directory
    +	std::string _directory;
    +	//! Logger
    +	std::shared_ptr<Logger> logger_;
    +	//! Configure
    +	//! max db entry life time
    +	Configure *configure_;
    +	int64_t _maxPartitionMillis;
    +	//! max db size
    +	int64_t _maxPartitionBytes;
    +	//! purge period
    +	uint64_t _purgePeriod;
    +	//! level DB database
    +	leveldb::DB* _db;
    +	//! thread
    +	std::thread *_thread;
    +	//! whether it is running
    +	bool _running;
    +	//! whether it is enable 
    +	bool _enable;
    +	//! whether stop accepting provenace event
    +	std::atomic<bool> _repoFull;
    +	//! repoSize
    +	uint64_t repoSize();
    +	//! size of the directory
    +	static uint64_t _repoSize[MAX_REPO_TYPE];
    --- End diff --
    
    Can this be configuration based? 



[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r103961349
  
    --- Diff: libminifi/src/Repository.cpp ---
    @@ -0,0 +1,140 @@
    +/**
    + * @file Repository.cpp
    + * Repository implemenatation 
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#include <cstdint>
    +#include <vector>
    +#include <arpa/inet.h>
    +#include "io/DataStream.h"
    +#include "io/Serializable.h"
    +#include "Relationship.h"
    +#include "Logger.h"
    +#include "FlowController.h"
    +#include "Repository.h"
    +#include "Provenance.h"
    +#include "FlowFileRepository.h"
    +
    +const char *Repository::RepositoryTypeStr[MAX_REPO_TYPE] = {"Provenace Repository", "FlowFile Repository"};
    +uint64_t Repository::_repoSize[MAX_REPO_TYPE] = {0, 0}; 
    +
    +void Repository::start() {
    +	if (!_enable)
    +		return;
    +	if (this->_purgePeriod <= 0)
    +		return;
    +	if (_running)
    +		return;
    +	_running = true;
    +	logger_->log_info("%s Repository Monitor Thread Start", RepositoryTypeStr[_type]);
    +	_thread = new std::thread(run, this);
    +	_thread->detach();
    +}
    +
    +void Repository::stop() {
    +	if (!_running)
    +		return;
    +	_running = false;
    +	logger_->log_info("%s Repository Monitor Thread Stop", RepositoryTypeStr[_type]);
    +}
    +
    +void Repository::run(Repository *repo) {
    +	// threshold for purge
    --- End diff --
    
    What is this calculation based on? 



[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r103958256
  
    --- Diff: libminifi/include/Repository.h ---
    @@ -0,0 +1,294 @@
    +/**
    + * @file Repository 
    + * Repository class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __REPOSITORY_H__
    +#define __REPOSITORY_H__
    +
    +#include <ftw.h>
    +#include <uuid/uuid.h>
    +#include <atomic>
    +#include <cstdint>
    +#include <cstring>
    +#include <iostream>
    +#include <map>
    +#include <set>
    +#include <string>
    +#include <thread>
    +#include <vector>
    +
    +#include "leveldb/db.h"
    +#include "leveldb/options.h"
    +#include "leveldb/slice.h"
    +#include "leveldb/status.h"
    +#include "Configure.h"
    +#include "Connection.h"
    +#include "FlowFileRecord.h"
    +#include "Logger.h"
    +#include "Property.h"
    +#include "ResourceClaim.h"
    +#include "io/Serializable.h"
    +#include "utils/TimeUtil.h"
    +#include "utils/StringUtils.h"
    +
    +//! Repository
    +class Repository
    +{
    +public:
    +	enum RepositoryType {
    +		//! Provenance Repo Type
    +		PROVENANCE,
    +		//! FlowFile Repo Type
    +		FLOWFILE,
    +		MAX_REPO_TYPE
    +	};
    +	static const char *RepositoryTypeStr[MAX_REPO_TYPE];
    +	//! Constructor
    +	/*!
    +	 * Create a new provenance repository
    +	 */
    +	Repository(RepositoryType type, std::string directory, 
    +		int64_t maxPartitionMillis, int64_t maxPartitionBytes, uint64_t purgePeriod) {
    +		_type = type;
    +		_directory = directory;
    +		_maxPartitionMillis = maxPartitionMillis;
    +		_maxPartitionBytes = maxPartitionBytes;
    +		_purgePeriod = purgePeriod;
    +		logger_ = Logger::getLogger();
    +		configure_ = Configure::getConfigure();
    +		_db = NULL;
    +		_thread = NULL;
    +		_running = false;
    +		_repoFull = false;
    +		_enable = true;
    +	}
    +
    +	//! Destructor
    +	virtual ~Repository() {
    +		stop();
    +		if (this->_thread)
    +			delete this->_thread;
    +		destroy();
    +	}
    +
    +	//! initialize
    +	virtual bool initialize()
    +	{
    +		std::string value;
    +
    +		if (_type == PROVENANCE)
    +		{
    +			if (!(configure_->get(Configure::nifi_provenance_repository_enable, value)
    +					&& StringUtils::StringToBool(value, _enable))) {
    +				_enable = true;
    +			}
    +			if (!_enable)
    +				return false;
    +			if (configure_->get(Configure::nifi_provenance_repository_directory_default, value))
    +			{
    +				_directory = value;
    +			}
    +			logger_->log_info("NiFi Provenance Repository Directory %s", _directory.c_str());
    +			if (configure_->get(Configure::nifi_provenance_repository_max_storage_size, value))
    +			{
    +				Property::StringToInt(value, _maxPartitionBytes);
    +			}
    +			logger_->log_info("NiFi Provenance Max Partition Bytes %d", _maxPartitionBytes);
    +			if (configure_->get(Configure::nifi_provenance_repository_max_storage_time, value))
    +			{
    +				TimeUnit unit;
    +				if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
    +							Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
    +				{
    +				}
    +			}
    +			logger_->log_info("NiFi Provenance Max Storage Time: [%d] ms", _maxPartitionMillis);
    +			leveldb::Options options;
    +			options.create_if_missing = true;
    +			leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
    +			if (status.ok())
    +			{
    +				logger_->log_info("NiFi Provenance Repository database open %s success", _directory.c_str());
    +			}
    +			else
    +			{
    +				logger_->log_error("NiFi Provenance Repository database open %s fail", _directory.c_str());
    +				return false;
    +			}
    +		}
    +
    +		if (_type == FLOWFILE)
    +		{
    +			if (!(configure_->get(Configure::nifi_flowfile_repository_enable, value)
    +					&& StringUtils::StringToBool(value, _enable))) {
    +				_enable = true;
    +			}
    +			if (!_enable)
    +				return false;
    +			if (configure_->get(Configure::nifi_flowfile_repository_directory_default, value))
    +			{
    +				_directory = value;
    +			}
    +			logger_->log_info("NiFi FlowFile Repository Directory %s", _directory.c_str());
    +			if (configure_->get(Configure::nifi_flowfile_repository_max_storage_size, value))
    +			{
    +				Property::StringToInt(value, _maxPartitionBytes);
    +			}
    +			logger_->log_info("NiFi FlowFile Max Partition Bytes %d", _maxPartitionBytes);
    +			if (configure_->get(Configure::nifi_flowfile_repository_max_storage_time, value))
    +			{
    +				TimeUnit unit;
    +				if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
    +							Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
    +				{
    +				}
    +			}
    +			logger_->log_info("NiFi FlowFile Max Storage Time: [%d] ms", _maxPartitionMillis);
    +			leveldb::Options options;
    +			options.create_if_missing = true;
    +			leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
    +			if (status.ok())
    +			{
    +				logger_->log_info("NiFi FlowFile Repository database open %s success", _directory.c_str());
    +			}
    +			else
    +			{
    +				logger_->log_error("NiFi FlowFile Repository database open %s fail", _directory.c_str());
    +				return false;
    +			}
    +		}
    +
    +		return true;
    +	}
    +	//! Put
    +	virtual bool Put(std::string key, uint8_t *buf, int bufLen)
    +	{
    +		if (!_enable)
    --- End diff --
    
    Why is _enable (which should be named enable_) not atomic? Would this not cause an invalid state if we called Put and then Delete while another thread disabled the repository?
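
    A minimal sketch of the reviewer's suggestion. It uses an in-memory std::map in place of LevelDB so it runs standalone. RepositorySketch and setEnable are hypothetical names. Making enable_ a std::atomic<bool> removes the data race on the flag itself, but it does not make a Put-then-Delete sequence atomic.

```cpp
#include <atomic>
#include <cassert>
#include <map>
#include <mutex>
#include <string>

// Hypothetical in-memory stand-in for the LevelDB-backed Repository.
class RepositorySketch {
 public:
  bool Put(const std::string &key, const std::string &value) {
    if (!enable_.load()) return false;  // race-free read of the flag
    std::lock_guard<std::mutex> lock(mtx_);
    store_[key] = value;
    return true;
  }

  bool Get(const std::string &key, std::string &value) {
    if (!enable_.load()) return false;
    std::lock_guard<std::mutex> lock(mtx_);
    auto it = store_.find(key);
    if (it == store_.end()) return false;
    value = it->second;
    return true;
  }

  bool Delete(const std::string &key) {
    if (!enable_.load()) return false;
    std::lock_guard<std::mutex> lock(mtx_);
    store_.erase(key);  // like LevelDB, deleting a missing key is not an error
    return true;
  }

  void setEnable(bool enable) { enable_.store(enable); }
  bool isEnable() const { return enable_.load(); }

 private:
  std::atomic<bool> enable_{true};  // was a plain bool _enable in the diff
  std::mutex mtx_;                  // guards store_
  std::map<std::string, std::string> store_;
};
```

    The atomic flag guarantees that each call reads a consistent value. However, another thread can still disable the repository between a successful Put and the following Delete, which leaves the entry behind. Closing that window would require the caller to hold a single lock around the flag check and both operations.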



[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r103958761
  
    --- Diff: libminifi/include/Repository.h ---
    @@ -0,0 +1,294 @@
    +/**
    + * @file Repository 
    + * Repository class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __REPOSITORY_H__
    +#define __REPOSITORY_H__
    +
    +#include <ftw.h>
    +#include <uuid/uuid.h>
    +#include <atomic>
    +#include <cstdint>
    +#include <cstring>
    +#include <iostream>
    +#include <map>
    +#include <set>
    +#include <string>
    +#include <thread>
    +#include <vector>
    +
    +#include "leveldb/db.h"
    +#include "leveldb/options.h"
    +#include "leveldb/slice.h"
    +#include "leveldb/status.h"
    +#include "Configure.h"
    +#include "Connection.h"
    +#include "FlowFileRecord.h"
    +#include "Logger.h"
    +#include "Property.h"
    +#include "ResourceClaim.h"
    +#include "io/Serializable.h"
    +#include "utils/TimeUtil.h"
    +#include "utils/StringUtils.h"
    +
    +//! Repository
    +class Repository
    +{
    +public:
    +	enum RepositoryType {
    +		//! Provenance Repo Type
    +		PROVENANCE,
    +		//! FlowFile Repo Type
    +		FLOWFILE,
    +		MAX_REPO_TYPE
    +	};
    +	static const char *RepositoryTypeStr[MAX_REPO_TYPE];
    +	//! Constructor
    +	/*!
    +	 * Create a new provenance repository
    +	 */
    +	Repository(RepositoryType type, std::string directory, 
    +		int64_t maxPartitionMillis, int64_t maxPartitionBytes, uint64_t purgePeriod) {
    +		_type = type;
    +		_directory = directory;
    +		_maxPartitionMillis = maxPartitionMillis;
    +		_maxPartitionBytes = maxPartitionBytes;
    +		_purgePeriod = purgePeriod;
    +		logger_ = Logger::getLogger();
    +		configure_ = Configure::getConfigure();
    +		_db = NULL;
    +		_thread = NULL;
    +		_running = false;
    +		_repoFull = false;
    +		_enable = true;
    +	}
    +
    +	//! Destructor
    +	virtual ~Repository() {
    +		stop();
    +		if (this->_thread)
    +			delete this->_thread;
    +		destroy();
    +	}
    +
    +	//! initialize
    +	virtual bool initialize()
    +	{
    +		std::string value;
    +
    +		if (_type == PROVENANCE)
    +		{
    +			if (!(configure_->get(Configure::nifi_provenance_repository_enable, value)
    +					&& StringUtils::StringToBool(value, _enable))) {
    +				_enable = true;
    +			}
    +			if (!_enable)
    +				return false;
    +			if (configure_->get(Configure::nifi_provenance_repository_directory_default, value))
    +			{
    +				_directory = value;
    +			}
    +			logger_->log_info("NiFi Provenance Repository Directory %s", _directory.c_str());
    +			if (configure_->get(Configure::nifi_provenance_repository_max_storage_size, value))
    +			{
    +				Property::StringToInt(value, _maxPartitionBytes);
    +			}
    +			logger_->log_info("NiFi Provenance Max Partition Bytes %d", _maxPartitionBytes);
    +			if (configure_->get(Configure::nifi_provenance_repository_max_storage_time, value))
    +			{
    +				TimeUnit unit;
    +				if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
    +							Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
    +				{
    +				}
    +			}
    +			logger_->log_info("NiFi Provenance Max Storage Time: [%d] ms", _maxPartitionMillis);
    +			leveldb::Options options;
    +			options.create_if_missing = true;
    +			leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
    +			if (status.ok())
    +			{
    +				logger_->log_info("NiFi Provenance Repository database open %s success", _directory.c_str());
    +			}
    +			else
    +			{
    +				logger_->log_error("NiFi Provenance Repository database open %s fail", _directory.c_str());
    +				return false;
    +			}
    +		}
    +
    +		if (_type == FLOWFILE)
    +		{
    +			if (!(configure_->get(Configure::nifi_flowfile_repository_enable, value)
    +					&& StringUtils::StringToBool(value, _enable))) {
    +				_enable = true;
    +			}
    +			if (!_enable)
    +				return false;
    +			if (configure_->get(Configure::nifi_flowfile_repository_directory_default, value))
    +			{
    +				_directory = value;
    +			}
    +			logger_->log_info("NiFi FlowFile Repository Directory %s", _directory.c_str());
    +			if (configure_->get(Configure::nifi_flowfile_repository_max_storage_size, value))
    +			{
    +				Property::StringToInt(value, _maxPartitionBytes);
    +			}
    +			logger_->log_info("NiFi FlowFile Max Partition Bytes %d", _maxPartitionBytes);
    +			if (configure_->get(Configure::nifi_flowfile_repository_max_storage_time, value))
    +			{
    +				TimeUnit unit;
    +				if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
    +							Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
    +				{
    +				}
    +			}
    +			logger_->log_info("NiFi FlowFile Max Storage Time: [%d] ms", _maxPartitionMillis);
    +			leveldb::Options options;
    +			options.create_if_missing = true;
    +			leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
    +			if (status.ok())
    +			{
    +				logger_->log_info("NiFi FlowFile Repository database open %s success", _directory.c_str());
    +			}
    +			else
    +			{
    +				logger_->log_error("NiFi FlowFile Repository database open %s fail", _directory.c_str());
    +				return false;
    +			}
    +		}
    +
    +		return true;
    +	}
    +	//! Put
    +	virtual bool Put(std::string key, uint8_t *buf, int bufLen)
    +	{
    +		if (!_enable)
    +			return false;
    +			
    +		// persistent to the DB
    +		leveldb::Slice value((const char *) buf, bufLen);
    +		leveldb::Status status;
    +		status = _db->Put(leveldb::WriteOptions(), key, value);
    +		if (status.ok())
    +			return true;
    +		else
    +			return false;
    +	}
    +	//! Delete
    +	virtual bool Delete(std::string key)
    --- End diff --
    
    Should this take `const std::string &key` instead of a by-value copy? The same applies to Put() and Get().
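    A minimal sketch of the suggested signatures, assuming an in-memory std::map in place of LevelDB (the class name KeyValueStore is hypothetical and not part of the PR). With LevelDB the change should be transparent, because leveldb::Slice has a constructor taking `const std::string &`, so `_db->Delete(leveldb::WriteOptions(), key)` compiles unchanged.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical in-memory stand-in for the LevelDB-backed Repository,
// used only to illustrate passing keys by const reference.
class KeyValueStore {
 public:
  // Keys are taken as const std::string & so the caller's string is not copied.
  bool Put(const std::string &key, const uint8_t *buf, int bufLen) {
    if (buf == nullptr || bufLen < 0)
      return false;
    entries_[key].assign(reinterpret_cast<const char *>(buf),
                         static_cast<std::size_t>(bufLen));
    return true;
  }

  bool Delete(const std::string &key) {
    // erase() returns the number of removed elements (0 or 1 for std::map).
    return entries_.erase(key) == 1;
  }

  // A lookup does not modify the store, so the method itself is const.
  bool Get(const std::string &key, std::string &value) const {
    auto it = entries_.find(key);
    if (it == entries_.end())
      return false;
    value = it->second;
    return true;
  }

 private:
  std::map<std::string, std::string> entries_;
};
```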


[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104827822
  
    --- Diff: libminifi/test/unit/ProcessorTests.cpp ---
    @@ -33,10 +35,18 @@ TEST_CASE("Test Find file", "[getfileCreate2]"){
     
     	TestController testController;
     
    -	testController.enableDebug();
    +	Configure *config = Configure::getConfigure();
     
    -	ProvenanceTestRepository repo;
    -	TestFlowController controller(repo);
    +	config->set(BaseLogger::nifi_log_appender,"rollingappender");
    --- End diff --
    
    Sure, we can remove that. During my last test run a test failed, and I had trouble finding the log for the test that had just run.


[GitHub] nifi-minifi-cpp issue #62: MINIFI-231: Add Flow Persistent, Using id instead...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on the issue:

    https://github.com/apache/nifi-minifi-cpp/pull/62
  
    @apiri, could you please merge the PR when you have time? No rush, but it would be good to get it in before Marc's namespace change.


[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104320291
  
    --- Diff: libminifi/src/Repository.cpp ---
    @@ -0,0 +1,140 @@
    +/**
    + * @file Repository.cpp
    + * Repository implemenatation 
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#include <cstdint>
    +#include <vector>
    +#include <arpa/inet.h>
    +#include "io/DataStream.h"
    +#include "io/Serializable.h"
    +#include "Relationship.h"
    +#include "Logger.h"
    +#include "FlowController.h"
    +#include "Repository.h"
    +#include "Provenance.h"
    +#include "FlowFileRepository.h"
    +
    +const char *Repository::RepositoryTypeStr[MAX_REPO_TYPE] = {"Provenace Repository", "FlowFile Repository"};
    +uint64_t Repository::_repoSize[MAX_REPO_TYPE] = {0, 0}; 
    +
    +void Repository::start() {
    +	if (!_enable)
    +		return;
    +	if (this->_purgePeriod <= 0)
    +		return;
    +	if (_running)
    +		return;
    +	_running = true;
    +	logger_->log_info("%s Repository Monitor Thread Start", RepositoryTypeStr[_type]);
    +	_thread = new std::thread(run, this);
    +	_thread->detach();
    +}
    +
    +void Repository::stop() {
    +	if (!_running)
    +		return;
    +	_running = false;
    +	logger_->log_info("%s Repository Monitor Thread Stop", RepositoryTypeStr[_type]);
    +}
    +
    +void Repository::run(Repository *repo) {
    +	// threshold for purge
    +	uint64_t purgeThreshold = repo->_maxPartitionBytes * 3 / 4;
    +	while (repo->_running) {
    +		std::this_thread::sleep_for(
    +				std::chrono::milliseconds(repo->_purgePeriod));
    +		uint64_t curTime = getTimeMillis();
    +		uint64_t size = repo->repoSize();
    +		if (size >= purgeThreshold) {
    +			std::vector<std::string> purgeList;
    +			leveldb::Iterator* it = repo->_db->NewIterator(
    +					leveldb::ReadOptions());
    +			if (repo->_type == PROVENANCE)
    +			{
    +				for (it->SeekToFirst(); it->Valid(); it->Next()) {
    +					ProvenanceEventRecord eventRead;
    +					std::string key = it->key().ToString();
    +					if (eventRead.DeSerialize((uint8_t *) it->value().data(),
    +						(int) it->value().size())) {
    +						if ((curTime - eventRead.getEventTime())
    +							> repo->_maxPartitionMillis)
    +							purgeList.push_back(key);
    +					} else {
    +						repo->logger_->log_debug(
    +							"NiFi %s retrieve event %s fail",
    +							RepositoryTypeStr[repo->_type],
    +							key.c_str());
    +						purgeList.push_back(key);
    +					}
    +				}
    +			}
    +			if (repo->_type == FLOWFILE)
    +			{
    +				for (it->SeekToFirst(); it->Valid(); it->Next()) {
    +					FlowFileEventRecord eventRead;
    +					std::string key = it->key().ToString();
    +					if (eventRead.DeSerialize((uint8_t *) it->value().data(),
    +						(int) it->value().size())) {
    +						if ((curTime - eventRead.getEventTime())
    +							> repo->_maxPartitionMillis)
    +							purgeList.push_back(key);
    +					} else {
    +						repo->logger_->log_debug(
    +							"NiFi %s retrieve event %s fail",
    +							RepositoryTypeStr[repo->_type],
    +							key.c_str());
    +						purgeList.push_back(key);
    +					}
    +				}
    +			}
    +			delete it;
    +			std::vector<std::string>::iterator itPurge;
    +			for (itPurge = purgeList.begin(); itPurge != purgeList.end();
    --- End diff --
    
    Will do.
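    The diff above ends inside the purge loop of Repository::run(). The selection rule that loop implements (do nothing until the repository reaches 3/4 of _maxPartitionBytes, then drop every entry that is older than _maxPartitionMillis or fails to deserialize) can be sketched without LevelDB as follows. StoredEvent and selectPurgeKeys are hypothetical names, and the range-based loop is one possible cleanup, not the code that was eventually merged.

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for a deserialized repository entry.
struct StoredEvent {
  bool valid;          // false models a DeSerialize() failure
  uint64_t eventTime;  // event timestamp in milliseconds
};

// Returns the keys one purge pass would delete: nothing while the repository
// is below 3/4 of its byte budget; otherwise every entry that is older than
// maxAgeMillis or could not be decoded.
std::vector<std::string> selectPurgeKeys(
    const std::map<std::string, StoredEvent> &entries, uint64_t repoSize,
    uint64_t maxPartitionBytes, uint64_t maxAgeMillis, uint64_t now) {
  std::vector<std::string> purgeList;
  const uint64_t purgeThreshold = maxPartitionBytes * 3 / 4;
  if (repoSize < purgeThreshold)
    return purgeList;
  for (const auto &entry : entries) {
    const StoredEvent &event = entry.second;
    // Guard the subtraction so a future timestamp cannot wrap around.
    const bool tooOld =
        now > event.eventTime && now - event.eventTime > maxAgeMillis;
    if (!event.valid || tooOld)
      purgeList.push_back(entry.first);
  }
  return purgeList;
}
```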


[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r103957007
  
    --- Diff: libminifi/include/FlowFileRepository.h ---
    @@ -0,0 +1,208 @@
    +/**
    + * @file FlowFileRepository 
    + * Flow file repository class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __FLOWFILE_REPOSITORY_H__
    +#define __FLOWFILE_REPOSITORY_H__
    +
    +#include <ftw.h>
    +#include <uuid/uuid.h>
    +#include <atomic>
    +#include <cstdint>
    +#include <cstring>
    +#include <iostream>
    +#include <map>
    +#include <set>
    +#include <string>
    +#include <thread>
    +#include <vector>
    +
    +#include "Configure.h"
    +#include "Connection.h"
    +#include "FlowFileRecord.h"
    +#include "Logger.h"
    +#include "Property.h"
    +#include "ResourceClaim.h"
    +#include "io/Serializable.h"
    +#include "utils/TimeUtil.h"
    +#include "Repository.h"
    +
    +class FlowFileRepository;
    +
    +//! FlowFile Event Record
    +class FlowFileEventRecord : protected Serializable
    +{
    +public:
    +	friend class ProcessSession;
    +public:
    +	//! Constructor
    +	/*!
    +	 * Create a new provenance event record
    +	 */
    +	FlowFileEventRecord()
    +	: _entryDate(0), _lineageStartDate(0), _size(0), _offset(0)  
    +	{
    +		_eventTime = getTimeMillis();
    +		logger_ = Logger::getLogger();
    +	}
    +
    +	//! Destructor
    +	virtual ~FlowFileEventRecord() {
    +	}
    +	//! Get Attributes
    +	std::map<std::string, std::string> getAttributes() {
    +		return _attributes;
    +	}
    +	//! Get Size
    +	uint64_t getFileSize() {
    +		return _size;
    +	}
    +	// ! Get Offset
    +	uint64_t getFileOffset() {
    +		return _offset;
    +	}
    +	// ! Get Entry Date
    +	uint64_t getFlowFileEntryDate() {
    +		return _entryDate;
    +	}
    +	// ! Get Lineage Start Date
    +	uint64_t getlineageStartDate() {
    +		return _lineageStartDate;
    +	}
    +	// ! Get Event Time
    +	uint64_t getEventTime() {
    +		return _eventTime;
    +	}
    +	//! Get FlowFileUuid
    +	std::string getFlowFileUuid()
    +	{
    +		return _uuid;
    +	}
    +	//! Get ConnectionUuid
    +	std::string getConnectionUuid()
    +	{
    +		return _uuidConnection;
    +	}
    +	//! Get content full path
    +	std::string getContentFullPath()
    +	{
    +		return _contentFullPath;
    +	}
    +	//! Get LineageIdentifiers
    +	std::set<std::string> getLineageIdentifiers()
    +	{
    +		return _lineageIdentifiers;
    +	}
    +	//! fromFlowFile
    +	void fromFlowFile(FlowFileRecord *flow, std::string uuidConnection)
    --- End diff --
    
    Please use smart pointers. 
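    One way to apply this to fromFlowFile() is to accept a `std::shared_ptr` by const reference, sketched below with trimmed-down hypothetical types (FlowFileStub, EventRecordSketch) standing in for FlowFileRecord and FlowFileEventRecord. This assumes flow files are shared between components; if ownership were exclusive, std::unique_ptr would be the closer fit.

```cpp
#include <cstdint>
#include <memory>
#include <string>

// Hypothetical, trimmed-down stand-in for FlowFileRecord.
struct FlowFileStub {
  std::string uuid;
  uint64_t size;
};

// Hypothetical, trimmed-down stand-in for FlowFileEventRecord.
class EventRecordSketch {
 public:
  // Taking std::shared_ptr by const reference makes shared ownership explicit
  // while avoiding a reference-count increment on every call.
  void fromFlowFile(const std::shared_ptr<FlowFileStub> &flow,
                    const std::string &uuidConnection) {
    if (!flow)
      return;  // nothing to copy from an empty pointer
    uuid_ = flow->uuid;
    size_ = flow->size;
    uuidConnection_ = uuidConnection;
  }
  const std::string &getFlowFileUuid() const { return uuid_; }
  const std::string &getConnectionUuid() const { return uuidConnection_; }
  uint64_t getFileSize() const { return size_; }

 private:
  std::string uuid_;
  std::string uuidConnection_;
  uint64_t size_ = 0;
};
```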


[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104318245
  
    --- Diff: libminifi/include/Repository.h ---
    @@ -0,0 +1,294 @@
    +/**
    + * @file Repository 
    + * Repository class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __REPOSITORY_H__
    +#define __REPOSITORY_H__
    +
    +#include <ftw.h>
    +#include <uuid/uuid.h>
    +#include <atomic>
    +#include <cstdint>
    +#include <cstring>
    +#include <iostream>
    +#include <map>
    +#include <set>
    +#include <string>
    +#include <thread>
    +#include <vector>
    +
    +#include "leveldb/db.h"
    +#include "leveldb/options.h"
    +#include "leveldb/slice.h"
    +#include "leveldb/status.h"
    +#include "Configure.h"
    +#include "Connection.h"
    +#include "FlowFileRecord.h"
    +#include "Logger.h"
    +#include "Property.h"
    +#include "ResourceClaim.h"
    +#include "io/Serializable.h"
    +#include "utils/TimeUtil.h"
    +#include "utils/StringUtils.h"
    +
    +//! Repository
    +class Repository
    +{
    +public:
    +	enum RepositoryType {
    +		//! Provenance Repo Type
    +		PROVENANCE,
    +		//! FlowFile Repo Type
    +		FLOWFILE,
    +		MAX_REPO_TYPE
    +	};
    +	static const char *RepositoryTypeStr[MAX_REPO_TYPE];
    +	//! Constructor
    +	/*!
    +	 * Create a new provenance repository
    +	 */
    +	Repository(RepositoryType type, std::string directory, 
    +		int64_t maxPartitionMillis, int64_t maxPartitionBytes, uint64_t purgePeriod) {
    +		_type = type;
    +		_directory = directory;
    +		_maxPartitionMillis = maxPartitionMillis;
    +		_maxPartitionBytes = maxPartitionBytes;
    +		_purgePeriod = purgePeriod;
    +		logger_ = Logger::getLogger();
    +		configure_ = Configure::getConfigure();
    +		_db = NULL;
    +		_thread = NULL;
    +		_running = false;
    +		_repoFull = false;
    +		_enable = true;
    +	}
    +
    +	//! Destructor
    +	virtual ~Repository() {
    +		stop();
    +		if (this->_thread)
    +			delete this->_thread;
    +		destroy();
    +	}
    +
    +	//! initialize
    +	virtual bool initialize()
    +	{
    +		std::string value;
    +
    +		if (_type == PROVENANCE)
    +		{
    +			if (!(configure_->get(Configure::nifi_provenance_repository_enable, value)
    +					&& StringUtils::StringToBool(value, _enable))) {
    +				_enable = true;
    +			}
    +			if (!_enable)
    +				return false;
    +			if (configure_->get(Configure::nifi_provenance_repository_directory_default, value))
    +			{
    +				_directory = value;
    +			}
    +			logger_->log_info("NiFi Provenance Repository Directory %s", _directory.c_str());
    +			if (configure_->get(Configure::nifi_provenance_repository_max_storage_size, value))
    +			{
    +				Property::StringToInt(value, _maxPartitionBytes);
    +			}
    +			logger_->log_info("NiFi Provenance Max Partition Bytes %d", _maxPartitionBytes);
    +			if (configure_->get(Configure::nifi_provenance_repository_max_storage_time, value))
    +			{
    +				TimeUnit unit;
    +				if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
    +							Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
    +				{
    +				}
    +			}
    +			logger_->log_info("NiFi Provenance Max Storage Time: [%d] ms", _maxPartitionMillis);
    +			leveldb::Options options;
    +			options.create_if_missing = true;
    +			leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
    +			if (status.ok())
    +			{
    +				logger_->log_info("NiFi Provenance Repository database open %s success", _directory.c_str());
    +			}
    +			else
    +			{
    +				logger_->log_error("NiFi Provenance Repository database open %s fail", _directory.c_str());
    +				return false;
    +			}
    +		}
    +
    +		if (_type == FLOWFILE)
    +		{
    +			if (!(configure_->get(Configure::nifi_flowfile_repository_enable, value)
    +					&& StringUtils::StringToBool(value, _enable))) {
    +				_enable = true;
    +			}
    +			if (!_enable)
    +				return false;
    +			if (configure_->get(Configure::nifi_flowfile_repository_directory_default, value))
    +			{
    +				_directory = value;
    +			}
    +			logger_->log_info("NiFi FlowFile Repository Directory %s", _directory.c_str());
    +			if (configure_->get(Configure::nifi_flowfile_repository_max_storage_size, value))
    +			{
    +				Property::StringToInt(value, _maxPartitionBytes);
    +			}
    +			logger_->log_info("NiFi FlowFile Max Partition Bytes %d", _maxPartitionBytes);
    +			if (configure_->get(Configure::nifi_flowfile_repository_max_storage_time, value))
    +			{
    +				TimeUnit unit;
    +				if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
    +							Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
    +				{
    +				}
    +			}
    +			logger_->log_info("NiFi FlowFile Max Storage Time: [%d] ms", _maxPartitionMillis);
    +			leveldb::Options options;
    +			options.create_if_missing = true;
    +			leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
    +			if (status.ok())
    +			{
    +				logger_->log_info("NiFi FlowFile Repository database open %s success", _directory.c_str());
    +			}
    +			else
    +			{
    +				logger_->log_error("NiFi FlowFile Repository database open %s fail", _directory.c_str());
    +				return false;
    +			}
    +		}
    +
    +		return true;
    +	}
    +	//! Put
    +	virtual bool Put(std::string key, uint8_t *buf, int bufLen)
    +	{
    +		if (!_enable)
    +			return false;
    +			
    +		// persistent to the DB
    +		leveldb::Slice value((const char *) buf, bufLen);
    +		leveldb::Status status;
    +		status = _db->Put(leveldb::WriteOptions(), key, value);
    +		if (status.ok())
    +			return true;
    +		else
    +			return false;
    +	}
    +	//! Delete
    +	virtual bool Delete(std::string key)
    +	{
    +		if (!_enable)
    +			return false;
    +		leveldb::Status status;
    +		status = _db->Delete(leveldb::WriteOptions(), key);
    +		if (status.ok())
    +			return true;
    +		else
    +			return false;
    +	}
    +	//! Get
    +	virtual bool Get(std::string key, std::string &value)
    +	{
    +		if (!_enable)
    +			return false;
    +		leveldb::Status status;
    +		status = _db->Get(leveldb::ReadOptions(), key, &value);
    +		if (status.ok())
    +			return true;
    +		else
    +			return false;
    +	}
    +	//! destroy
    +	void destroy()
    +	{
    +		if (_db)
    --- End diff --
    
    We only call destroy() when we delete the repo, so I will make it a private function.
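    A minimal sketch of that arrangement, assuming a hypothetical FakeDb type in place of leveldb::DB: destroy() is private and only the destructor calls it, so the handle's lifetime is tied to the repository object. Copying is deleted because a copied raw pointer would otherwise be deleted twice.

```cpp
// Hypothetical stand-in for the leveldb::DB handle owned by Repository.
struct FakeDb {
  static int openCount;  // number of handles currently alive
  FakeDb() { ++openCount; }
  ~FakeDb() { --openCount; }
};
int FakeDb::openCount = 0;

class RepositorySketch {
 public:
  RepositorySketch() : db_(new FakeDb()) {}
  // Cleanup is tied to the object's lifetime; callers never invoke destroy().
  ~RepositorySketch() { destroy(); }

  // Copying would lead to a double delete of db_, so forbid it.
  RepositorySketch(const RepositorySketch &) = delete;
  RepositorySketch &operator=(const RepositorySketch &) = delete;

 private:
  // Private: only the destructor releases the database handle.
  void destroy() {
    delete db_;  // deleting nullptr is a no-op, so repeating this is safe
    db_ = nullptr;
  }
  FakeDb *db_;
};
```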


[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r103955690
  
    --- Diff: libminifi/include/Connection.h ---
    @@ -84,6 +84,10 @@ class Connection
     		else
     			return false;
     	}
    +	//! Get UUID Str
    +	std::string getUUIDStr() {
    --- End diff --
    
    We've informally agreed to follow Google's C++ style guide, per the e-mails on the mailing list, so I'll do my best to give some pointers here. This should be:
    std::string getUUIDStr() const
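    A short sketch of why the const qualifier matters, using a hypothetical trimmed-down ConnectionSketch in place of Connection (in the PR the string would be derived from the connection's uuid_t). Without `const`, the getter could not be called through a const reference such as the parameter of describe(), which is also a hypothetical helper.

```cpp
#include <string>
#include <utility>

// Hypothetical, trimmed-down stand-in for Connection.
class ConnectionSketch {
 public:
  explicit ConnectionSketch(std::string uuidStr) : uuidStr_(std::move(uuidStr)) {}

  // const: reading the UUID does not modify the connection, so the getter
  // can be called through const references and pointers.
  std::string getUUIDStr() const { return uuidStr_; }

 private:
  std::string uuidStr_;
};

// Compiles only because getUUIDStr() is a const member function.
std::string describe(const ConnectionSketch &connection) {
  return "connection " + connection.getUUIDStr();
}
```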


[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r103956332
  
    --- Diff: libminifi/include/FlowFileRepository.h ---
    @@ -0,0 +1,208 @@
    +/**
    + * @file FlowFileRepository 
    + * Flow file repository class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __FLOWFILE_REPOSITORY_H__
    +#define __FLOWFILE_REPOSITORY_H__
    +
    +#include <ftw.h>
    +#include <uuid/uuid.h>
    +#include <atomic>
    +#include <cstdint>
    +#include <cstring>
    +#include <iostream>
    +#include <map>
    +#include <set>
    +#include <string>
    +#include <thread>
    +#include <vector>
    +
    +#include "Configure.h"
    +#include "Connection.h"
    +#include "FlowFileRecord.h"
    +#include "Logger.h"
    +#include "Property.h"
    +#include "ResourceClaim.h"
    +#include "io/Serializable.h"
    +#include "utils/TimeUtil.h"
    +#include "Repository.h"
    +
    +class FlowFileRepository;
    +
    +//! FlowFile Event Record
    +class FlowFileEventRecord : protected Serializable
    +{
    +public:
    +	friend class ProcessSession;
    +public:
    +	//! Constructor
    +	/*!
    +	 * Create a new provenance event record
    +	 */
    +	FlowFileEventRecord()
    +	: _entryDate(0), _lineageStartDate(0), _size(0), _offset(0)  
    +	{
    +		_eventTime = getTimeMillis();
    +		logger_ = Logger::getLogger();
    +	}
    +
    +	//! Destructor
    +	virtual ~FlowFileEventRecord() {
    +	}
    +	//! Get Attributes
    +	std::map<std::string, std::string> getAttributes() {
    --- End diff --
    
    Will these be modified? If not, why not make them const? The same applies to the getters below.
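    For the attribute map, a const getter can also return a const reference so each call avoids copying the whole map. The sketch below assumes a hypothetical EventRecordGetters class in place of FlowFileEventRecord; setAttribute() is likewise hypothetical and exists only to populate the record.

```cpp
#include <cstdint>
#include <map>
#include <string>

// Hypothetical, trimmed-down stand-in for FlowFileEventRecord.
class EventRecordGetters {
 public:
  void setAttribute(const std::string &key, const std::string &value) {
    attributes_[key] = value;
  }
  // const member function returning a const reference: no copy of the map,
  // and callers cannot modify the record through it.
  const std::map<std::string, std::string> &getAttributes() const {
    return attributes_;
  }
  uint64_t getFileSize() const { return size_; }

 private:
  std::map<std::string, std::string> attributes_;
  uint64_t size_ = 0;
};
```

    The trade-off is that the returned reference is only valid while the record is alive; callers that need the attributes afterwards should copy them.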


[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104300837
  
    --- Diff: libminifi/include/Connection.h ---
    @@ -180,7 +184,8 @@ class Connection
     	std::atomic<uint64_t> _maxQueueDataSize;
     	//! Flow File Expiration Duration in= MilliSeconds
     	std::atomic<uint64_t> _expiredDuration;
    -
    +	//! UUID string
    +	std::string _uuidStr;
    --- End diff --
    
    All the other private members in Connection.h use the _xxx naming style; I just want to keep it consistent.


[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r103962251
  
    --- Diff: libminifi/include/FlowController.h ---
    @@ -197,6 +202,8 @@ class FlowController {
     	std::atomic<bool> _initialized;
     	//! Provenance Repo
     	ProvenanceRepository *_provenanceRepo;
    +	//! FlowFile Repo
    +	FlowFileRepository *_flowfileRepo;
    --- End diff --
    
    Why does this need to be a raw pointer? Why not use a unique_ptr or shared_ptr, depending on the need?
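    A minimal sketch of the unique_ptr option, assuming the controller is the sole owner of its repository (RepoStub, ControllerSketch, and the directory string are hypothetical). If the repository also had to be held by other components, std::shared_ptr would be the appropriate choice instead.

```cpp
#include <memory>
#include <string>
#include <utility>

// Hypothetical minimal repository type used only for this illustration.
class RepoStub {
 public:
  explicit RepoStub(std::string directory) : directory_(std::move(directory)) {}
  const std::string &directory() const { return directory_; }

 private:
  std::string directory_;
};

// Hypothetical, trimmed-down FlowController: it exclusively owns its
// repository, so std::unique_ptr releases it automatically (no manual delete).
class ControllerSketch {
 public:
  ControllerSketch() : flowfileRepo_(new RepoStub("./flowfile_repository")) {}
  // Non-owning access for code that only needs to use the repository.
  RepoStub *flowfileRepo() const { return flowfileRepo_.get(); }

 private:
  std::unique_ptr<RepoStub> flowfileRepo_;
};
```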


[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r103957815
  
    --- Diff: libminifi/include/Repository.h ---
    @@ -0,0 +1,294 @@
    +/**
    + * @file Repository 
    + * Repository class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __REPOSITORY_H__
    +#define __REPOSITORY_H__
    +
    +#include <ftw.h>
    +#include <uuid/uuid.h>
    +#include <atomic>
    +#include <cstdint>
    +#include <cstring>
    +#include <iostream>
    +#include <map>
    +#include <set>
    +#include <string>
    +#include <thread>
    +#include <vector>
    +
    +#include "leveldb/db.h"
    +#include "leveldb/options.h"
    +#include "leveldb/slice.h"
    +#include "leveldb/status.h"
    +#include "Configure.h"
    +#include "Connection.h"
    +#include "FlowFileRecord.h"
    +#include "Logger.h"
    +#include "Property.h"
    +#include "ResourceClaim.h"
    +#include "io/Serializable.h"
    +#include "utils/TimeUtil.h"
    +#include "utils/StringUtils.h"
    +
    +//! Repository
    +class Repository
    --- End diff --
    
    Repository should be general. Extract LevelDB into a data stream or into an extending class. You could even templatize this class to get rid of FlowRepository.
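    One possible shape for this suggestion, sketched under stated assumptions: storage sits behind a hypothetical KeyValueBackend interface (a LevelDB implementation would be one subclass), and the repository is templated on its record type, which would replace the `_type == PROVENANCE` / `_type == FLOWFILE` branches. All names here are hypothetical, and the record type is assumed to provide `serialize()` and a static `deserialize()`.

```cpp
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical storage interface; a LevelDB-backed class would implement it.
class KeyValueBackend {
 public:
  virtual ~KeyValueBackend() = default;
  virtual bool put(const std::string &key, const std::string &value) = 0;
  virtual bool get(const std::string &key, std::string &value) const = 0;
};

// In-memory backend, useful for unit tests.
class MemoryBackend : public KeyValueBackend {
 public:
  bool put(const std::string &key, const std::string &value) override {
    data_[key] = value;
    return true;
  }
  bool get(const std::string &key, std::string &value) const override {
    auto it = data_.find(key);
    if (it == data_.end()) return false;
    value = it->second;
    return true;
  }

 private:
  std::map<std::string, std::string> data_;
};

// Example record type: a single size field stored as text.
struct SizeRecord {
  unsigned long long size = 0;
  std::string serialize() const { return std::to_string(size); }
  static bool deserialize(const std::string &raw, SizeRecord &out) {
    try {
      out.size = std::stoull(raw);
    } catch (const std::logic_error &) {  // invalid_argument or out_of_range
      return false;
    }
    return true;
  }
};

// Generic repository, templated on the record type instead of a type enum.
template <typename Record>
class GenericRepository {
 public:
  explicit GenericRepository(std::unique_ptr<KeyValueBackend> backend)
      : backend_(std::move(backend)) {}
  bool store(const std::string &key, const Record &record) {
    return backend_->put(key, record.serialize());
  }
  bool load(const std::string &key, Record &record) const {
    std::string raw;
    return backend_->get(key, raw) && Record::deserialize(raw, record);
  }

 private:
  std::unique_ptr<KeyValueBackend> backend_;
};
```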


[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r103960094
  
    --- Diff: libminifi/include/Repository.h ---
    @@ -0,0 +1,294 @@
    +/**
    + * @file Repository 
    + * Repository class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __REPOSITORY_H__
    +#define __REPOSITORY_H__
    +
    +#include <ftw.h>
    +#include <uuid/uuid.h>
    +#include <atomic>
    +#include <cstdint>
    +#include <cstring>
    +#include <iostream>
    +#include <map>
    +#include <mutex>
    +#include <set>
    +#include <string>
    +#include <thread>
    +#include <vector>
    +
    +#include "leveldb/db.h"
    +#include "leveldb/options.h"
    +#include "leveldb/slice.h"
    +#include "leveldb/status.h"
    +#include "Configure.h"
    +#include "Connection.h"
    +#include "FlowFileRecord.h"
    +#include "Logger.h"
    +#include "Property.h"
    +#include "ResourceClaim.h"
    +#include "io/Serializable.h"
    +#include "utils/TimeUtil.h"
    +#include "utils/StringUtils.h"
    +
    +//! Repository
    +class Repository
    +{
    +public:
    +	enum RepositoryType {
    +		//! Provenance Repo Type
    +		PROVENANCE,
    +		//! FlowFile Repo Type
    +		FLOWFILE,
    +		MAX_REPO_TYPE
    +	};
    +	static const char *RepositoryTypeStr[MAX_REPO_TYPE];
    +	//! Constructor
    +	/*!
    +	 * Create a new repository
    +	 */
    +	Repository(RepositoryType type, std::string directory, 
    +		int64_t maxPartitionMillis, int64_t maxPartitionBytes, uint64_t purgePeriod) {
    +		_type = type;
    +		_directory = directory;
    +		_maxPartitionMillis = maxPartitionMillis;
    +		_maxPartitionBytes = maxPartitionBytes;
    +		_purgePeriod = purgePeriod;
    +		logger_ = Logger::getLogger();
    +		configure_ = Configure::getConfigure();
    +		_db = NULL;
    +		_thread = NULL;
    +		_running = false;
    +		_repoFull = false;
    +		_enable = true;
    +	}
    +
    +	//! Destructor
    +	virtual ~Repository() {
    +		stop();
    +		if (this->_thread)
    +			delete this->_thread;
    +		destroy();
    +	}
    +
    +	//! initialize
    +	virtual bool initialize()
    +	{
    +		std::string value;
    +
    +		if (_type == PROVENANCE)
    +		{
    +			if (!(configure_->get(Configure::nifi_provenance_repository_enable, value)
    +					&& StringUtils::StringToBool(value, _enable))) {
    +				_enable = true;
    +			}
    +			if (!_enable)
    +				return false;
    +			if (configure_->get(Configure::nifi_provenance_repository_directory_default, value))
    +			{
    +				_directory = value;
    +			}
    +			logger_->log_info("NiFi Provenance Repository Directory %s", _directory.c_str());
    +			if (configure_->get(Configure::nifi_provenance_repository_max_storage_size, value))
    +			{
    +				Property::StringToInt(value, _maxPartitionBytes);
    +			}
    +			logger_->log_info("NiFi Provenance Max Partition Bytes %d", _maxPartitionBytes);
    +			if (configure_->get(Configure::nifi_provenance_repository_max_storage_time, value))
    +			{
    +				TimeUnit unit;
    +				if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
    +							Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
    +				{
    +				}
    +			}
    +			logger_->log_info("NiFi Provenance Max Storage Time: [%d] ms", _maxPartitionMillis);
    +			leveldb::Options options;
    +			options.create_if_missing = true;
    +			leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
    +			if (status.ok())
    +			{
    +				logger_->log_info("NiFi Provenance Repository database open %s success", _directory.c_str());
    +			}
    +			else
    +			{
    +				logger_->log_error("NiFi Provenance Repository database open %s fail", _directory.c_str());
    +				return false;
    +			}
    +		}
    +
    +		if (_type == FLOWFILE)
    +		{
    +			if (!(configure_->get(Configure::nifi_flowfile_repository_enable, value)
    +					&& StringUtils::StringToBool(value, _enable))) {
    +				_enable = true;
    +			}
    +			if (!_enable)
    +				return false;
    +			if (configure_->get(Configure::nifi_flowfile_repository_directory_default, value))
    +			{
    +				_directory = value;
    +			}
    +			logger_->log_info("NiFi FlowFile Repository Directory %s", _directory.c_str());
    +			if (configure_->get(Configure::nifi_flowfile_repository_max_storage_size, value))
    +			{
    +				Property::StringToInt(value, _maxPartitionBytes);
    +			}
    +			logger_->log_info("NiFi FlowFile Max Partition Bytes %d", _maxPartitionBytes);
    +			if (configure_->get(Configure::nifi_flowfile_repository_max_storage_time, value))
    +			{
    +				TimeUnit unit;
    +				if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
    +							Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
    +				{
    +				}
    +			}
    +			logger_->log_info("NiFi FlowFile Max Storage Time: [%d] ms", _maxPartitionMillis);
    +			leveldb::Options options;
    +			options.create_if_missing = true;
    +			leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
    +			if (status.ok())
    +			{
    +				logger_->log_info("NiFi FlowFile Repository database open %s success", _directory.c_str());
    +			}
    +			else
    +			{
    +				logger_->log_error("NiFi FlowFile Repository database open %s fail", _directory.c_str());
    +				return false;
    +			}
    +		}
    +
    +		return true;
    +	}
    +	//! Put
    +	virtual bool Put(std::string key, uint8_t *buf, int bufLen)
    +	{
    +		if (!_enable)
    +			return false;
    +			
    +		// persist to the DB
    +		leveldb::Slice value((const char *) buf, bufLen);
    +		leveldb::Status status;
    +		status = _db->Put(leveldb::WriteOptions(), key, value);
    +		if (status.ok())
    +			return true;
    +		else
    +			return false;
    +	}
    +	//! Delete
    +	virtual bool Delete(std::string key)
    +	{
    +		if (!_enable)
    +			return false;
    +		leveldb::Status status;
    +		status = _db->Delete(leveldb::WriteOptions(), key);
    +		if (status.ok())
    +			return true;
    +		else
    +			return false;
    +	}
    +	//! Get
    +	virtual bool Get(std::string key, std::string &value)
    +	{
    +		if (!_enable)
    +			return false;
    +		leveldb::Status status;
    +		status = _db->Get(leveldb::ReadOptions(), key, &value);
    +		if (status.ok())
    +			return true;
    +		else
    +			return false;
    +	}
    +	//! destroy
    +	void destroy()
    +	{
    +		if (_db)
    +		{
    +			delete _db;
    +			_db = NULL;
    +		}
    +	}
    +	//! Run function for the thread
    +	static void run(Repository *repo);
    +	//! Start the repository monitor thread
    +	virtual void start();
    +	//! Stop the repository monitor thread
    +	virtual void stop();
    +	//! whether the repo is full
    +	virtual bool isFull()
    +	{
    +		return _repoFull;
    +	}
    +	//! whether the repo is enabled
    +	virtual bool isEnable()
    +	{
    +		return _enable;
    +	}
    +
    +protected:
    +	//! Repo Type
    +	RepositoryType _type;
    +	//! Mutex for protection
    +	std::mutex _mtx;
    +	//! repository directory
    +	std::string _directory;
    +	//! Logger
    +	std::shared_ptr<Logger> logger_;
    +	//! Configure
    +	Configure *configure_;
    +	//! max db entry life time
    +	int64_t _maxPartitionMillis;
    +	//! max db size
    +	int64_t _maxPartitionBytes;
    +	//! purge period
    +	uint64_t _purgePeriod;
    +	//! level DB database
    +	leveldb::DB* _db;
    +	//! thread
    +	std::thread *_thread;
    +	//! whether it is running
    --- End diff --
    
    Can you better explain the difference between running and enable, preferably in comments?
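One possible way to document the two flags, sketched under assumptions: `enable_` mirrors the configuration switch read in `initialize()` (e.g. `Configure::nifi_provenance_repository_enable`), while `running_` tracks only the monitor thread. The class name `MonitoredRepository` is hypothetical, and unlike the diff this sketch joins the thread in `stop()` instead of detaching it.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical sketch separating the two flags discussed above.
// start()/stop() are assumed to be called from one controlling thread.
class MonitoredRepository {
 public:
  explicit MonitoredRepository(bool enable) : enable_(enable) {}
  ~MonitoredRepository() { stop(); }

  // enable_: configuration switch, fixed after construction. When false,
  // Put/Get/Delete should fail and the monitor thread is never started.
  bool isEnabled() const { return enable_; }

  // running_: lifecycle state of the background monitor (purge) thread.
  // Toggled by start()/stop(); can only become true if enable_ is true.
  bool isRunning() const { return running_; }

  void start() {
    // Do nothing if disabled or if the thread is already running.
    if (!enable_ || running_.exchange(true)) return;
    thread_ = std::thread([this] {
      while (running_) {
        // A purge/size check would run here on each period.
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
      }
    });
  }

  void stop() {
    if (!running_.exchange(false)) return;   // was not running
    if (thread_.joinable()) thread_.join();  // wait for the loop to exit
    assert(!thread_.joinable());
  }

 private:
  const bool enable_;
  std::atomic<bool> running_{false};
  std::thread thread_;
};
```

Joining rather than detaching means the destructor cannot return while the monitor loop still dereferences `this`.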



[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104319954
  
    --- Diff: libminifi/include/Repository.h ---
    @@ -0,0 +1,294 @@
    +	//! whether it is running
    --- End diff --
    
    Sure.




[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r103955841
  
    --- Diff: libminifi/include/Connection.h ---
    @@ -180,7 +184,8 @@ class Connection
     	std::atomic<uint64_t> _maxQueueDataSize;
     	//! Flow File Expiration Duration in MilliSeconds
     	std::atomic<uint64_t> _expiredDuration;
    -
    +	//! UUID string
    +	std::string _uuidStr;
    --- End diff --
    
    You can run the linter. I can provide the link if you need it. It should be in the mailing list archive. 



[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104432714
  
    --- Diff: libminifi/include/Connection.h ---
    @@ -180,7 +184,8 @@ class Connection
     	std::atomic<uint64_t> _maxQueueDataSize;
     	//! Flow File Expiration Duration in MilliSeconds
     	std::atomic<uint64_t> _expiredDuration;
    -
    +	//! UUID string
    +	std::string _uuidStr;
    --- End diff --
    
    We're transitioning to the Google code style. I don't think consistency within a class is a concern right now, since we're in such flux. Using that style for new members will save me from having to convert them later.
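For illustration only, the members from this hunk renamed per the Google C++ Style Guide (lower_snake_case data members with a trailing underscore; the accessor is named after the member and the mutator is prefixed with `set_`). `ConnectionMembersExample` is a hypothetical stand-in, not the real `Connection` class.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical excerpt: same members as the hunk above, Google-style names.
class ConnectionMembersExample {
 public:
  const std::string &uuid_str() const { return uuid_str_; }
  void set_uuid_str(const std::string &uuid) { uuid_str_ = uuid; }

 private:
  std::atomic<uint64_t> max_queue_data_size_{0};  // was _maxQueueDataSize
  std::atomic<uint64_t> expired_duration_{0};     // was _expiredDuration (ms)
  std::string uuid_str_;                          // was _uuidStr
};
```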



[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104320252
  
    --- Diff: libminifi/src/Repository.cpp ---
    @@ -0,0 +1,140 @@
    +/**
    + * @file Repository.cpp
    + * Repository implementation
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#include <cstdint>
    +#include <vector>
    +#include <arpa/inet.h>
    +#include "io/DataStream.h"
    +#include "io/Serializable.h"
    +#include "Relationship.h"
    +#include "Logger.h"
    +#include "FlowController.h"
    +#include "Repository.h"
    +#include "Provenance.h"
    +#include "FlowFileRepository.h"
    +
    +const char *Repository::RepositoryTypeStr[MAX_REPO_TYPE] = {"Provenance Repository", "FlowFile Repository"};
    +uint64_t Repository::_repoSize[MAX_REPO_TYPE] = {0, 0}; 
    +
    +void Repository::start() {
    +	if (!_enable)
    +		return;
    +	if (this->_purgePeriod <= 0)
    +		return;
    +	if (_running)
    +		return;
    +	_running = true;
    +	logger_->log_info("%s Repository Monitor Thread Start", RepositoryTypeStr[_type]);
    +	_thread = new std::thread(run, this);
    +	_thread->detach();
    +}
    +
    +void Repository::stop() {
    +	if (!_running)
    +		return;
    +	_running = false;
    +	logger_->log_info("%s Repository Monitor Thread Stop", RepositoryTypeStr[_type]);
    +}
    +
    +void Repository::run(Repository *repo) {
    +	// threshold for purge
    --- End diff --
    
    uint64_t purgeThreshold = repo->_maxPartitionBytes * 3 / 4;
    
    # Provenance Repository #
    nifi.provenance.repository.directory.default=./provenance_repository
    nifi.provenance.repository.max.storage.time=1 MIN
    nifi.provenance.repository.max.storage.size=1 MB
    So once the repo size exceeds 75% of the max storage size, we start purging records that have stayed in the repo longer than the max storage time.
    If the repo is full, we mark it as full and stop putting records into it.
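A minimal in-memory model of the policy just described, assuming each record carries its insertion time and serialized size. `PurgePass`, `PutRecord`, `StoredRecord` and `RepoState` are hypothetical names; the real monitor thread works against LevelDB. The sketch also clears the full flag once purging brings the size back under the limit, which is an assumption rather than something stated above.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical record metadata tracked by the monitor.
struct StoredRecord {
  uint64_t insert_time_ms;  // when the record entered the repo
  uint64_t size_bytes;      // serialized size of the record
};

struct RepoState {
  std::map<std::string, StoredRecord> records;
  uint64_t total_bytes = 0;
  bool full = false;
};

// Refuse new records while the repo is marked full.
bool PutRecord(RepoState &repo, const std::string &key, const StoredRecord &rec) {
  if (repo.full) return false;
  auto it = repo.records.find(key);
  if (it != repo.records.end()) repo.total_bytes -= it->second.size_bytes;  // overwrite
  repo.records[key] = rec;
  repo.total_bytes += rec.size_bytes;
  return true;
}

// One pass of the monitor thread.
void PurgePass(RepoState &repo, uint64_t max_bytes, uint64_t max_age_ms, uint64_t now_ms) {
  assert(max_bytes > 0);
  const uint64_t purge_threshold = max_bytes * 3 / 4;  // 75% of max storage size
  if (repo.total_bytes >= purge_threshold) {
    for (auto it = repo.records.begin(); it != repo.records.end();) {
      const uint64_t inserted = it->second.insert_time_ms;
      if (now_ms >= inserted && now_ms - inserted > max_age_ms) {
        repo.total_bytes -= it->second.size_bytes;
        it = repo.records.erase(it);  // older than max storage time
      } else {
        ++it;
      }
    }
  }
  repo.full = repo.total_bytes >= max_bytes;  // mark full (or clear the flag)
}
```

With max storage size 1000 bytes and max storage time 60000 ms, two 400-byte records (800 bytes, above the 750-byte threshold) inserted at t=0 and t=50000 would, at t=70000, leave only the second one in place.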



[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104320313
  
    --- Diff: libminifi/include/FlowController.h ---
    @@ -197,6 +202,8 @@ class FlowController {
     	std::atomic<bool> _initialized;
     	//! Provenance Repo
     	ProvenanceRepository *_provenanceRepo;
    +	//! FlowFile Repo
    +	FlowFileRepository *_flowfileRepo;
    --- End diff --
    
    I kept it consistent with the rest of the code in FlowController.



[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104713386
  
    --- Diff: libminifi/test/unit/ProcessorTests.cpp ---
    @@ -33,10 +35,18 @@ TEST_CASE("Test Find file", "[getfileCreate2]"){
     
     	TestController testController;
     
    -	testController.enableDebug();
    +	Configure *config = Configure::getConfigure();
     
    -	ProvenanceTestRepository repo;
    -	TestFlowController controller(repo);
    +	config->set(BaseLogger::nifi_log_appender,"rollingappender");
    --- End diff --
    
    @apiri @benqiu2016 Can this be removed, either by Bin or by Aldrin on merge? Logs shouldn't be needed for tests.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r103958937
  
    --- Diff: libminifi/include/Repository.h ---
    @@ -0,0 +1,294 @@
    +	//! destroy
    +	void destroy()
    +	{
    +		if (_db)
    --- End diff --
    
    This doesn't appear to be thread safe. Other state variables are not updated here, so how is access limited when this is a public function?
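The sketch below shows one way to address this. It assumes every public method, including destroy(), takes the same lock; the class already declares a std::mutex _mtx that could serve this purpose. GuardedRepository and FakeDB are hypothetical names, and a std::map stands in for the leveldb::DB handle so the example is self-contained.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <mutex>
#include <string>

// Hypothetical stand-in for the leveldb::DB handle (assumption for illustration).
struct FakeDB {
  std::map<std::string, std::string> data;
};

// Sketch: every public operation, destroy() included, takes the same mutex,
// so Put()/Get() never observe a handle that is being released.
class GuardedRepository {
 public:
  GuardedRepository() : db_(new FakeDB()) {}

  bool Put(const std::string &key, const std::string &value) {
    std::lock_guard<std::mutex> lock(mtx_);
    if (!enable_ || !db_) return false;  // refuse writes after destroy()
    db_->data[key] = value;
    return true;
  }

  bool Get(const std::string &key, std::string &value) {
    std::lock_guard<std::mutex> lock(mtx_);
    if (!enable_ || !db_) return false;  // refuse reads after destroy()
    auto it = db_->data.find(key);
    if (it == db_->data.end()) return false;
    value = it->second;
    return true;
  }

  void destroy() {
    std::lock_guard<std::mutex> lock(mtx_);
    db_.reset();      // release the handle exactly once
    enable_ = false;  // keep the other state consistent with the handle
  }

 private:
  std::mutex mtx_;
  bool enable_ = true;
  std::unique_ptr<FakeDB> db_;
};
```

Because destroy() clears the enable flag under the same lock, a later Put() or Get() returns false. It does not dereference a released handle.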


[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104432956
  
    --- Diff: libminifi/include/FlowFileRepository.h ---
    @@ -0,0 +1,208 @@
    +/**
    + * @file FlowFileRepository 
    + * Flow file repository class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __FLOWFILE_REPOSITORY_H__
    +#define __FLOWFILE_REPOSITORY_H__
    +
    +#include <ftw.h>
    +#include <uuid/uuid.h>
    +#include <atomic>
    +#include <cstdint>
    +#include <cstring>
    +#include <iostream>
    +#include <map>
    +#include <set>
    +#include <string>
    +#include <thread>
    +#include <vector>
    +
    +#include "Configure.h"
    +#include "Connection.h"
    +#include "FlowFileRecord.h"
    +#include "Logger.h"
    +#include "Property.h"
    +#include "ResourceClaim.h"
    +#include "io/Serializable.h"
    +#include "utils/TimeUtil.h"
    +#include "Repository.h"
    +
    +class FlowFileRepository;
    +
    +//! FlowFile Event Record
    +class FlowFileEventRecord : protected Serializable
    +{
    +public:
    +	friend class ProcessSession;
    +public:
    +	//! Constructor
    +	/*!
    +	 * Create a new provenance event record
    +	 */
    +	FlowFileEventRecord()
    +	: _entryDate(0), _lineageStartDate(0), _size(0), _offset(0)  
    +	{
    +		_eventTime = getTimeMillis();
    +		logger_ = Logger::getLogger();
    +	}
    +
    +	//! Destructor
    +	virtual ~FlowFileEventRecord() {
    +	}
    +	//! Get Attributes
    +	std::map<std::string, std::string> getAttributes() {
    --- End diff --
    
    What do you mean by "somehow"? Can you explain a path? Why would you be worried about an event record being deleted? If that is the case, we have bigger problems. Perhaps we should discuss this.


[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r103956448
  
    --- Diff: libminifi/include/FlowFileRepository.h ---
    @@ -0,0 +1,208 @@
    +/**
    + * @file FlowFileRepository 
    + * Flow file repository class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __FLOWFILE_REPOSITORY_H__
    +#define __FLOWFILE_REPOSITORY_H__
    +
    +#include <ftw.h>
    +#include <uuid/uuid.h>
    +#include <atomic>
    +#include <cstdint>
    +#include <cstring>
    +#include <iostream>
    +#include <map>
    +#include <set>
    +#include <string>
    +#include <thread>
    +#include <vector>
    +
    +#include "Configure.h"
    +#include "Connection.h"
    +#include "FlowFileRecord.h"
    +#include "Logger.h"
    +#include "Property.h"
    +#include "ResourceClaim.h"
    +#include "io/Serializable.h"
    +#include "utils/TimeUtil.h"
    +#include "Repository.h"
    +
    +class FlowFileRepository;
    +
    +//! FlowFile Event Record
    +class FlowFileEventRecord : protected Serializable
    +{
    +public:
    +	friend class ProcessSession;
    +public:
    +	//! Constructor
    +	/*!
    +	 * Create a new provenance event record
    +	 */
    +	FlowFileEventRecord()
    +	: _entryDate(0), _lineageStartDate(0), _size(0), _offset(0)  
    +	{
    +		_eventTime = getTimeMillis();
    +		logger_ = Logger::getLogger();
    +	}
    +
    +	//! Destructor
    +	virtual ~FlowFileEventRecord() {
    +	}
    +	//! Get Attributes
    +	std::map<std::string, std::string> getAttributes() {
    --- End diff --
    
    Sorry, why not return by reference and make it const?
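Here is a sketch of the suggested accessor on a trimmed-down record. EventRecordSketch is a hypothetical name, not the class in the PR.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical, trimmed-down record used only to illustrate the accessor.
class EventRecordSketch {
 public:
  void setAttribute(const std::string &key, const std::string &value) {
    attributes_[key] = value;
  }

  // Returns a read-only reference to the member: no copy is made, and the
  // method is callable on const objects.
  const std::map<std::string, std::string> &getAttributes() const {
    return attributes_;
  }

 private:
  std::map<std::string, std::string> attributes_;
};
```

The returned reference is valid only while the record is alive. A caller that needs the attributes beyond that should copy them.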


[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104318122
  
    --- Diff: libminifi/include/Repository.h ---
    @@ -0,0 +1,294 @@
    +/**
    + * @file Repository 
    + * Repository class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __REPOSITORY_H__
    +#define __REPOSITORY_H__
    +
    +#include <ftw.h>
    +#include <uuid/uuid.h>
    +#include <atomic>
    +#include <cstdint>
    +#include <cstring>
    +#include <iostream>
    +#include <map>
    +#include <set>
    +#include <string>
    +#include <thread>
    +#include <vector>
    +
    +#include "leveldb/db.h"
    +#include "leveldb/options.h"
    +#include "leveldb/slice.h"
    +#include "leveldb/status.h"
    +#include "Configure.h"
    +#include "Connection.h"
    +#include "FlowFileRecord.h"
    +#include "Logger.h"
    +#include "Property.h"
    +#include "ResourceClaim.h"
    +#include "io/Serializable.h"
    +#include "utils/TimeUtil.h"
    +#include "utils/StringUtils.h"
    +
    +//! Repository
    +class Repository
    +{
    +public:
    +	enum RepositoryType {
    +		//! Provenance Repo Type
    +		PROVENANCE,
    +		//! FlowFile Repo Type
    +		FLOWFILE,
    +		MAX_REPO_TYPE
    +	};
    +	static const char *RepositoryTypeStr[MAX_REPO_TYPE];
    +	//! Constructor
    +	/*!
    +	 * Create a new provenance repository
    +	 */
    +	Repository(RepositoryType type, std::string directory, 
    +		int64_t maxPartitionMillis, int64_t maxPartitionBytes, uint64_t purgePeriod) {
    +		_type = type;
    +		_directory = directory;
    +		_maxPartitionMillis = maxPartitionMillis;
    +		_maxPartitionBytes = maxPartitionBytes;
    +		_purgePeriod = purgePeriod;
    +		logger_ = Logger::getLogger();
    +		configure_ = Configure::getConfigure();
    +		_db = NULL;
    +		_thread = NULL;
    +		_running = false;
    +		_repoFull = false;
    +		_enable = true;
    +	}
    +
    +	//! Destructor
    +	virtual ~Repository() {
    +		stop();
    +		if (this->_thread)
    +			delete this->_thread;
    +		destroy();
    +	}
    +
    +	//! initialize
    +	virtual bool initialize()
    +	{
    +		std::string value;
    +
    +		if (_type == PROVENANCE)
    +		{
    +			if (!(configure_->get(Configure::nifi_provenance_repository_enable, value)
    +					&& StringUtils::StringToBool(value, _enable))) {
    +				_enable = true;
    +			}
    +			if (!_enable)
    +				return false;
    +			if (configure_->get(Configure::nifi_provenance_repository_directory_default, value))
    +			{
    +				_directory = value;
    +			}
    +			logger_->log_info("NiFi Provenance Repository Directory %s", _directory.c_str());
    +			if (configure_->get(Configure::nifi_provenance_repository_max_storage_size, value))
    +			{
    +				Property::StringToInt(value, _maxPartitionBytes);
    +			}
    +			logger_->log_info("NiFi Provenance Max Partition Bytes %d", _maxPartitionBytes);
    +			if (configure_->get(Configure::nifi_provenance_repository_max_storage_time, value))
    +			{
    +				TimeUnit unit;
    +				if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
    +							Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
    +				{
    +				}
    +			}
    +			logger_->log_info("NiFi Provenance Max Storage Time: [%d] ms", _maxPartitionMillis);
    +			leveldb::Options options;
    +			options.create_if_missing = true;
    +			leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
    +			if (status.ok())
    +			{
    +				logger_->log_info("NiFi Provenance Repository database open %s success", _directory.c_str());
    +			}
    +			else
    +			{
    +				logger_->log_error("NiFi Provenance Repository database open %s fail", _directory.c_str());
    +				return false;
    +			}
    +		}
    +
    +		if (_type == FLOWFILE)
    +		{
    +			if (!(configure_->get(Configure::nifi_flowfile_repository_enable, value)
    +					&& StringUtils::StringToBool(value, _enable))) {
    +				_enable = true;
    +			}
    +			if (!_enable)
    +				return false;
    +			if (configure_->get(Configure::nifi_flowfile_repository_directory_default, value))
    +			{
    +				_directory = value;
    +			}
    +			logger_->log_info("NiFi FlowFile Repository Directory %s", _directory.c_str());
    +			if (configure_->get(Configure::nifi_flowfile_repository_max_storage_size, value))
    +			{
    +				Property::StringToInt(value, _maxPartitionBytes);
    +			}
    +			logger_->log_info("NiFi FlowFile Max Partition Bytes %d", _maxPartitionBytes);
    +			if (configure_->get(Configure::nifi_flowfile_repository_max_storage_time, value))
    +			{
    +				TimeUnit unit;
    +				if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
    +							Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
    +				{
    +				}
    +			}
    +			logger_->log_info("NiFi FlowFile Max Storage Time: [%d] ms", _maxPartitionMillis);
    +			leveldb::Options options;
    +			options.create_if_missing = true;
    +			leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
    +			if (status.ok())
    +			{
    +				logger_->log_info("NiFi FlowFile Repository database open %s success", _directory.c_str());
    +			}
    +			else
    +			{
    +				logger_->log_error("NiFi FlowFile Repository database open %s fail", _directory.c_str());
    +				return false;
    +			}
    +		}
    +
    +		return true;
    +	}
    +	//! Put
    +	virtual bool Put(std::string key, uint8_t *buf, int bufLen)
    +	{
    +		if (!_enable)
    --- End diff --
    
    _enable is set from the MiNiFi properties when the repo is initialized, so we are safe here.
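This is a sketch of when that reasoning holds, and how to make it hold regardless of ordering. If initialize() finishes before any thread that calls Put() is started, a plain bool is fine: completing the std::thread constructor synchronizes with the start of the new thread. If initialize() might ever run while other threads read the flag, std::atomic<bool> makes those reads well-defined at little cost. EnableFlagSketch is a hypothetical name.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical sketch: a flag written during initialize() and read by other
// threads. std::atomic<bool> keeps the reads well-defined even if
// initialize() is called again after worker threads are running.
class EnableFlagSketch {
 public:
  bool initialize(bool enabledFromConfig) {
    enable_.store(enabledFromConfig);
    return enabledFromConfig;
  }

  bool isEnable() const { return enable_.load(); }

 private:
  std::atomic<bool> enable_{true};
};
```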


[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104301877
  
    --- Diff: libminifi/include/FlowFileRepository.h ---
    @@ -0,0 +1,208 @@
    +/**
    + * @file FlowFileRepository 
    + * Flow file repository class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __FLOWFILE_REPOSITORY_H__
    +#define __FLOWFILE_REPOSITORY_H__
    +
    +#include <ftw.h>
    +#include <uuid/uuid.h>
    +#include <atomic>
    +#include <cstdint>
    +#include <cstring>
    +#include <iostream>
    +#include <map>
    +#include <set>
    +#include <string>
    +#include <thread>
    +#include <vector>
    +
    +#include "Configure.h"
    +#include "Connection.h"
    +#include "FlowFileRecord.h"
    +#include "Logger.h"
    +#include "Property.h"
    +#include "ResourceClaim.h"
    +#include "io/Serializable.h"
    +#include "utils/TimeUtil.h"
    +#include "Repository.h"
    +
    +class FlowFileRepository;
    +
    +//! FlowFile Event Record
    +class FlowFileEventRecord : protected Serializable
    +{
    +public:
    +	friend class ProcessSession;
    --- End diff --
    
    Will remove that.



[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104301034
  
    --- Diff: libminifi/include/FlowFileRecord.h ---
    @@ -202,6 +216,8 @@ class FlowFileRecord
     	std::string _uuidStr;
     	//! UUID string for all parents
     	std::set<std::string> _lineageIdentifiers;
    +	//! whether it is stored to DB
    +	bool _isStoredToRepo;
    --- End diff --
    
    Same comment as above: keep it consistent.


[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r103960055
  
    --- Diff: libminifi/include/Repository.h ---
    @@ -0,0 +1,294 @@
    +/**
    + * @file Repository 
    + * Repository class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __REPOSITORY_H__
    +#define __REPOSITORY_H__
    +
    +#include <ftw.h>
    +#include <uuid/uuid.h>
    +#include <atomic>
    +#include <cstdint>
    +#include <cstring>
    +#include <iostream>
    +#include <map>
    +#include <set>
    +#include <string>
    +#include <thread>
    +#include <vector>
    +
    +#include "leveldb/db.h"
    +#include "leveldb/options.h"
    +#include "leveldb/slice.h"
    +#include "leveldb/status.h"
    +#include "Configure.h"
    +#include "Connection.h"
    +#include "FlowFileRecord.h"
    +#include "Logger.h"
    +#include "Property.h"
    +#include "ResourceClaim.h"
    +#include "io/Serializable.h"
    +#include "utils/TimeUtil.h"
    +#include "utils/StringUtils.h"
    +
    +//! Repository
    +class Repository
    +{
    +public:
    +	enum RepositoryType {
    +		//! Provenance Repo Type
    +		PROVENANCE,
    +		//! FlowFile Repo Type
    +		FLOWFILE,
    +		MAX_REPO_TYPE
    +	};
    +	static const char *RepositoryTypeStr[MAX_REPO_TYPE];
    +	//! Constructor
    +	/*!
    +	 * Create a new provenance repository
    +	 */
    +	Repository(RepositoryType type, std::string directory, 
    +		int64_t maxPartitionMillis, int64_t maxPartitionBytes, uint64_t purgePeriod) {
    +		_type = type;
    +		_directory = directory;
    +		_maxPartitionMillis = maxPartitionMillis;
    +		_maxPartitionBytes = maxPartitionBytes;
    +		_purgePeriod = purgePeriod;
    +		logger_ = Logger::getLogger();
    +		configure_ = Configure::getConfigure();
    +		_db = NULL;
    +		_thread = NULL;
    +		_running = false;
    +		_repoFull = false;
    +		_enable = true;
    +	}
    +
    +	//! Destructor
    +	virtual ~Repository() {
    +		stop();
    +		if (this->_thread)
    +			delete this->_thread;
    +		destroy();
    +	}
    +
    +	//! initialize
    +	virtual bool initialize()
    +	{
    +		std::string value;
    +
    +		if (_type == PROVENANCE)
    +		{
    +			if (!(configure_->get(Configure::nifi_provenance_repository_enable, value)
    +					&& StringUtils::StringToBool(value, _enable))) {
    +				_enable = true;
    +			}
    +			if (!_enable)
    +				return false;
    +			if (configure_->get(Configure::nifi_provenance_repository_directory_default, value))
    +			{
    +				_directory = value;
    +			}
    +			logger_->log_info("NiFi Provenance Repository Directory %s", _directory.c_str());
    +			if (configure_->get(Configure::nifi_provenance_repository_max_storage_size, value))
    +			{
    +				Property::StringToInt(value, _maxPartitionBytes);
    +			}
    +			logger_->log_info("NiFi Provenance Max Partition Bytes %d", _maxPartitionBytes);
    +			if (configure_->get(Configure::nifi_provenance_repository_max_storage_time, value))
    +			{
    +				TimeUnit unit;
    +				if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
    +							Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
    +				{
    +				}
    +			}
    +			logger_->log_info("NiFi Provenance Max Storage Time: [%d] ms", _maxPartitionMillis);
    +			leveldb::Options options;
    +			options.create_if_missing = true;
    +			leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
    +			if (status.ok())
    +			{
    +				logger_->log_info("NiFi Provenance Repository database open %s success", _directory.c_str());
    +			}
    +			else
    +			{
    +				logger_->log_error("NiFi Provenance Repository database open %s fail", _directory.c_str());
    +				return false;
    +			}
    +		}
    +
    +		if (_type == FLOWFILE)
    +		{
    +			if (!(configure_->get(Configure::nifi_flowfile_repository_enable, value)
    +					&& StringUtils::StringToBool(value, _enable))) {
    +				_enable = true;
    +			}
    +			if (!_enable)
    +				return false;
    +			if (configure_->get(Configure::nifi_flowfile_repository_directory_default, value))
    +			{
    +				_directory = value;
    +			}
    +			logger_->log_info("NiFi FlowFile Repository Directory %s", _directory.c_str());
    +			if (configure_->get(Configure::nifi_flowfile_repository_max_storage_size, value))
    +			{
    +				Property::StringToInt(value, _maxPartitionBytes);
    +			}
    +			logger_->log_info("NiFi FlowFile Max Partition Bytes %d", _maxPartitionBytes);
    +			if (configure_->get(Configure::nifi_flowfile_repository_max_storage_time, value))
    +			{
    +				TimeUnit unit;
    +				if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
    +							Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
    +				{
    +				}
    +			}
    +			logger_->log_info("NiFi FlowFile Max Storage Time: [%d] ms", _maxPartitionMillis);
    +			leveldb::Options options;
    +			options.create_if_missing = true;
    +			leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
    +			if (status.ok())
    +			{
    +				logger_->log_info("NiFi FlowFile Repository database open %s success", _directory.c_str());
    +			}
    +			else
    +			{
    +				logger_->log_error("NiFi FlowFile Repository database open %s fail", _directory.c_str());
    +				return false;
    +			}
    +		}
    +
    +		return true;
    +	}
    +	//! Put
    +	virtual bool Put(std::string key, uint8_t *buf, int bufLen)
    +	{
    +		if (!_enable)
    +			return false;
    +			
    +		// persistent to the DB
    +		leveldb::Slice value((const char *) buf, bufLen);
    +		leveldb::Status status;
    +		status = _db->Put(leveldb::WriteOptions(), key, value);
    +		if (status.ok())
    +			return true;
    +		else
    +			return false;
    +	}
    +	//! Delete
    +	virtual bool Delete(std::string key)
    +	{
    +		if (!_enable)
    +			return false;
    +		leveldb::Status status;
    +		status = _db->Delete(leveldb::WriteOptions(), key);
    +		if (status.ok())
    +			return true;
    +		else
    +			return false;
    +	}
    +	//! Get
    +	virtual bool Get(std::string key, std::string &value)
    +	{
    +		if (!_enable)
    +			return false;
    +		leveldb::Status status;
    +		status = _db->Get(leveldb::ReadOptions(), key, &value);
    +		if (status.ok())
    +			return true;
    +		else
    +			return false;
    +	}
    +	//! destroy
    +	void destroy()
    +	{
    +		if (_db)
    +		{
    +			delete _db;
    +			_db = NULL;
    +		}
    +	}
    +	//! Run function for the thread
    +	static void run(Repository *repo);
    +	//! Start the repository monitor thread
    +	virtual void start();
    +	//! Stop the repository monitor thread
    +	virtual void stop();
    +	//! whether the repo is full
    +	virtual bool isFull()
    +	{
    +		return _repoFull;
    +	}
    +	//! whether the repo is enable 
    +	virtual bool isEnable()
    +	{
    +		return _enable;
    +	}
    +
    +protected:
    +	//! Repo Type
    +	RepositoryType _type;
    +	//! Mutex for protection
    +	std::mutex _mtx;
    +	//! repository directory
    +	std::string _directory;
    +	//! Logger
    +	std::shared_ptr<Logger> logger_;
    +	//! Configure
    +	//! max db entry life time
    +	Configure *configure_;
    +	int64_t _maxPartitionMillis;
    +	//! max db size
    +	int64_t _maxPartitionBytes;
    +	//! purge period
    +	uint64_t _purgePeriod;
    +	//! level DB database
    +	leveldb::DB* _db;
    +	//! thread
    +	std::thread *_thread;
    --- End diff --
    
    Why does this need to be a pointer? The move assignment operator lets you write thread_ = std::thread{ func, var }.
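Below is a sketch of the suggestion, with the thread held by value and move-assigned in start(). It also joins in stop() instead of detaching as start() does in Repository.cpp, so the thread is known to have exited before the object is destroyed. MonitorSketch and its members are hypothetical, and the 1 ms sleep stands in for the purge period.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical monitor illustrating a std::thread member held by value.
class MonitorSketch {
 public:
  ~MonitorSketch() { stop(); }

  void start() {
    if (running_.exchange(true)) return;  // already running
    // Move-assign a new thread into the member; no new/delete needed.
    thread_ = std::thread{&MonitorSketch::run, this};
  }

  void stop() {
    running_ = false;
    // Joining (instead of detaching) guarantees run() has returned before
    // the object it uses is destroyed.
    if (thread_.joinable()) thread_.join();
  }

  int iterations() const { return iterations_.load(); }

 private:
  void run() {
    while (running_) {
      ++iterations_;
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
  }

  std::atomic<bool> running_{false};
  std::atomic<int> iterations_{0};
  std::thread thread_;
};
```

One caveat of joining: with a long purge period, stop() may wait up to one full sleep. A std::condition_variable with wait_for would let stop() wake the thread early.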


[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104300897
  
    --- Diff: libminifi/include/FlowFileRecord.h ---
    @@ -108,6 +109,10 @@ class FlowFileRecord
     	 * Create a new flow record
     	 */
     	FlowFileRecord(std::map<std::string, std::string> attributes, ResourceClaim *claim = NULL);
    --- End diff --
    
    Will do.


[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi-minifi-cpp/pull/62


[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r103956734
  
    --- Diff: libminifi/include/FlowFileRecord.h ---
    @@ -108,6 +109,10 @@ class FlowFileRecord
     	 * Create a new flow record
     	 */
     	FlowFileRecord(std::map<std::string, std::string> attributes, ResourceClaim *claim = NULL);
    +	/*!
    +	 * Create a new flow record from repo flow event
    +	 */
    +	FlowFileRecord(FlowFileEventRecord *event);
    --- End diff --
    
    Why is this not explicit? Please use smart pointers.
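Here is a sketch of the requested change on hypothetical stand-in types; EventSketch and RecordSketch are illustrative names. Using std::shared_ptr here is an assumption, and std::unique_ptr would fit if the record takes sole ownership of the event.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>

// Hypothetical stand-ins for FlowFileEventRecord / FlowFileRecord.
struct EventSketch {
  std::map<std::string, std::string> attributes;
};

class RecordSketch {
 public:
  // explicit: a shared_ptr<EventSketch> is never silently converted into a
  // RecordSketch (e.g. when passed to a function taking RecordSketch).
  explicit RecordSketch(std::shared_ptr<EventSketch> event)
      : attributes_(event ? event->attributes
                          : std::map<std::string, std::string>{}),
        event_(std::move(event)) {}  // attributes_ is initialized first (declaration order)

  const std::map<std::string, std::string> &attributes() const { return attributes_; }
  long eventUseCount() const { return event_.use_count(); }

 private:
  std::map<std::string, std::string> attributes_;
  std::shared_ptr<EventSketch> event_;  // shared ownership, no manual delete
};
```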


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r103961630
  
    --- Diff: libminifi/src/Repository.cpp ---
    @@ -0,0 +1,140 @@
    +/**
    + * @file Repository.cpp
    + * Repository implemenatation 
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#include <cstdint>
    +#include <vector>
    +#include <arpa/inet.h>
    +#include "io/DataStream.h"
    +#include "io/Serializable.h"
    +#include "Relationship.h"
    +#include "Logger.h"
    +#include "FlowController.h"
    +#include "Repository.h"
    +#include "Provenance.h"
    +#include "FlowFileRepository.h"
    +
    +const char *Repository::RepositoryTypeStr[MAX_REPO_TYPE] = {"Provenace Repository", "FlowFile Repository"};
    +uint64_t Repository::_repoSize[MAX_REPO_TYPE] = {0, 0}; 
    +
    +void Repository::start() {
    +	if (!_enable)
    +		return;
    +	if (this->_purgePeriod <= 0)
    +		return;
    +	if (_running)
    +		return;
    +	_running = true;
    +	logger_->log_info("%s Repository Monitor Thread Start", RepositoryTypeStr[_type]);
    +	_thread = new std::thread(run, this);
    +	_thread->detach();
    +}
    +
    +void Repository::stop() {
    +	if (!_running)
    +		return;
    +	_running = false;
    +	logger_->log_info("%s Repository Monitor Thread Stop", RepositoryTypeStr[_type]);
    +}
    +
    +void Repository::run(Repository *repo) {
    +	// threshold for purge
    +	uint64_t purgeThreshold = repo->_maxPartitionBytes * 3 / 4;
    +	while (repo->_running) {
    +		std::this_thread::sleep_for(
    +				std::chrono::milliseconds(repo->_purgePeriod));
    +		uint64_t curTime = getTimeMillis();
    +		uint64_t size = repo->repoSize();
    +		if (size >= purgeThreshold) {
    +			std::vector<std::string> purgeList;
    +			leveldb::Iterator* it = repo->_db->NewIterator(
    +					leveldb::ReadOptions());
    +			if (repo->_type == PROVENANCE)
    +			{
    +				for (it->SeekToFirst(); it->Valid(); it->Next()) {
    +					ProvenanceEventRecord eventRead;
    +					std::string key = it->key().ToString();
    +					if (eventRead.DeSerialize((uint8_t *) it->value().data(),
    +						(int) it->value().size())) {
    +						if ((curTime - eventRead.getEventTime())
    +							> repo->_maxPartitionMillis)
    +							purgeList.push_back(key);
    +					} else {
    +						repo->logger_->log_debug(
    +							"NiFi %s retrieve event %s fail",
    +							RepositoryTypeStr[repo->_type],
    +							key.c_str());
    +						purgeList.push_back(key);
    +					}
    +				}
    +			}
    +			if (repo->_type == FLOWFILE)
    +			{
    +				for (it->SeekToFirst(); it->Valid(); it->Next()) {
    +					FlowFileEventRecord eventRead;
    +					std::string key = it->key().ToString();
    +					if (eventRead.DeSerialize((uint8_t *) it->value().data(),
    +						(int) it->value().size())) {
    +						if ((curTime - eventRead.getEventTime())
    +							> repo->_maxPartitionMillis)
    +							purgeList.push_back(key);
    +					} else {
    +						repo->logger_->log_debug(
    +							"NiFi %s retrieve event %s fail",
    +							RepositoryTypeStr[repo->_type],
    +							key.c_str());
    +						purgeList.push_back(key);
    +					}
    +				}
    +			}
    +			delete it;
    +			std::vector<std::string>::iterator itPurge;
    +			for (itPurge = purgeList.begin(); itPurge != purgeList.end();
    --- End diff --
    
    Why not use auto here?
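A minimal sketch of the suggestion, assuming a std::map stands in for the leveldb database (key to event time in ms). It keeps the two-pass structure of Repository::run() (collect expired keys, then delete them) but uses auto and range-based for instead of spelling out std::vector<std::string>::iterator. EventStore and purgeExpired are hypothetical names, not part of the PR.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for the leveldb database: key -> event time in ms.
using EventStore = std::map<std::string, uint64_t>;

// Collects keys older than maxPartitionMillis, then erases them.
// Returns the number of entries removed.
std::size_t purgeExpired(EventStore &store, uint64_t curTime,
                         uint64_t maxPartitionMillis) {
  std::vector<std::string> purgeList;
  // auto deduces std::pair<const std::string, uint64_t>.
  for (const auto &entry : store) {
    if (curTime - entry.second > maxPartitionMillis)
      purgeList.push_back(entry.first);
  }
  // Range-based for replaces the explicit iterator declaration.
  for (const auto &key : purgeList)
    store.erase(key);
  return purgeList.size();
}
```

With curTime = 1000 and a 500 ms limit, entries stamped 100 and 50 are removed and the one stamped 900 is kept.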



[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104319998
  
    --- Diff: libminifi/include/Repository.h ---
    @@ -0,0 +1,294 @@
    +/**
    + * @file Repository 
    + * Repository class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __REPOSITORY_H__
    +#define __REPOSITORY_H__
    +
    +#include <ftw.h>
    +#include <uuid/uuid.h>
    +#include <atomic>
    +#include <cstdint>
    +#include <cstring>
    +#include <iostream>
    +#include <map>
    +#include <set>
    +#include <string>
    +#include <thread>
    +#include <vector>
    +
    +#include "leveldb/db.h"
    +#include "leveldb/options.h"
    +#include "leveldb/slice.h"
    +#include "leveldb/status.h"
    +#include "Configure.h"
    +#include "Connection.h"
    +#include "FlowFileRecord.h"
    +#include "Logger.h"
    +#include "Property.h"
    +#include "ResourceClaim.h"
    +#include "io/Serializable.h"
    +#include "utils/TimeUtil.h"
    +#include "utils/StringUtils.h"
    +
    +//! Repository
    +class Repository
    +{
    +public:
    +	enum RepositoryType {
    +		//! Provenance Repo Type
    +		PROVENANCE,
    +		//! FlowFile Repo Type
    +		FLOWFILE,
    +		MAX_REPO_TYPE
    +	};
    +	static const char *RepositoryTypeStr[MAX_REPO_TYPE];
    +	//! Constructor
    +	/*!
    +	 * Create a new repository of the given type
    +	 */
    +	Repository(RepositoryType type, std::string directory, 
    +		int64_t maxPartitionMillis, int64_t maxPartitionBytes, uint64_t purgePeriod) {
    +		_type = type;
    +		_directory = directory;
    +		_maxPartitionMillis = maxPartitionMillis;
    +		_maxPartitionBytes = maxPartitionBytes;
    +		_purgePeriod = purgePeriod;
    +		logger_ = Logger::getLogger();
    +		configure_ = Configure::getConfigure();
    +		_db = NULL;
    +		_thread = NULL;
    +		_running = false;
    +		_repoFull = false;
    +		_enable = true;
    +	}
    +
    +	//! Destructor
    +	virtual ~Repository() {
    +		stop();
    +		if (this->_thread)
    +			delete this->_thread;
    +		destroy();
    +	}
    +
    +	//! initialize
    +	virtual bool initialize()
    +	{
    +		std::string value;
    +
    +		if (_type == PROVENANCE)
    +		{
    +			if (!(configure_->get(Configure::nifi_provenance_repository_enable, value)
    +					&& StringUtils::StringToBool(value, _enable))) {
    +				_enable = true;
    +			}
    +			if (!_enable)
    +				return false;
    +			if (configure_->get(Configure::nifi_provenance_repository_directory_default, value))
    +			{
    +				_directory = value;
    +			}
    +			logger_->log_info("NiFi Provenance Repository Directory %s", _directory.c_str());
    +			if (configure_->get(Configure::nifi_provenance_repository_max_storage_size, value))
    +			{
    +				Property::StringToInt(value, _maxPartitionBytes);
    +			}
    +			logger_->log_info("NiFi Provenance Max Partition Bytes %d", _maxPartitionBytes);
    +			if (configure_->get(Configure::nifi_provenance_repository_max_storage_time, value))
    +			{
    +				TimeUnit unit;
    +				if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
    +							Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
    +				{
    +				}
    +			}
    +			logger_->log_info("NiFi Provenance Max Storage Time: [%d] ms", _maxPartitionMillis);
    +			leveldb::Options options;
    +			options.create_if_missing = true;
    +			leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
    +			if (status.ok())
    +			{
    +				logger_->log_info("NiFi Provenance Repository database open %s success", _directory.c_str());
    +			}
    +			else
    +			{
    +				logger_->log_error("NiFi Provenance Repository database open %s fail", _directory.c_str());
    +				return false;
    +			}
    +		}
    +
    +		if (_type == FLOWFILE)
    +		{
    +			if (!(configure_->get(Configure::nifi_flowfile_repository_enable, value)
    +					&& StringUtils::StringToBool(value, _enable))) {
    +				_enable = true;
    +			}
    +			if (!_enable)
    +				return false;
    +			if (configure_->get(Configure::nifi_flowfile_repository_directory_default, value))
    +			{
    +				_directory = value;
    +			}
    +			logger_->log_info("NiFi FlowFile Repository Directory %s", _directory.c_str());
    +			if (configure_->get(Configure::nifi_flowfile_repository_max_storage_size, value))
    +			{
    +				Property::StringToInt(value, _maxPartitionBytes);
    +			}
    +			logger_->log_info("NiFi FlowFile Max Partition Bytes %d", _maxPartitionBytes);
    +			if (configure_->get(Configure::nifi_flowfile_repository_max_storage_time, value))
    +			{
    +				TimeUnit unit;
    +				if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
    +							Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
    +				{
    +				}
    +			}
    +			logger_->log_info("NiFi FlowFile Max Storage Time: [%d] ms", _maxPartitionMillis);
    +			leveldb::Options options;
    +			options.create_if_missing = true;
    +			leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
    +			if (status.ok())
    +			{
    +				logger_->log_info("NiFi FlowFile Repository database open %s success", _directory.c_str());
    +			}
    +			else
    +			{
    +				logger_->log_error("NiFi FlowFile Repository database open %s fail", _directory.c_str());
    +				return false;
    +			}
    +		}
    +
    +		return true;
    +	}
    +	//! Put
    +	virtual bool Put(std::string key, uint8_t *buf, int bufLen)
    +	{
    +		if (!_enable)
    +			return false;
    +			
    +		// persistent to the DB
    +		leveldb::Slice value((const char *) buf, bufLen);
    +		leveldb::Status status;
    +		status = _db->Put(leveldb::WriteOptions(), key, value);
    +		if (status.ok())
    +			return true;
    +		else
    +			return false;
    +	}
    +	//! Delete
    +	virtual bool Delete(std::string key)
    +	{
    +		if (!_enable)
    +			return false;
    +		leveldb::Status status;
    +		status = _db->Delete(leveldb::WriteOptions(), key);
    +		if (status.ok())
    +			return true;
    +		else
    +			return false;
    +	}
    +	//! Get
    +	virtual bool Get(std::string key, std::string &value)
    +	{
    +		if (!_enable)
    +			return false;
    +		leveldb::Status status;
    +		status = _db->Get(leveldb::ReadOptions(), key, &value);
    +		if (status.ok())
    +			return true;
    +		else
    +			return false;
    +	}
    +	//! destroy
    +	void destroy()
    +	{
    +		if (_db)
    +		{
    +			delete _db;
    +			_db = NULL;
    +		}
    +	}
    +	//! Run function for the thread
    +	static void run(Repository *repo);
    +	//! Start the repository monitor thread
    +	virtual void start();
    +	//! Stop the repository monitor thread
    +	virtual void stop();
    +	//! whether the repo is full
    +	virtual bool isFull()
    +	{
    +		return _repoFull;
    +	}
    +	//! whether the repo is enabled
    +	virtual bool isEnable()
    +	{
    +		return _enable;
    +	}
    +
    +protected:
    +	//! Repo Type
    +	RepositoryType _type;
    +	//! Mutex for protection
    +	std::mutex _mtx;
    +	//! repository directory
    +	std::string _directory;
    +	//! Logger
    +	std::shared_ptr<Logger> logger_;
    +	//! Configure
    +	Configure *configure_;
    +	//! max db entry life time
    +	int64_t _maxPartitionMillis;
    +	//! max db size
    +	int64_t _maxPartitionBytes;
    +	//! purge period
    +	uint64_t _purgePeriod;
    +	//! level DB database
    +	leveldb::DB* _db;
    +	//! thread
    +	std::thread *_thread;
    +	//! whether it is running
    +	bool _running;
    +	//! whether it is enabled
    +	bool _enable;
    +	//! whether to stop accepting provenance events
    +	std::atomic<bool> _repoFull;
    +	//! repoSize
    +	uint64_t repoSize();
    +	//! size of the directory
    +	static uint64_t _repoSize[MAX_REPO_TYPE];
    --- End diff --
    
    We only support two repository types, the same as NiFi.
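In the quoted header, start() detaches a raw std::thread and the monitor loop polls a plain bool _running. stop() only clears that flag, and the destructor then calls destroy(), which deletes _db, so the detached thread may still be sleeping or iterating at that point. The following is a minimal sketch of an alternative lifecycle, assuming a joinable std::thread, an atomic flag and a condition variable so that stop() wakes the sleeper and waits for it to exit. PurgeMonitor and passes() are hypothetical names; the counter only stands in for the real size check and purge.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

class PurgeMonitor {
 public:
  explicit PurgeMonitor(std::chrono::milliseconds period) : period_(period) {}
  ~PurgeMonitor() { stop(); }  // never leave a thread running past destruction
  PurgeMonitor(const PurgeMonitor &) = delete;
  PurgeMonitor &operator=(const PurgeMonitor &) = delete;

  void start() {
    bool expected = false;
    if (!running_.compare_exchange_strong(expected, true))
      return;  // already running
    thread_ = std::thread(&PurgeMonitor::run, this);
  }

  void stop() {
    {
      std::lock_guard<std::mutex> lock(mtx_);
      running_ = false;
    }
    cv_.notify_all();  // wake the monitor instead of waiting out the period
    if (thread_.joinable())
      thread_.join();  // members stay valid until the loop has exited
  }

  unsigned long passes() const { return passes_.load(); }

 private:
  void run() {
    std::unique_lock<std::mutex> lock(mtx_);
    while (running_) {
      // Returns early if stop() clears running_ and notifies.
      cv_.wait_for(lock, period_, [this] { return !running_.load(); });
      if (!running_)
        break;
      ++passes_;  // a real repository would check its size and purge here
    }
  }

  std::chrono::milliseconds period_;
  std::atomic<bool> running_{false};
  std::atomic<unsigned long> passes_{0};
  std::mutex mtx_;
  std::condition_variable cv_;
  std::thread thread_;
};
```

Because stop() joins, a destructor that calls stop() before releasing the database would no longer race with the monitor loop.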



[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104318102
  
    --- Diff: libminifi/include/Repository.h ---
    @@ -0,0 +1,294 @@
    +/**
    + * @file Repository 
    + * Repository class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __REPOSITORY_H__
    +#define __REPOSITORY_H__
    +
    +#include <ftw.h>
    +#include <uuid/uuid.h>
    +#include <atomic>
    +#include <cstdint>
    +#include <cstring>
    +#include <iostream>
    +#include <map>
    +#include <set>
    +#include <string>
    +#include <thread>
    +#include <vector>
    +
    +#include "leveldb/db.h"
    +#include "leveldb/options.h"
    +#include "leveldb/slice.h"
    +#include "leveldb/status.h"
    +#include "Configure.h"
    +#include "Connection.h"
    +#include "FlowFileRecord.h"
    +#include "Logger.h"
    +#include "Property.h"
    +#include "ResourceClaim.h"
    +#include "io/Serializable.h"
    +#include "utils/TimeUtil.h"
    +#include "utils/StringUtils.h"
    +
    +//! Repository
    +class Repository
    --- End diff --
    
    Some flow file repository behavior differs from that of the provenance repository, so FlowFileRepository needs to extend Repository.
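Since the subclasses exist anyway, the per-type branches in Repository::run() (one loop for PROVENANCE, a near-identical one for FLOWFILE) could move behind a virtual hook. This is a minimal sketch under stated assumptions: a std::map stands in for leveldb, and the hypothetical FlowFileRepoSketch stores values as "eventTime|payload". RepositoryBase, decodeEventTime and that value format are illustrative only, not the PR's API. As in run(), entries that fail to decode are purged.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

class RepositoryBase {
 public:
  virtual ~RepositoryBase() = default;
  void put(const std::string &key, const std::string &value) { db_[key] = value; }
  std::size_t size() const { return db_.size(); }

  // Shared purge loop, written once for every repository type.
  void purgeOlderThan(uint64_t curTime, uint64_t maxAgeMillis) {
    std::vector<std::string> purgeList;
    for (const auto &entry : db_) {
      uint64_t eventTime = 0;
      if (!decodeEventTime(entry.second, eventTime) ||
          curTime - eventTime > maxAgeMillis)
        purgeList.push_back(entry.first);
    }
    for (const auto &key : purgeList)
      db_.erase(key);
  }

 protected:
  // The only type-specific step: returns false if the value cannot be decoded.
  virtual bool decodeEventTime(const std::string &value,
                               uint64_t &eventTime) const = 0;

 private:
  std::map<std::string, std::string> db_;  // stand-in for leveldb
};

// Hypothetical flow file repository; a provenance repository would
// override the same hook with its own record format.
class FlowFileRepoSketch : public RepositoryBase {
 protected:
  bool decodeEventTime(const std::string &value,
                       uint64_t &eventTime) const override {
    auto sep = value.find('|');
    if (sep == std::string::npos || sep == 0)
      return false;
    try {
      eventTime = std::stoull(value.substr(0, sep));
    } catch (const std::exception &) {
      return false;  // not a number
    }
    return true;
  }
};
```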



[GitHub] nifi-minifi-cpp issue #62: MINIFI-231: Add Flow Persistent, Using id instead...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the issue:

    https://github.com/apache/nifi-minifi-cpp/pull/62
  
    reviewing



[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104318011
  
    --- Diff: libminifi/include/FlowFileRepository.h ---
    @@ -0,0 +1,208 @@
    +/**
    + * @file FlowFileRepository 
    + * Flow file repository class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __FLOWFILE_REPOSITORY_H__
    +#define __FLOWFILE_REPOSITORY_H__
    +
    +#include <ftw.h>
    +#include <uuid/uuid.h>
    +#include <atomic>
    +#include <cstdint>
    +#include <cstring>
    +#include <iostream>
    +#include <map>
    +#include <set>
    +#include <string>
    +#include <thread>
    +#include <vector>
    +
    +#include "Configure.h"
    +#include "Connection.h"
    +#include "FlowFileRecord.h"
    +#include "Logger.h"
    +#include "Property.h"
    +#include "ResourceClaim.h"
    +#include "io/Serializable.h"
    +#include "utils/TimeUtil.h"
    +#include "Repository.h"
    +
    +class FlowFileRepository;
    +
    +//! FlowFile Event Record
    +class FlowFileEventRecord : protected Serializable
    +{
    +public:
    +	friend class ProcessSession;
    +public:
    +	//! Constructor
    +	/*!
    +	 * Create a new flow file event record
    +	 */
    +	FlowFileEventRecord()
    +	: _entryDate(0), _lineageStartDate(0), _size(0), _offset(0)  
    +	{
    +		_eventTime = getTimeMillis();
    +		logger_ = Logger::getLogger();
    +	}
    +
    +	//! Destructor
    +	virtual ~FlowFileEventRecord() {
    +	}
    +	//! Get Attributes
    +	std::map<std::string, std::string> getAttributes() {
    +		return _attributes;
    +	}
    +	//! Get Size
    +	uint64_t getFileSize() {
    +		return _size;
    +	}
    +	// ! Get Offset
    +	uint64_t getFileOffset() {
    +		return _offset;
    +	}
    +	// ! Get Entry Date
    +	uint64_t getFlowFileEntryDate() {
    +		return _entryDate;
    +	}
    +	// ! Get Lineage Start Date
    +	uint64_t getlineageStartDate() {
    +		return _lineageStartDate;
    +	}
    +	// ! Get Event Time
    +	uint64_t getEventTime() {
    +		return _eventTime;
    +	}
    +	//! Get FlowFileUuid
    +	std::string getFlowFileUuid()
    +	{
    +		return _uuid;
    +	}
    +	//! Get ConnectionUuid
    +	std::string getConnectionUuid()
    +	{
    +		return _uuidConnection;
    +	}
    +	//! Get content full path
    +	std::string getContentFullPath()
    +	{
    +		return _contentFullPath;
    +	}
    +	//! Get LineageIdentifiers
    +	std::set<std::string> getLineageIdentifiers()
    +	{
    +		return _lineageIdentifiers;
    +	}
    +	//! fromFlowFile
    +	void fromFlowFile(FlowFileRecord *flow, std::string uuidConnection)
    +	{
    +		_entryDate = flow->getEntryDate();
    +		_lineageStartDate = flow->getlineageStartDate();
    +		_lineageIdentifiers = flow->getlineageIdentifiers();
    +		_uuid = flow->getUUIDStr();
    +		_attributes = flow->getAttributes();
    +		_size = flow->getSize();
    +		_offset = flow->getOffset();
    +		_uuidConnection = uuidConnection;
    +		if (flow->getResourceClaim())
    +		{
    +			_contentFullPath = flow->getResourceClaim()->getContentFullPath();
    +		}
    +	}
    +	//! Serialize and Persistent to the repository
    +	bool Serialize(FlowFileRepository *repo);
    +	//! DeSerialize
    +	bool DeSerialize(const uint8_t *buffer, const int bufferSize);
    +	//! DeSerialize
    +	bool DeSerialize(DataStream &stream)
    +	{
    +		return DeSerialize(stream.getBuffer(),stream.getSize());
    +	}
    +	//! DeSerialize
    +	bool DeSerialize(FlowFileRepository *repo, std::string key);
    +
    +protected:
    +
    +	//! Date at which the event was created
    --- End diff --
    
    This keeps it consistent with the current code.



[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104318037
  
    --- Diff: libminifi/include/FlowFileRepository.h ---
    @@ -0,0 +1,208 @@
    +/**
    + * @file FlowFileRepository 
    + * Flow file repository class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __FLOWFILE_REPOSITORY_H__
    +#define __FLOWFILE_REPOSITORY_H__
    +
    +#include <ftw.h>
    +#include <uuid/uuid.h>
    +#include <atomic>
    +#include <cstdint>
    +#include <cstring>
    +#include <iostream>
    +#include <map>
    +#include <set>
    +#include <string>
    +#include <thread>
    +#include <vector>
    +
    +#include "Configure.h"
    +#include "Connection.h"
    +#include "FlowFileRecord.h"
    +#include "Logger.h"
    +#include "Property.h"
    +#include "ResourceClaim.h"
    +#include "io/Serializable.h"
    +#include "utils/TimeUtil.h"
    +#include "Repository.h"
    +
    +class FlowFileRepository;
    +
    +//! FlowFile Event Record
    +class FlowFileEventRecord : protected Serializable
    +{
    +public:
    +	friend class ProcessSession;
    +public:
    +	//! Constructor
    +	/*!
    +	 * Create a new flow file event record
    +	 */
    +	FlowFileEventRecord()
    +	: _entryDate(0), _lineageStartDate(0), _size(0), _offset(0)  
    +	{
    +		_eventTime = getTimeMillis();
    +		logger_ = Logger::getLogger();
    +	}
    +
    +	//! Destructor
    +	virtual ~FlowFileEventRecord() {
    +	}
    +	//! Get Attributes
    +	std::map<std::string, std::string> getAttributes() {
    +		return _attributes;
    +	}
    +	//! Get Size
    +	uint64_t getFileSize() {
    +		return _size;
    +	}
    +	// ! Get Offset
    +	uint64_t getFileOffset() {
    +		return _offset;
    +	}
    +	// ! Get Entry Date
    +	uint64_t getFlowFileEntryDate() {
    +		return _entryDate;
    +	}
    +	// ! Get Lineage Start Date
    +	uint64_t getlineageStartDate() {
    +		return _lineageStartDate;
    +	}
    +	// ! Get Event Time
    +	uint64_t getEventTime() {
    +		return _eventTime;
    +	}
    +	//! Get FlowFileUuid
    +	std::string getFlowFileUuid()
    +	{
    +		return _uuid;
    +	}
    +	//! Get ConnectionUuid
    +	std::string getConnectionUuid()
    +	{
    +		return _uuidConnection;
    +	}
    +	//! Get content full path
    +	std::string getContentFullPath()
    +	{
    +		return _contentFullPath;
    +	}
    +	//! Get LineageIdentifiers
    +	std::set<std::string> getLineageIdentifiers()
    +	{
    +		return _lineageIdentifiers;
    +	}
    +	//! fromFlowFile
    +	void fromFlowFile(FlowFileRecord *flow, std::string uuidConnection)
    +	{
    +		_entryDate = flow->getEntryDate();
    +		_lineageStartDate = flow->getlineageStartDate();
    +		_lineageIdentifiers = flow->getlineageIdentifiers();
    +		_uuid = flow->getUUIDStr();
    +		_attributes = flow->getAttributes();
    +		_size = flow->getSize();
    +		_offset = flow->getOffset();
    +		_uuidConnection = uuidConnection;
    +		if (flow->getResourceClaim())
    +		{
    +			_contentFullPath = flow->getResourceClaim()->getContentFullPath();
    +		}
    +	}
    +	//! Serialize and Persistent to the repository
    +	bool Serialize(FlowFileRepository *repo);
    +	//! DeSerialize
    +	bool DeSerialize(const uint8_t *buffer, const int bufferSize);
    +	//! DeSerialize
    +	bool DeSerialize(DataStream &stream)
    +	{
    +		return DeSerialize(stream.getBuffer(),stream.getSize());
    +	}
    +	//! DeSerialize
    +	bool DeSerialize(FlowFileRepository *repo, std::string key);
    +
    +protected:
    +
    +	//! Date at which the event was created
    +	uint64_t _eventTime;
    +	//! Date at which the flow file entered the flow
    +	uint64_t _entryDate;
    +	//! Date at which the origin of this flow file entered the flow
    +	uint64_t _lineageStartDate;
    +	//! Size in bytes of the data corresponding to this flow file
    +	uint64_t _size;
    +	//! flow uuid
    +	std::string _uuid;
    +	//! connection uuid
    +	std::string _uuidConnection;
    +	//! Offset to the content
    +	uint64_t _offset;
    +	//! Full path to the content
    +	std::string _contentFullPath;
    +	//! Attributes key/values pairs for the flow record
    +	std::map<std::string, std::string> _attributes;
    +	//! UUID string for all parents
    +	std::set<std::string> _lineageIdentifiers;
    +
    +private:
    +
    +	//! Logger
    +	std::shared_ptr<Logger> logger_;
    +	
    +	// Prevent default copy constructor and assignment operation
    +	// Only support pass by reference or pointer
    +	FlowFileEventRecord(const FlowFileEventRecord &parent);
    +	FlowFileEventRecord &operator=(const FlowFileEventRecord &parent);
    +
    +};
    +
    +#define FLOWFILE_REPOSITORY_DIRECTORY "./flowfile_repository"
    +#define MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE (10*1024*1024) // 10M
    +#define MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME (600000) // 10 minute
    +#define FLOWFILE_REPOSITORY_PURGE_PERIOD (2500) // 2500 msec
    +
    +//! FlowFile Repository
    +class FlowFileRepository : public Repository
    +{
    +public:
    +	//! Constructor
    +	/*!
    +	 * Create a new flow file repository
    +	 */
    +	FlowFileRepository()
    --- End diff --
    
    It is just a default constructor.
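The diff stops before the constructor body, so this is only a hypothetical sketch of what such a default constructor might do: forward the FLOWFILE_REPOSITORY_* defaults from the header to the base constructor, in the base's (directory, maxPartitionMillis, maxPartitionBytes, purgePeriod) order, leaving initialize() to override them from configuration. It also shows the macros as typed constexpr constants. RepoBase, DefaultFlowFileRepo and the k* names are illustrative assumptions.

```cpp
#include <cstdint>
#include <string>
#include <utility>

// Values taken from the FLOWFILE_REPOSITORY_* macros in the diff.
constexpr const char *kFlowFileRepositoryDirectory = "./flowfile_repository";
constexpr int64_t kMaxFlowFileRepositoryStorageSize = 10 * 1024 * 1024;  // 10 MB
constexpr int64_t kMaxFlowFileRepositoryEntryLifeTime = 600000;          // 10 minutes, in ms
constexpr uint64_t kFlowFileRepositoryPurgePeriod = 2500;                // ms

// Hypothetical, trimmed-down base keeping only the constructor parameters.
class RepoBase {
 public:
  RepoBase(std::string directory, int64_t maxPartitionMillis,
           int64_t maxPartitionBytes, uint64_t purgePeriod)
      : directory_(std::move(directory)),
        maxPartitionMillis_(maxPartitionMillis),
        maxPartitionBytes_(maxPartitionBytes),
        purgePeriod_(purgePeriod) {}
  virtual ~RepoBase() = default;

  const std::string &directory() const { return directory_; }
  int64_t maxPartitionMillis() const { return maxPartitionMillis_; }
  int64_t maxPartitionBytes() const { return maxPartitionBytes_; }
  uint64_t purgePeriod() const { return purgePeriod_; }

 private:
  std::string directory_;
  int64_t maxPartitionMillis_;
  int64_t maxPartitionBytes_;
  uint64_t purgePeriod_;
};

class DefaultFlowFileRepo : public RepoBase {
 public:
  // Default constructor: compile-time defaults only.
  DefaultFlowFileRepo()
      : RepoBase(kFlowFileRepositoryDirectory,
                 kMaxFlowFileRepositoryEntryLifeTime,
                 kMaxFlowFileRepositoryStorageSize,
                 kFlowFileRepositoryPurgePeriod) {}
};
```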




[GitHub] nifi-minifi-cpp pull request #62: MINIFI-231: Add Flow Persistent, Using id ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r103960736
  
    --- Diff: libminifi/src/FlowFileRecord.cpp ---
    @@ -74,6 +76,43 @@ FlowFileRecord::FlowFileRecord(std::map<std::string, std::string> attributes, Re
     	logger_ = Logger::getLogger();
     }
     
    +FlowFileRecord::FlowFileRecord(FlowFileEventRecord *event)
    +: _size(0),
    +  _id(_localFlowSeqNumber.load()),
    +  _offset(0),
    +  _penaltyExpirationMs(0),
    +  _claim(NULL),
    +  _isStoredToRepo(false),
    +  _markedDelete(false),
    +  _connection(NULL),
    +  _orginalConnection(NULL)
    +{
    +	_entryDate = event->getFlowFileEntryDate();
    +	_lineageStartDate = event->getlineageStartDate();
    +	_size = event->getFileSize();
    +	_offset = event->getFileOffset();
    +	_lineageIdentifiers = event->getLineageIdentifiers();
    +	_attributes = event->getAttributes();
    +    _snapshot = false;
    +    _uuidStr = event->getFlowFileUuid();
    +    uuid_parse(_uuidStr.c_str(), _uuid);
    +
    +    if (_size > 0)
    +    {
    +    	_claim = new ResourceClaim();
    --- End diff --
    
    Why does claim_ need to be a pointer? You should use a unique_ptr if that is required. 
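A minimal sketch of the reviewer's suggestion, assuming a simplified Claim struct in place of ResourceClaim and a hypothetical RestoredRecord in place of FlowFileRecord. The claim is created only when the restored size is non-zero, mirroring the `if (_size > 0)` branch in the diff. std::unique_ptr frees it automatically and makes the record move-only. If a ResourceClaim can be referenced by several flow files at once, std::shared_ptr would be the closer fit; that ownership model should be checked against ResourceClaim itself.

```cpp
#include <cstdint>
#include <memory>
#include <string>
#include <utility>

// Hypothetical, minimal stand-in for ResourceClaim.
struct Claim {
  std::string contentPath;
};

class RestoredRecord {
 public:
  RestoredRecord(uint64_t size, std::string contentPath) : size_(size) {
    // Only records with content get a claim, as in the quoted constructor.
    if (size_ > 0)
      claim_ = std::make_unique<Claim>(Claim{std::move(contentPath)});
  }

  bool hasClaim() const { return claim_ != nullptr; }
  const Claim *claim() const { return claim_.get(); }  // non-owning access

 private:
  uint64_t size_;
  std::unique_ptr<Claim> claim_;  // released automatically, no manual delete
};
```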

