You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/01/13 21:24:03 UTC
nifi-minifi-cpp git commit: MINIFI-181 Created initial implementation
of PutFile
Repository: nifi-minifi-cpp
Updated Branches:
refs/heads/master 6b317fb4f -> c45f05e51
MINIFI-181 Created initial implementation of PutFile
This closes #39.
Signed-off-by: Aldrin Piri <al...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/c45f05e5
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/c45f05e5
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/c45f05e5
Branch: refs/heads/master
Commit: c45f05e51e78a5dfe9c69ce231ec579e7736ed2d
Parents: 6b317fb
Author: Andrew I. Christianson <an...@nextcentury.com>
Authored: Thu Jan 5 19:15:54 2017 +0000
Committer: Aldrin Piri <al...@apache.org>
Committed: Fri Jan 13 16:22:49 2017 -0500
----------------------------------------------------------------------
libminifi/include/FlowController.h | 1 +
libminifi/include/PutFile.h | 88 ++++++++++++++
libminifi/src/FlowController.cpp | 4 +
libminifi/src/PutFile.cpp | 200 ++++++++++++++++++++++++++++++++
4 files changed, 293 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c45f05e5/libminifi/include/FlowController.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index 49629a2..ee8bb4f 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -49,6 +49,7 @@
#include "RemoteProcessorGroupPort.h"
#include "Provenance.h"
#include "GetFile.h"
+#include "PutFile.h"
#include "TailFile.h"
#include "ListenSyslog.h"
#include "ExecuteProcess.h"
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c45f05e5/libminifi/include/PutFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/PutFile.h b/libminifi/include/PutFile.h
new file mode 100644
index 0000000..9f1375d
--- /dev/null
+++ b/libminifi/include/PutFile.h
@@ -0,0 +1,88 @@
+/**
+ * @file PutFile.h
+ * PutFile class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PUT_FILE_H__
+#define __PUT_FILE_H__
+
+#include "FlowFileRecord.h"
+#include "Processor.h"
+#include "ProcessSession.h"
+
+//! PutFile Class
+class PutFile : public Processor
+{
+public:
+
+ static const std::string CONFLICT_RESOLUTION_STRATEGY_REPLACE;
+ static const std::string CONFLICT_RESOLUTION_STRATEGY_IGNORE;
+ static const std::string CONFLICT_RESOLUTION_STRATEGY_FAIL;
+
+ //! Constructor
+ /*!
+ * Create a new processor
+ */
+ PutFile(std::string name, uuid_t uuid = NULL)
+ : Processor(name, uuid)
+ {
+ _logger = Logger::getLogger();
+ }
+ //! Destructor
+ virtual ~PutFile()
+ {
+ }
+ //! Processor Name
+ static const std::string ProcessorName;
+ //! Supported Properties
+ static Property Directory;
+ static Property ConflictResolution;
+ //! Supported Relationships
+ static Relationship Success;
+ static Relationship Failure;
+
+ //! OnTrigger method, implemented by NiFi PutFile
+ virtual void onTrigger(ProcessContext *context, ProcessSession *session);
+ //! Initialize, over write by NiFi PutFile
+ virtual void initialize(void);
+
+ class ReadCallback : public InputStreamCallback
+ {
+ public:
+ ReadCallback(const std::string &tmpFile, const std::string &destFile);
+ ~ReadCallback();
+ virtual void process(std::ifstream *stream);
+ bool commit();
+
+ private:
+ Logger *_logger;
+ std::ofstream _tmpFileOs;
+ bool _writeSucceeded = false;
+ std::string _tmpFile;
+ std::string _destFile;
+ };
+
+protected:
+
+private:
+ //! Logger
+ Logger *_logger;
+
+ bool putFile(ProcessSession *session, FlowFileRecord *flowFile, const std::string &tmpFile, const std::string &destFile);
+};
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c45f05e5/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 455788c..caaa8ea 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -183,6 +183,10 @@ Processor *FlowController::createProcessor(std::string name, uuid_t uuid)
{
processor = new GetFile(name, uuid);
}
+ else if (name == PutFile::ProcessorName)
+ {
+ processor = new PutFile(name, uuid);
+ }
else if (name == TailFile::ProcessorName)
{
processor = new TailFile(name, uuid);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c45f05e5/libminifi/src/PutFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/PutFile.cpp b/libminifi/src/PutFile.cpp
new file mode 100644
index 0000000..3f209ce
--- /dev/null
+++ b/libminifi/src/PutFile.cpp
@@ -0,0 +1,200 @@
+/**
+ * @file PutFile.cpp
+ * PutFile class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <fstream>
+#include <uuid/uuid.h>
+
+#include "TimeUtil.h"
+#include "PutFile.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_REPLACE("replace");
+const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_IGNORE("ignore");
+const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_FAIL("fail");
+
+const std::string PutFile::ProcessorName("PutFile");
+
+Property PutFile::Directory("Output Directory", "The output directory to which to put files", ".");
+Property PutFile::ConflictResolution("Conflict Resolution Strategy", "Indicates what should happen when a file with the same name already exists in the output directory", CONFLICT_RESOLUTION_STRATEGY_FAIL);
+
+Relationship PutFile::Success("success", "All files are routed to success");
+Relationship PutFile::Failure("failure", "Failed files (conflict, write failure, etc.) are transferred to failure");
+
+void PutFile::initialize()
+{
+ //! Set the supported properties
+ std::set<Property> properties;
+ properties.insert(Directory);
+ properties.insert(ConflictResolution);
+ setSupportedProperties(properties);
+ //! Set the supported relationships
+ std::set<Relationship> relationships;
+ relationships.insert(Success);
+ relationships.insert(Failure);
+ setSupportedRelationships(relationships);
+}
+
+void PutFile::onTrigger(ProcessContext *context, ProcessSession *session)
+{
+ std::string directory;
+
+ if (!context->getProperty(Directory.getName(), directory))
+ {
+ _logger->log_error("Directory attribute is missing or invalid");
+ return;
+ }
+
+ std::string conflictResolution;
+
+ if (!context->getProperty(ConflictResolution.getName(), conflictResolution))
+ {
+ _logger->log_error("Conflict Resolution Strategy attribute is missing or invalid");
+ return;
+ }
+
+ FlowFileRecord *flowFile = session->get();
+
+ // Do nothing if there are no incoming files
+ if (!flowFile)
+ {
+ return;
+ }
+
+ std::string filename;
+ flowFile->getAttribute(FILENAME, filename);
+
+ // Generate a safe (universally-unique) temporary filename on the same partition
+ char tmpFileUuidStr[37];
+ uuid_t tmpFileUuid;
+ uuid_generate(tmpFileUuid);
+ uuid_unparse(tmpFileUuid, tmpFileUuidStr);
+ std::stringstream tmpFileSs;
+ tmpFileSs << directory << "/." << filename << "." << tmpFileUuidStr;
+ std::string tmpFile = tmpFileSs.str();
+ _logger->log_info("PutFile using temporary file %s", tmpFile.c_str());
+
+ // Determine dest full file paths
+ std::stringstream destFileSs;
+ destFileSs << directory << "/" << filename;
+ std::string destFile = destFileSs.str();
+
+ _logger->log_info("PutFile writing file %s into directory %s", filename.c_str(), directory.c_str());
+
+ // If file exists, apply conflict resolution strategy
+ struct stat statResult;
+
+ if (stat(destFile.c_str(), &statResult) == 0)
+ {
+ _logger->log_info("Destination file %s exists; applying Conflict Resolution Strategy: %s", destFile.c_str(), conflictResolution.c_str());
+
+ if (conflictResolution == CONFLICT_RESOLUTION_STRATEGY_REPLACE)
+ {
+ putFile(session, flowFile, tmpFile, destFile);
+ }
+ else if (conflictResolution == CONFLICT_RESOLUTION_STRATEGY_IGNORE)
+ {
+ session->transfer(flowFile, Success);
+ }
+ else
+ {
+ session->transfer(flowFile, Failure);
+ }
+ }
+ else
+ {
+ putFile(session, flowFile, tmpFile, destFile);
+ }
+}
+
+bool PutFile::putFile(ProcessSession *session, FlowFileRecord *flowFile, const std::string &tmpFile, const std::string &destFile)
+{
+
+ ReadCallback cb(tmpFile, destFile);
+ session->read(flowFile, &cb);
+
+ if (cb.commit())
+ {
+ session->transfer(flowFile, Success);
+ }
+ else
+ {
+ session->transfer(flowFile, Failure);
+ }
+}
+
+PutFile::ReadCallback::ReadCallback(const std::string &tmpFile, const std::string &destFile)
+: _tmpFile(tmpFile)
+, _tmpFileOs(tmpFile)
+, _destFile(destFile)
+{
+ _logger = Logger::getLogger();
+}
+
+// Copy the entire file contents to the temporary file
+void PutFile::ReadCallback::process(std::ifstream *stream)
+{
+ // Copy file contents into tmp file
+ _writeSucceeded = false;
+ _tmpFileOs << stream->rdbuf();
+ _writeSucceeded = true;
+}
+
+// Renames tmp file to final destination
+// Returns true if commit succeeded
+bool PutFile::ReadCallback::commit()
+{
+ bool success = false;
+
+ _logger->log_info("PutFile committing put file operation to %s", _destFile.c_str());
+
+ if (_writeSucceeded)
+ {
+ _tmpFileOs.close();
+
+ if (rename(_tmpFile.c_str(), _destFile.c_str()))
+ {
+ _logger->log_info("PutFile commit put file operation to %s failed because rename() call failed", _destFile.c_str());
+ }
+ else
+ {
+ success = true;
+ _logger->log_info("PutFile commit put file operation to %s succeeded", _destFile.c_str());
+ }
+ }
+ else
+ {
+ _logger->log_error("PutFile commit put file operation to %s failed because write failed", _destFile.c_str());
+ }
+
+ return success;
+}
+
+// Clean up resources
+PutFile::ReadCallback::~ReadCallback() {
+ // Close tmp file
+ _tmpFileOs.close();
+
+ // Clean up tmp file, if necessary
+ unlink(_tmpFile.c_str());
+}