You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/01/13 21:24:03 UTC

nifi-minifi-cpp git commit: MINIFI-181 Created initial implementation of PutFile

Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 6b317fb4f -> c45f05e51


MINIFI-181 Created initial implementation of PutFile

This closes #39.

Signed-off-by: Aldrin Piri <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/c45f05e5
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/c45f05e5
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/c45f05e5

Branch: refs/heads/master
Commit: c45f05e51e78a5dfe9c69ce231ec579e7736ed2d
Parents: 6b317fb
Author: Andrew I. Christianson <an...@nextcentury.com>
Authored: Thu Jan 5 19:15:54 2017 +0000
Committer: Aldrin Piri <al...@apache.org>
Committed: Fri Jan 13 16:22:49 2017 -0500

----------------------------------------------------------------------
 libminifi/include/FlowController.h |   1 +
 libminifi/include/PutFile.h        |  88 ++++++++++++++
 libminifi/src/FlowController.cpp   |   4 +
 libminifi/src/PutFile.cpp          | 200 ++++++++++++++++++++++++++++++++
 4 files changed, 293 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c45f05e5/libminifi/include/FlowController.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index 49629a2..ee8bb4f 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -49,6 +49,7 @@
 #include "RemoteProcessorGroupPort.h"
 #include "Provenance.h"
 #include "GetFile.h"
+#include "PutFile.h"
 #include "TailFile.h"
 #include "ListenSyslog.h"
 #include "ExecuteProcess.h"

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c45f05e5/libminifi/include/PutFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/PutFile.h b/libminifi/include/PutFile.h
new file mode 100644
index 0000000..9f1375d
--- /dev/null
+++ b/libminifi/include/PutFile.h
@@ -0,0 +1,88 @@
+/**
+ * @file PutFile.h
+ * PutFile class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PUT_FILE_H__
+#define __PUT_FILE_H__
+
+#include "FlowFileRecord.h"
+#include "Processor.h"
+#include "ProcessSession.h"
+
+//! PutFile Class: processor that writes the content of incoming flow files into an output directory
+class PutFile : public Processor
+{
+public:
+	//! Values of the Conflict Resolution Strategy property that onTrigger() compares against
+	static const std::string CONFLICT_RESOLUTION_STRATEGY_REPLACE;
+	static const std::string CONFLICT_RESOLUTION_STRATEGY_IGNORE;
+	static const std::string CONFLICT_RESOLUTION_STRATEGY_FAIL;
+
+	//! Constructor
+	/*!
+	 * Create a new processor with the given name and (optional) UUID
+	 */
+	PutFile(std::string name, uuid_t uuid = NULL)
+	: Processor(name, uuid)
+	{
+		_logger = Logger::getLogger();
+	}
+	//! Destructor
+	virtual ~PutFile()
+	{
+	}
+	//! Processor Name
+	static const std::string ProcessorName;
+	//! Supported Properties
+	static Property Directory;
+	static Property ConflictResolution;
+	//! Supported Relationships
+	static Relationship Success;
+	static Relationship Failure;
+
+	//! OnTrigger method: puts one incoming flow file into the output directory
+	virtual void onTrigger(ProcessContext *context, ProcessSession *session);
+	//! Initialize: registers the supported properties and relationships
+	virtual void initialize(void);
+	//! Callback that copies flow file content into a temporary file and renames it to the destination on commit()
+	class ReadCallback : public InputStreamCallback
+	{
+	public:
+		ReadCallback(const std::string &tmpFile, const std::string &destFile);
+		~ReadCallback(); //!< Closes the stream and unlinks the temporary file path
+		virtual void process(std::ifstream *stream); //!< Copies the stream content into the temporary file
+		bool commit(); //!< Renames the temporary file to the destination; returns true on success
+
+	private:
+		Logger *_logger;
+		std::ofstream _tmpFileOs; //!< Output stream on the temporary file
+		bool _writeSucceeded = false; //!< Set by process(), checked by commit()
+		std::string _tmpFile; //!< Path of the temporary file
+		std::string _destFile; //!< Path of the final destination file
+	};
+
+protected:
+
+private:
+	//! Logger
+	Logger *_logger;
+	//! Writes flowFile to destFile via tmpFile and transfers it to Success or Failure
+	bool putFile(ProcessSession *session, FlowFileRecord *flowFile, const std::string &tmpFile, const std::string &destFile);
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c45f05e5/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 455788c..caaa8ea 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -183,6 +183,10 @@ Processor *FlowController::createProcessor(std::string name, uuid_t uuid)
 	{
         processor = new GetFile(name, uuid);
 	}
+	else if (name == PutFile::ProcessorName)
+	{
+        processor = new PutFile(name, uuid);
+	}
 	else if (name == TailFile::ProcessorName)
 	{
         processor = new TailFile(name, uuid);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c45f05e5/libminifi/src/PutFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/PutFile.cpp b/libminifi/src/PutFile.cpp
new file mode 100644
index 0000000..3f209ce
--- /dev/null
+++ b/libminifi/src/PutFile.cpp
@@ -0,0 +1,200 @@
+/**
+ * @file PutFile.cpp
+ * PutFile class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <fstream>
+#include <uuid/uuid.h>
+
+#include "TimeUtil.h"
+#include "PutFile.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_REPLACE("replace"); // onTrigger() writes the file even if it exists
+const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_IGNORE("ignore"); // onTrigger() skips the write and routes to Success
+const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_FAIL("fail"); // onTrigger() skips the write and routes to Failure
+// Name that FlowController::createProcessor() compares against to instantiate this processor
+const std::string PutFile::ProcessorName("PutFile");
+// Properties. NOTE(review): the third argument is presumably the default value - confirm against Property's constructor
+Property PutFile::Directory("Output Directory", "The output directory to which to put files", ".");
+Property PutFile::ConflictResolution("Conflict Resolution Strategy", "Indicates what should happen when a file with the same name already exists in the output directory", CONFLICT_RESOLUTION_STRATEGY_FAIL);
+// Relationships; CONFLICT_RESOLUTION_STRATEGY_FAIL above must stay defined before ConflictResolution, which is built from it
+Relationship PutFile::Success("success", "All files are routed to success");
+Relationship PutFile::Failure("failure", "Failed files (conflict, write failure, etc.) are transferred to failure");
+
+// Registers what this processor supports by handing the property set and the
+// relationship set to setSupportedProperties() and setSupportedRelationships().
+void PutFile::initialize()
+{
+	//! Supported properties: Directory and ConflictResolution
+	std::set<Property> supportedProperties{Directory, ConflictResolution};
+	setSupportedProperties(supportedProperties);
+
+	//! Supported relationships: Success and Failure
+	std::set<Relationship> supportedRelationships{Success, Failure};
+	setSupportedRelationships(supportedRelationships);
+
+}
+
+// Handles one incoming flow file: builds the destination path inside the configured output
+// directory, applies the conflict resolution strategy when the destination already exists,
+// and otherwise writes the content through a temporary file.
+void PutFile::onTrigger(ProcessContext *context, ProcessSession *session)
+{
+	// Both properties are read before a flow file is taken from the session
+	std::string outputDir;
+	if (!context->getProperty(Directory.getName(), outputDir))
+	{
+		_logger->log_error("Directory attribute is missing or invalid");
+		return;
+	}
+
+	// Strategy is compared against the CONFLICT_RESOLUTION_STRATEGY_* constants below
+	std::string strategy;
+	if (!context->getProperty(ConflictResolution.getName(), strategy))
+	{
+		_logger->log_error("Conflict Resolution Strategy attribute is missing or invalid");
+		return;
+	}
+
+	// Nothing to do when there is no incoming flow file
+	FlowFileRecord *incoming = session->get();
+	if (incoming == NULL)
+	{
+		return;
+	}
+
+	// The file name comes from the flow file's FILENAME attribute
+	std::string name;
+	incoming->getAttribute(FILENAME, name);
+
+	// Dot-prefixed, UUID-suffixed temporary file created in the output directory itself,
+	// i.e. in the same directory as the destination it is later renamed to
+	uuid_t tmpUuid;
+	char tmpUuidText[37];
+	uuid_generate(tmpUuid);
+	uuid_unparse(tmpUuid, tmpUuidText);
+	const std::string tmpPath = outputDir + "/." + name + "." + tmpUuidText;
+	_logger->log_info("PutFile using temporary file %s", tmpPath.c_str());
+
+	// Final destination of the content
+	const std::string destPath = outputDir + "/" + name;
+	_logger->log_info("PutFile writing file %s into directory %s", name.c_str(), outputDir.c_str());
+
+	// A failing stat() is treated as "destination does not exist": no conflict, just write
+	struct stat destStat;
+	if (stat(destPath.c_str(), &destStat) != 0)
+	{
+		putFile(session, incoming, tmpPath, destPath);
+		return;
+	}
+
+	_logger->log_info("Destination file %s exists; applying Conflict Resolution Strategy: %s", destPath.c_str(), strategy.c_str());
+
+	if (strategy == CONFLICT_RESOLUTION_STRATEGY_REPLACE)
+	{
+		// Write over the existing destination
+		putFile(session, incoming, tmpPath, destPath);
+	}
+	else if (strategy == CONFLICT_RESOLUTION_STRATEGY_IGNORE)
+	{
+		// Leave the existing destination untouched and route to Success
+		session->transfer(incoming, Success);
+	}
+	else
+	{
+		// The fail strategy, and any value that is neither replace nor ignore
+		session->transfer(incoming, Failure);
+	}
+}
+
+// Writes the content of flowFile to destFile by streaming it into tmpFile and renaming
+// tmpFile to destFile, then transfers flowFile to Success or Failure accordingly.
+// Returns true if the file was committed to destFile, false otherwise.
+bool PutFile::putFile(ProcessSession *session, FlowFileRecord *flowFile, const std::string &tmpFile, const std::string &destFile)
+{
+	ReadCallback cb(tmpFile, destFile);
+	session->read(flowFile, &cb);
+
+	// Keep the outcome so it can be returned: the function is declared bool, and
+	// flowing off its end without a return statement is undefined behavior.
+	const bool committed = cb.commit();
+
+	session->transfer(flowFile, committed ? Success : Failure);
+	return committed;
+}
+
+// Opens the temporary file for writing and keeps both paths for commit() and cleanup.
+// The initializer list follows the member declaration order (_tmpFileOs before _tmpFile).
+PutFile::ReadCallback::ReadCallback(const std::string &tmpFile, const std::string &destFile)
+: _tmpFileOs(tmpFile), _tmpFile(tmpFile), _destFile(destFile)
+{
+	_logger = Logger::getLogger();
+}
+
+// Copies the whole stream into the temporary file and records whether the write really succeeded
+void PutFile::ReadCallback::process(std::ifstream *stream)
+{
+	// operator<<(streambuf*) sets failbit when nothing is inserted, so empty content skips the copy
+	if (stream->peek() != std::ifstream::traits_type::eof())
+		_tmpFileOs << stream->rdbuf();
+	_writeSucceeded = _tmpFileOs.is_open() && _tmpFileOs.flush().good(); // was unconditionally true
+}
+
+// Renames the temporary file to its final destination.
+// Returns true only if the write succeeded, the temporary file closed cleanly and rename() succeeded.
+bool PutFile::ReadCallback::commit()
+{
+	_logger->log_info("PutFile committing put file operation to %s", _destFile.c_str());
+	if (!_writeSucceeded)
+	{
+		_logger->log_error("PutFile commit put file operation to %s failed because write failed", _destFile.c_str());
+		return false;
+	}
+
+	// close() writes out buffered data and sets failbit if that fails; clear() first so an older flag is not mistaken for it
+	_tmpFileOs.clear();
+	_tmpFileOs.close();
+	if (_tmpFileOs.fail())
+	{
+		_logger->log_error("PutFile commit put file operation to %s failed because closing the temporary file failed", _destFile.c_str());
+		return false;
+	}
+
+	if (rename(_tmpFile.c_str(), _destFile.c_str()) != 0)
+	{
+		_logger->log_error("PutFile commit put file operation to %s failed because rename() call failed", _destFile.c_str());
+		return false;
+	}
+
+	_logger->log_info("PutFile commit put file operation to %s succeeded", _destFile.c_str());
+	return true;
+}
+
+// Releases the callback's resources: closes the temporary file stream and unlinks the
+// temporary path. After a successful commit() that path has already been renamed away.
+PutFile::ReadCallback::~ReadCallback()
+{
+	_tmpFileOs.close();
+	// The result of unlink() is not checked here
+	unlink(_tmpFile.c_str());
+}