You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tk...@apache.org on 2015/11/13 03:07:41 UTC

nifi git commit: NIFI-1109 Updated documentation and cleaned up code

Repository: nifi
Updated Branches:
  refs/heads/master 02102ea1c -> 65bd8c0b1


NIFI-1109 Updated documentation and cleaned up code

Reviewed by Bryan Bende (bbende@apache.org).
Committed with amendments (for whitespace, a couple misspellings and to change a test method name based on review) by Tony Kurc (tkurc@apache.org)


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/65bd8c0b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/65bd8c0b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/65bd8c0b

Branch: refs/heads/master
Commit: 65bd8c0b1f0a32a43e4f3276bc8e606c12913bd2
Parents: 02102ea
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Nov 12 21:01:50 2015 -0500
Committer: Tony Kurc <tr...@gmail.com>
Committed: Thu Nov 12 21:07:01 2015 -0500

----------------------------------------------------------------------
 .../couchbase/AbstractCouchbaseProcessor.java   | 109 +++++++++---------
 .../couchbase/CouchbaseExceptionMappings.java   |   2 +-
 .../processors/couchbase/GetCouchbaseKey.java   | 114 +++++++++++--------
 .../processors/couchbase/PutCouchbaseKey.java   |  76 ++++++-------
 .../couchbase/TestGetCouchbaseKey.java          |  17 +--
 5 files changed, 168 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/65bd8c0b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
index b879041..158caa1 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
@@ -43,21 +43,18 @@ import com.couchbase.client.java.Bucket;
  */
 public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
 
-    public static final PropertyDescriptor DOCUMENT_TYPE = new PropertyDescriptor
-            .Builder().name("Document Type")
-            .description("The type of contents.")
-            .required(true)
-            .allowableValues(DocumentType.values())
-            .defaultValue(DocumentType.Json.toString())
-            .build();
-
-    public static final PropertyDescriptor DOC_ID = new PropertyDescriptor
-            .Builder().name("Document Id")
-            .description("A static, fixed Couchbase document id."
-                    + "Or an expression to construct the Couchbase document id.")
-            .expressionLanguageSupported(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
+    public static final PropertyDescriptor DOCUMENT_TYPE = new PropertyDescriptor.Builder().name("Document Type")
+        .description("The type of contents.")
+        .required(true)
+        .allowableValues(DocumentType.values())
+        .defaultValue(DocumentType.Json.toString())
+        .build();
+
+    public static final PropertyDescriptor DOC_ID = new PropertyDescriptor.Builder().name("Document Id")
+        .description("A static, fixed Couchbase document id, or an expression to construct the Couchbase document id.")
+        .expressionLanguageSupported(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
 
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -77,15 +74,13 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
         .description("All FlowFiles that cannot written to Couchbase Server and can't be retried are routed to this relationship.")
         .build();
 
-    public static final PropertyDescriptor COUCHBASE_CLUSTER_SERVICE = new PropertyDescriptor
-        .Builder().name("Couchbase Cluster Controller Service")
+    public static final PropertyDescriptor COUCHBASE_CLUSTER_SERVICE = new PropertyDescriptor.Builder().name("Couchbase Cluster Controller Service")
         .description("A Couchbase Cluster Controller Service which manages connections to a Couchbase cluster.")
         .required(true)
         .identifiesControllerService(CouchbaseClusterControllerService.class)
         .build();
 
-    public static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor
-        .Builder().name("Bucket Name")
+    public static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor.Builder().name("Bucket Name")
         .description("The name of bucket to access.")
         .required(true)
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -115,6 +110,7 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
 
     /**
      * Add processor specific properties.
+     *
      * @param descriptors add properties to this list
      */
     protected void addSupportedProperties(List<PropertyDescriptor> descriptors) {
@@ -123,6 +119,7 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
 
     /**
      * Add processor specific relationships.
+     *
      * @param relationships add relationships to this list
      */
     protected void addSupportedRelationships(Set<Relationship> relationships) {
@@ -140,11 +137,11 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
     }
 
     private CouchbaseClusterControllerService getClusterService(final ProcessContext context) {
-        if(clusterService == null){
-            synchronized(AbstractCouchbaseProcessor.class){
-                if(clusterService == null){
+        if (clusterService == null) {
+            synchronized (AbstractCouchbaseProcessor.class) {
+                if (clusterService == null) {
                     clusterService = context.getProperty(COUCHBASE_CLUSTER_SERVICE)
-                            .asControllerService(CouchbaseClusterControllerService.class);
+                        .asControllerService(CouchbaseClusterControllerService.class);
                 }
             }
         }
@@ -154,6 +151,7 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
 
     /**
      * Open a bucket connection using a CouchbaseClusterControllerService.
+     *
      * @param context a process context
      * @return a bucket instance
      */
@@ -163,18 +161,17 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
 
     /**
      * Generate a transit url.
+     *
      * @param context a process context
      * @return a transit url based on the bucket name and the CouchbaseClusterControllerService name
      */
-    protected String getTransitUrl(final ProcessContext context) {
-        return new StringBuilder(context.getProperty(BUCKET_NAME).getValue())
-            .append('@')
-            .append(context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue())
-            .toString();
+    protected String getTransitUrl(final ProcessContext context, final String docId) {
+        return "couchbase://" + context.getProperty(BUCKET_NAME).getValue() + "/" + docId;
     }
 
     /**
-     * Handles the thrown CocuhbaseException accordingly.
+     * Handles the thrown CouchbaseException accordingly.
+     *
      * @param context a process context
      * @param session a process session
      * @param logger a logger
@@ -183,35 +180,39 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
      * @param errMsg a message to be logged
      */
     protected void handleCouchbaseException(final ProcessContext context, final ProcessSession session,
-            final ProcessorLog logger, FlowFile inFile, CouchbaseException e,
-            String errMsg) {
+        final ProcessorLog logger, FlowFile inFile, CouchbaseException e,
+        String errMsg) {
         logger.error(errMsg, e);
-        if(inFile != null){
+        if (inFile != null) {
             ErrorHandlingStrategy strategy = CouchbaseExceptionMappings.getStrategy(e);
-            switch(strategy.penalty()) {
-            case Penalize:
-                if(logger.isDebugEnabled()) logger.debug("Penalized: {}", new Object[]{inFile});
-                inFile = session.penalize(inFile);
-                break;
-            case Yield:
-                if(logger.isDebugEnabled()) logger.debug("Yielded context: {}", new Object[]{inFile});
-                context.yield();
-                break;
-            case None:
-                break;
+            switch (strategy.penalty()) {
+                case Penalize:
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Penalized: {}", new Object[] {inFile});
+                    }
+                    inFile = session.penalize(inFile);
+                    break;
+                case Yield:
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Yielded context: {}", new Object[] {inFile});
+                    }
+                    context.yield();
+                    break;
+                case None:
+                    break;
             }
 
-            switch(strategy.result()) {
-            case ProcessException:
-                throw new ProcessException(errMsg, e);
-            case Failure:
-                inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), e.getClass().getName());
-                session.transfer(inFile, REL_FAILURE);
-                break;
-            case Retry:
-                inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), e.getClass().getName());
-                session.transfer(inFile, REL_RETRY);
-                break;
+            switch (strategy.result()) {
+                case ProcessException:
+                    throw new ProcessException(errMsg, e);
+                case Failure:
+                    inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), e.getClass().getName());
+                    session.transfer(inFile, REL_FAILURE);
+                    break;
+                case Retry:
+                    inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), e.getClass().getName());
+                    session.transfer(inFile, REL_RETRY);
+                    break;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/65bd8c0b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java
index 87ffabb..e4faba3 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java
@@ -78,7 +78,7 @@ public class CouchbaseExceptionMappings {
         mapping.put(ReplicaNotConfiguredException.class, ConfigurationError);
         // when a particular Service(KV, View, Query, DCP) isn't running in a cluster
         mapping.put(ServiceNotAvailableException.class, ConfigurationError);
-        // SSL configuration error, such as key store mis configuration.
+        // SSL configuration error, such as key store misconfiguration.
         mapping.put(SSLException.class, ConfigurationError);
 
         /*

http://git-wip-us.apache.org/repos/asf/nifi/blob/65bd8c0b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
index 4aa9677..b4ff467 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
@@ -16,18 +16,19 @@
  */
 package org.apache.nifi.processors.couchbase;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.annotation.behavior.ReadsAttribute;
-import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -43,6 +44,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.stream.io.StreamUtils;
 
 import com.couchbase.client.core.CouchbaseException;
@@ -52,31 +54,30 @@ import com.couchbase.client.java.document.Document;
 import com.couchbase.client.java.document.RawJsonDocument;
 import com.couchbase.client.java.error.DocumentDoesNotExistException;
 
-@Tags({ "nosql", "couchbase", "database", "get" })
-@CapabilityDescription("Get a document from Couchbase Server via Key/Value access. This processor can be triggered by an incoming FlowFile, or it can be scheduled on a timer")
+@Tags({"nosql", "couchbase", "database", "get"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Get a document from Couchbase Server via Key/Value access. The ID of the document to fetch may be supplied by setting the <Document Id> property. "
+    + "NOTE: if the Document Id property is not set, the contents of the FlowFile will be read to determine the Document Id, which means that the contents of the entire "
+    + "FlowFile will be buffered in memory.")
 @SeeAlso({CouchbaseClusterControllerService.class})
-@ReadsAttributes({
-    @ReadsAttribute(attribute = "FlowFile content", description = "Used as a document id if 'Document Id' is not specified"),
-    @ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id' expression.")
-    })
 @WritesAttributes({
-    @WritesAttribute(attribute="couchbase.cluster", description="Cluster where the document was retrieved from."),
-    @WritesAttribute(attribute="couchbase.bucket", description="Bucket where the document was retrieved from."),
-    @WritesAttribute(attribute="couchbase.doc.id", description="Id of the document."),
-    @WritesAttribute(attribute="couchbase.doc.cas", description="CAS of the document."),
-    @WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document."),
-    @WritesAttribute(attribute="couchbase.exception", description="If Couchbase related error occurs the CouchbaseException class name will be captured here.")
-    })
+    @WritesAttribute(attribute = "couchbase.cluster", description = "Cluster where the document was retrieved from."),
+    @WritesAttribute(attribute = "couchbase.bucket", description = "Bucket where the document was retrieved from."),
+    @WritesAttribute(attribute = "couchbase.doc.id", description = "Id of the document."),
+    @WritesAttribute(attribute = "couchbase.doc.cas", description = "CAS of the document."),
+    @WritesAttribute(attribute = "couchbase.doc.expiry", description = "Expiration of the document."),
+    @WritesAttribute(attribute = "couchbase.exception", description = "If Couchbase related error occurs the CouchbaseException class name will be captured here.")
+})
 public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
 
     @Override
-    protected void addSupportedProperties(List<PropertyDescriptor> descriptors) {
+    protected void addSupportedProperties(final List<PropertyDescriptor> descriptors) {
         descriptors.add(DOCUMENT_TYPE);
         descriptors.add(DOC_ID);
     }
 
     @Override
-    protected void addSupportedRelationships(Set<Relationship> relationships) {
+    protected void addSupportedRelationships(final Set<Relationship> relationships) {
         relationships.add(REL_SUCCESS);
         relationships.add(REL_ORIGINAL);
         relationships.add(REL_RETRY);
@@ -85,11 +86,15 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        final ProcessorLog logger = getLogger();
         FlowFile inFile = session.get();
+        if (inFile == null) {
+            return;
+        }
 
+        final long startNanos = System.nanoTime();
+        final ProcessorLog logger = getLogger();
         String docId = null;
-        if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){
+        if (!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())) {
             docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(inFile).getValue();
         } else {
             final byte[] content = new byte[(int) inFile.getSize()];
@@ -102,63 +107,74 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
             docId = new String(content, StandardCharsets.UTF_8);
         }
 
-        if(StringUtils.isEmpty(docId)){
-            if(inFile != null){
-                throw new ProcessException("Please check 'Document Id' setting. Couldn't get document id from " + inFile);
-            }
+        if (StringUtils.isEmpty(docId)) {
+            throw new ProcessException("Please check 'Document Id' setting. Couldn't get document id from " + inFile);
         }
 
         try {
-            Document<?> doc = null;
-            byte[] content = null;
-            Bucket bucket = openBucket(context);
-            DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue());
-            switch (documentType){
-                case Json : {
+            final Document<?> doc;
+            final byte[] content;
+            final Bucket bucket = openBucket(context);
+            final DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue());
+
+            switch (documentType) {
+                case Json: {
                     RawJsonDocument document = bucket.get(docId, RawJsonDocument.class);
-                    if(document != null){
+                    if (document == null) {
+                        doc = null;
+                        content = null;
+                    } else {
                         content = document.content().getBytes(StandardCharsets.UTF_8);
                         doc = document;
                     }
                     break;
                 }
-                case Binary : {
+                case Binary: {
                     BinaryDocument document = bucket.get(docId, BinaryDocument.class);
-                    if(document != null){
+                    if (document == null) {
+                        doc = null;
+                        content = null;
+                    } else {
                         content = document.content().array();
                         doc = document;
                     }
                     break;
                 }
+                default: {
+                    doc = null;
+                    content = null;
+                }
             }
 
-            if(doc == null) {
-                logger.warn("Document {} was not found in {}", new Object[]{docId, getTransitUrl(context)});
-                if(inFile != null){
-                    inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), DocumentDoesNotExistException.class.getName());
-                    session.transfer(inFile, REL_FAILURE);
-                }
+            if (doc == null) {
+                logger.error("Document {} was not found in {}; routing {} to failure", new Object[] {docId, getTransitUrl(context, docId), inFile});
+                inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), DocumentDoesNotExistException.class.getName());
+                session.transfer(inFile, REL_FAILURE);
                 return;
             }
 
-            if(inFile != null){
-                session.transfer(inFile, REL_ORIGINAL);
-            }
+            FlowFile outFile = session.create(inFile);
+            outFile = session.write(outFile, new OutputStreamCallback() {
+                @Override
+                public void process(final OutputStream out) throws IOException {
+                    out.write(content);
+                }
+            });
 
-            FlowFile outFile = session.create();
-            outFile = session.importFrom(new ByteArrayInputStream(content), outFile);
-            Map<String, String> updatedAttrs = new HashMap<>();
+            final Map<String, String> updatedAttrs = new HashMap<>();
             updatedAttrs.put(CouchbaseAttributes.Cluster.key(), context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue());
             updatedAttrs.put(CouchbaseAttributes.Bucket.key(), context.getProperty(BUCKET_NAME).getValue());
             updatedAttrs.put(CouchbaseAttributes.DocId.key(), docId);
             updatedAttrs.put(CouchbaseAttributes.Cas.key(), String.valueOf(doc.cas()));
             updatedAttrs.put(CouchbaseAttributes.Expiry.key(), String.valueOf(doc.expiry()));
             outFile = session.putAllAttributes(outFile, updatedAttrs);
-            session.getProvenanceReporter().receive(outFile, getTransitUrl(context));
-            session.transfer(outFile, REL_SUCCESS);
 
-        } catch (CouchbaseException e){
-            String errMsg = String.format("Getting docuement %s from Couchbase Server using %s failed due to %s", docId, inFile, e);
+            final long fetchMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+            session.getProvenanceReporter().fetch(outFile, getTransitUrl(context, docId), fetchMillis);
+            session.transfer(outFile, REL_SUCCESS);
+            session.transfer(inFile, REL_ORIGINAL);
+        } catch (final CouchbaseException e) {
+            String errMsg = String.format("Getting document %s from Couchbase Server using %s failed due to %s", docId, inFile, e);
             handleCouchbaseException(context, session, logger, inFile, e, errMsg);
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/65bd8c0b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java
index 2aa803c..291c02c 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java
@@ -54,39 +54,36 @@ import com.couchbase.client.java.document.BinaryDocument;
 import com.couchbase.client.java.document.Document;
 import com.couchbase.client.java.document.RawJsonDocument;
 
-@Tags({ "nosql", "couchbase", "database", "put" })
+@Tags({"nosql", "couchbase", "database", "put"})
 @CapabilityDescription("Put a document to Couchbase Server via Key/Value access.")
 @SeeAlso({CouchbaseClusterControllerService.class})
 @ReadsAttributes({
     @ReadsAttribute(attribute = "uuid", description = "Used as a document id if 'Document Id' is not specified"),
-    @ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id' expression.")
-    })
+})
 @WritesAttributes({
-    @WritesAttribute(attribute="couchbase.cluster", description="Cluster where the document was stored."),
-    @WritesAttribute(attribute="couchbase.bucket", description="Bucket where the document was stored."),
-    @WritesAttribute(attribute="couchbase.doc.id", description="Id of the document."),
-    @WritesAttribute(attribute="couchbase.doc.cas", description="CAS of the document."),
-    @WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document."),
-    @WritesAttribute(attribute="couchbase.exception", description="If Couchbase related error occurs the CouchbaseException class name will be captured here.")
-    })
+    @WritesAttribute(attribute = "couchbase.cluster", description = "Cluster where the document was stored."),
+    @WritesAttribute(attribute = "couchbase.bucket", description = "Bucket where the document was stored."),
+    @WritesAttribute(attribute = "couchbase.doc.id", description = "Id of the document."),
+    @WritesAttribute(attribute = "couchbase.doc.cas", description = "CAS of the document."),
+    @WritesAttribute(attribute = "couchbase.doc.expiry", description = "Expiration of the document."),
+    @WritesAttribute(attribute = "couchbase.exception", description = "If Couchbase related error occurs the CouchbaseException class name will be captured here.")
+})
 public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
 
 
-    public static final PropertyDescriptor PERSIST_TO = new PropertyDescriptor
-            .Builder().name("Persist To")
-            .description("Durability constraint about disk persistence.")
-            .required(true)
-            .allowableValues(PersistTo.values())
-            .defaultValue(PersistTo.NONE.toString())
-            .build();
+    public static final PropertyDescriptor PERSIST_TO = new PropertyDescriptor.Builder().name("Persist To")
+        .description("Durability constraint about disk persistence.")
+        .required(true)
+        .allowableValues(PersistTo.values())
+        .defaultValue(PersistTo.NONE.toString())
+        .build();
 
-    public static final PropertyDescriptor REPLICATE_TO = new PropertyDescriptor
-            .Builder().name("Replicate To")
-            .description("Durability constraint about replication.")
-            .required(true)
-            .allowableValues(ReplicateTo.values())
-            .defaultValue(ReplicateTo.NONE.toString())
-            .build();
+    public static final PropertyDescriptor REPLICATE_TO = new PropertyDescriptor.Builder().name("Replicate To")
+        .description("Durability constraint about replication.")
+        .required(true)
+        .allowableValues(ReplicateTo.values())
+        .defaultValue(ReplicateTo.NONE.toString())
+        .build();
 
     @Override
     protected void addSupportedProperties(List<PropertyDescriptor> descriptors) {
@@ -107,7 +104,7 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         final ProcessorLog logger = getLogger();
         FlowFile flowFile = session.get();
-        if ( flowFile == null ) {
+        if (flowFile == null) {
             return;
         }
 
@@ -119,41 +116,42 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
             }
         });
 
-        String docId = String.valueOf(flowFile.getAttribute(CoreAttributes.UUID.key()));
-        if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){
+        String docId = flowFile.getAttribute(CoreAttributes.UUID.key());
+        if (!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())) {
             docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
         }
 
         try {
             Document<?> doc = null;
-            DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue());
-            switch (documentType){
-                case Json : {
+            final DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue());
+            switch (documentType) {
+                case Json: {
                     doc = RawJsonDocument.create(docId, new String(content, StandardCharsets.UTF_8));
                     break;
                 }
-                case Binary : {
-                    ByteBuf buf = Unpooled.copiedBuffer(content);
+                case Binary: {
+                    final ByteBuf buf = Unpooled.copiedBuffer(content);
                     doc = BinaryDocument.create(docId, buf);
                     break;
                 }
             }
 
-            PersistTo persistTo = PersistTo.valueOf(context.getProperty(PERSIST_TO).getValue());
-            ReplicateTo replicateTo = ReplicateTo.valueOf(context.getProperty(REPLICATE_TO).getValue());
+            final PersistTo persistTo = PersistTo.valueOf(context.getProperty(PERSIST_TO).getValue());
+            final ReplicateTo replicateTo = ReplicateTo.valueOf(context.getProperty(REPLICATE_TO).getValue());
             doc = openBucket(context).upsert(doc, persistTo, replicateTo);
-            Map<String, String> updatedAttrs = new HashMap<>();
+
+            final Map<String, String> updatedAttrs = new HashMap<>();
             updatedAttrs.put(CouchbaseAttributes.Cluster.key(), context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue());
             updatedAttrs.put(CouchbaseAttributes.Bucket.key(), context.getProperty(BUCKET_NAME).getValue());
             updatedAttrs.put(CouchbaseAttributes.DocId.key(), docId);
             updatedAttrs.put(CouchbaseAttributes.Cas.key(), String.valueOf(doc.cas()));
             updatedAttrs.put(CouchbaseAttributes.Expiry.key(), String.valueOf(doc.expiry()));
+
             flowFile = session.putAllAttributes(flowFile, updatedAttrs);
-            session.getProvenanceReporter().send(flowFile, getTransitUrl(context));
+            session.getProvenanceReporter().send(flowFile, getTransitUrl(context, docId));
             session.transfer(flowFile, REL_SUCCESS);
-
-        } catch (CouchbaseException e) {
-            String errMsg = String.format("Writing docuement %s to Couchbase Server using %s failed due to %s", docId, flowFile, e);
+        } catch (final CouchbaseException e) {
+            String errMsg = String.format("Writing document %s to Couchbase Server using %s failed due to %s", docId, flowFile, e);
             handleCouchbaseException(context, session, logger, flowFile, e, errMsg);
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/65bd8c0b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java
index 108980c..3776826 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java
@@ -100,10 +100,11 @@ public class TestGetCouchbaseKey {
 
         testRunner.setProperty(BUCKET_NAME, bucketName);
         testRunner.setProperty(DOC_ID, docId);
+        testRunner.enqueue(new byte[0]);
         testRunner.run();
 
-        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
         testRunner.assertTransferCount(REL_SUCCESS, 1);
+        testRunner.assertTransferCount(REL_ORIGINAL, 1);
         testRunner.assertTransferCount(REL_RETRY, 0);
         testRunner.assertTransferCount(REL_FAILURE, 0);
         MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
@@ -145,7 +146,7 @@ public class TestGetCouchbaseKey {
     }
 
     @Test
-    public void testDocIdExpWithNullFlowFile() throws Exception {
+    public void testDocIdExpWithEmptyFlowFile() throws Exception {
         String docIdExp = "doc-s";
         String docId = "doc-s";
 
@@ -157,10 +158,11 @@ public class TestGetCouchbaseKey {
 
         testRunner.setProperty(DOC_ID, docIdExp);
 
+        testRunner.enqueue(new byte[0]);
         testRunner.run();
 
         testRunner.assertTransferCount(REL_SUCCESS, 1);
-        testRunner.assertTransferCount(REL_ORIGINAL, 0);
+        testRunner.assertTransferCount(REL_ORIGINAL, 1);
         testRunner.assertTransferCount(REL_RETRY, 0);
         testRunner.assertTransferCount(REL_FAILURE, 0);
         MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
@@ -179,11 +181,12 @@ public class TestGetCouchbaseKey {
         setupMockBucket(bucket);
 
         testRunner.setProperty(DOC_ID, docIdExp);
+        testRunner.enqueue(new byte[0]);
 
         try {
             testRunner.run();
             fail("Exception should be thrown.");
-        } catch (AssertionError e){
+        } catch (AssertionError e) {
             Assert.assertTrue(e.getCause().getClass().equals(AttributeExpressionLanguageException.class));
         }
 
@@ -210,7 +213,7 @@ public class TestGetCouchbaseKey {
         try {
             testRunner.run();
             fail("Exception should be thrown.");
-        } catch (AssertionError e){
+        } catch (AssertionError e) {
             Assert.assertTrue(e.getCause().getClass().equals(AttributeExpressionLanguageException.class));
         }
 
@@ -288,7 +291,7 @@ public class TestGetCouchbaseKey {
         try {
             testRunner.run();
             fail("ProcessException should be thrown.");
-        } catch (AssertionError e){
+        } catch (AssertionError e) {
             Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
         }
 
@@ -315,7 +318,7 @@ public class TestGetCouchbaseKey {
         try {
             testRunner.run();
             fail("ProcessException should be thrown.");
-        } catch (AssertionError e){
+        } catch (AssertionError e) {
             Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
             Assert.assertTrue(e.getCause().getCause().getClass().equals(AuthenticationException.class));
         }