You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tk...@apache.org on 2015/11/13 03:07:41 UTC
nifi git commit: NIFI-1109 Updated documentation and cleaned up code
Repository: nifi
Updated Branches:
refs/heads/master 02102ea1c -> 65bd8c0b1
NIFI-1109 Updated documentation and cleaned up code
Reviewed by Bryan Bende (bbende@apache.org).
Committed with amendments (for whitespace, a couple of misspellings, and a test method name change based on review) by Tony Kurc (tkurc@apache.org)
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/65bd8c0b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/65bd8c0b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/65bd8c0b
Branch: refs/heads/master
Commit: 65bd8c0b1f0a32a43e4f3276bc8e606c12913bd2
Parents: 02102ea
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Nov 12 21:01:50 2015 -0500
Committer: Tony Kurc <tr...@gmail.com>
Committed: Thu Nov 12 21:07:01 2015 -0500
----------------------------------------------------------------------
.../couchbase/AbstractCouchbaseProcessor.java | 109 +++++++++---------
.../couchbase/CouchbaseExceptionMappings.java | 2 +-
.../processors/couchbase/GetCouchbaseKey.java | 114 +++++++++++--------
.../processors/couchbase/PutCouchbaseKey.java | 76 ++++++-------
.../couchbase/TestGetCouchbaseKey.java | 17 +--
5 files changed, 168 insertions(+), 150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/65bd8c0b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
index b879041..158caa1 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
@@ -43,21 +43,18 @@ import com.couchbase.client.java.Bucket;
*/
public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
- public static final PropertyDescriptor DOCUMENT_TYPE = new PropertyDescriptor
- .Builder().name("Document Type")
- .description("The type of contents.")
- .required(true)
- .allowableValues(DocumentType.values())
- .defaultValue(DocumentType.Json.toString())
- .build();
-
- public static final PropertyDescriptor DOC_ID = new PropertyDescriptor
- .Builder().name("Document Id")
- .description("A static, fixed Couchbase document id."
- + "Or an expression to construct the Couchbase document id.")
- .expressionLanguageSupported(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
+ public static final PropertyDescriptor DOCUMENT_TYPE = new PropertyDescriptor.Builder().name("Document Type")
+ .description("The type of contents.")
+ .required(true)
+ .allowableValues(DocumentType.values())
+ .defaultValue(DocumentType.Json.toString())
+ .build();
+
+ public static final PropertyDescriptor DOC_ID = new PropertyDescriptor.Builder().name("Document Id")
+ .description("A static, fixed Couchbase document id, or an expression to construct the Couchbase document id.")
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -77,15 +74,13 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
.description("All FlowFiles that cannot written to Couchbase Server and can't be retried are routed to this relationship.")
.build();
- public static final PropertyDescriptor COUCHBASE_CLUSTER_SERVICE = new PropertyDescriptor
- .Builder().name("Couchbase Cluster Controller Service")
+ public static final PropertyDescriptor COUCHBASE_CLUSTER_SERVICE = new PropertyDescriptor.Builder().name("Couchbase Cluster Controller Service")
.description("A Couchbase Cluster Controller Service which manages connections to a Couchbase cluster.")
.required(true)
.identifiesControllerService(CouchbaseClusterControllerService.class)
.build();
- public static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor
- .Builder().name("Bucket Name")
+ public static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor.Builder().name("Bucket Name")
.description("The name of bucket to access.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -115,6 +110,7 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
/**
* Add processor specific properties.
+ *
* @param descriptors add properties to this list
*/
protected void addSupportedProperties(List<PropertyDescriptor> descriptors) {
@@ -123,6 +119,7 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
/**
* Add processor specific relationships.
+ *
* @param relationships add relationships to this list
*/
protected void addSupportedRelationships(Set<Relationship> relationships) {
@@ -140,11 +137,11 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
}
private CouchbaseClusterControllerService getClusterService(final ProcessContext context) {
- if(clusterService == null){
- synchronized(AbstractCouchbaseProcessor.class){
- if(clusterService == null){
+ if (clusterService == null) {
+ synchronized (AbstractCouchbaseProcessor.class) {
+ if (clusterService == null) {
clusterService = context.getProperty(COUCHBASE_CLUSTER_SERVICE)
- .asControllerService(CouchbaseClusterControllerService.class);
+ .asControllerService(CouchbaseClusterControllerService.class);
}
}
}
@@ -154,6 +151,7 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
/**
* Open a bucket connection using a CouchbaseClusterControllerService.
+ *
* @param context a process context
* @return a bucket instance
*/
@@ -163,18 +161,17 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
/**
* Generate a transit url.
+ *
* @param context a process context
* @return a transit url based on the bucket name and the CouchbaseClusterControllerService name
*/
- protected String getTransitUrl(final ProcessContext context) {
- return new StringBuilder(context.getProperty(BUCKET_NAME).getValue())
- .append('@')
- .append(context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue())
- .toString();
+ protected String getTransitUrl(final ProcessContext context, final String docId) {
+ return "couchbase://" + context.getProperty(BUCKET_NAME).getValue() + "/" + docId;
}
/**
- * Handles the thrown CocuhbaseException accordingly.
+ * Handles the thrown CouchbaseException accordingly.
+ *
* @param context a process context
* @param session a process session
* @param logger a logger
@@ -183,35 +180,39 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
* @param errMsg a message to be logged
*/
protected void handleCouchbaseException(final ProcessContext context, final ProcessSession session,
- final ProcessorLog logger, FlowFile inFile, CouchbaseException e,
- String errMsg) {
+ final ProcessorLog logger, FlowFile inFile, CouchbaseException e,
+ String errMsg) {
logger.error(errMsg, e);
- if(inFile != null){
+ if (inFile != null) {
ErrorHandlingStrategy strategy = CouchbaseExceptionMappings.getStrategy(e);
- switch(strategy.penalty()) {
- case Penalize:
- if(logger.isDebugEnabled()) logger.debug("Penalized: {}", new Object[]{inFile});
- inFile = session.penalize(inFile);
- break;
- case Yield:
- if(logger.isDebugEnabled()) logger.debug("Yielded context: {}", new Object[]{inFile});
- context.yield();
- break;
- case None:
- break;
+ switch (strategy.penalty()) {
+ case Penalize:
+ if (logger.isDebugEnabled()) {
+ logger.debug("Penalized: {}", new Object[] {inFile});
+ }
+ inFile = session.penalize(inFile);
+ break;
+ case Yield:
+ if (logger.isDebugEnabled()) {
+ logger.debug("Yielded context: {}", new Object[] {inFile});
+ }
+ context.yield();
+ break;
+ case None:
+ break;
}
- switch(strategy.result()) {
- case ProcessException:
- throw new ProcessException(errMsg, e);
- case Failure:
- inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), e.getClass().getName());
- session.transfer(inFile, REL_FAILURE);
- break;
- case Retry:
- inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), e.getClass().getName());
- session.transfer(inFile, REL_RETRY);
- break;
+ switch (strategy.result()) {
+ case ProcessException:
+ throw new ProcessException(errMsg, e);
+ case Failure:
+ inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), e.getClass().getName());
+ session.transfer(inFile, REL_FAILURE);
+ break;
+ case Retry:
+ inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), e.getClass().getName());
+ session.transfer(inFile, REL_RETRY);
+ break;
}
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/65bd8c0b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java
index 87ffabb..e4faba3 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java
@@ -78,7 +78,7 @@ public class CouchbaseExceptionMappings {
mapping.put(ReplicaNotConfiguredException.class, ConfigurationError);
// when a particular Service(KV, View, Query, DCP) isn't running in a cluster
mapping.put(ServiceNotAvailableException.class, ConfigurationError);
- // SSL configuration error, such as key store mis configuration.
+ // SSL configuration error, such as key store misconfiguration.
mapping.put(SSLException.class, ConfigurationError);
/*
http://git-wip-us.apache.org/repos/asf/nifi/blob/65bd8c0b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
index 4aa9677..b4ff467 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
@@ -16,18 +16,19 @@
*/
package org.apache.nifi.processors.couchbase;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.annotation.behavior.ReadsAttribute;
-import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -43,6 +44,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.stream.io.StreamUtils;
import com.couchbase.client.core.CouchbaseException;
@@ -52,31 +54,30 @@ import com.couchbase.client.java.document.Document;
import com.couchbase.client.java.document.RawJsonDocument;
import com.couchbase.client.java.error.DocumentDoesNotExistException;
-@Tags({ "nosql", "couchbase", "database", "get" })
-@CapabilityDescription("Get a document from Couchbase Server via Key/Value access. This processor can be triggered by an incoming FlowFile, or it can be scheduled on a timer")
+@Tags({"nosql", "couchbase", "database", "get"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Get a document from Couchbase Server via Key/Value access. The ID of the document to fetch may be supplied by setting the <Document Id> property. "
+ + "NOTE: if the Document Id property is not set, the contents of the FlowFile will be read to determine the Document Id, which means that the contents of the entire "
+ + "FlowFile will be buffered in memory.")
@SeeAlso({CouchbaseClusterControllerService.class})
-@ReadsAttributes({
- @ReadsAttribute(attribute = "FlowFile content", description = "Used as a document id if 'Document Id' is not specified"),
- @ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id' expression.")
- })
@WritesAttributes({
- @WritesAttribute(attribute="couchbase.cluster", description="Cluster where the document was retrieved from."),
- @WritesAttribute(attribute="couchbase.bucket", description="Bucket where the document was retrieved from."),
- @WritesAttribute(attribute="couchbase.doc.id", description="Id of the document."),
- @WritesAttribute(attribute="couchbase.doc.cas", description="CAS of the document."),
- @WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document."),
- @WritesAttribute(attribute="couchbase.exception", description="If Couchbase related error occurs the CouchbaseException class name will be captured here.")
- })
+ @WritesAttribute(attribute = "couchbase.cluster", description = "Cluster where the document was retrieved from."),
+ @WritesAttribute(attribute = "couchbase.bucket", description = "Bucket where the document was retrieved from."),
+ @WritesAttribute(attribute = "couchbase.doc.id", description = "Id of the document."),
+ @WritesAttribute(attribute = "couchbase.doc.cas", description = "CAS of the document."),
+ @WritesAttribute(attribute = "couchbase.doc.expiry", description = "Expiration of the document."),
+ @WritesAttribute(attribute = "couchbase.exception", description = "If Couchbase related error occurs the CouchbaseException class name will be captured here.")
+})
public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
@Override
- protected void addSupportedProperties(List<PropertyDescriptor> descriptors) {
+ protected void addSupportedProperties(final List<PropertyDescriptor> descriptors) {
descriptors.add(DOCUMENT_TYPE);
descriptors.add(DOC_ID);
}
@Override
- protected void addSupportedRelationships(Set<Relationship> relationships) {
+ protected void addSupportedRelationships(final Set<Relationship> relationships) {
relationships.add(REL_SUCCESS);
relationships.add(REL_ORIGINAL);
relationships.add(REL_RETRY);
@@ -85,11 +86,15 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- final ProcessorLog logger = getLogger();
FlowFile inFile = session.get();
+ if (inFile == null) {
+ return;
+ }
+ final long startNanos = System.nanoTime();
+ final ProcessorLog logger = getLogger();
String docId = null;
- if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){
+ if (!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())) {
docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(inFile).getValue();
} else {
final byte[] content = new byte[(int) inFile.getSize()];
@@ -102,63 +107,74 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
docId = new String(content, StandardCharsets.UTF_8);
}
- if(StringUtils.isEmpty(docId)){
- if(inFile != null){
- throw new ProcessException("Please check 'Document Id' setting. Couldn't get document id from " + inFile);
- }
+ if (StringUtils.isEmpty(docId)) {
+ throw new ProcessException("Please check 'Document Id' setting. Couldn't get document id from " + inFile);
}
try {
- Document<?> doc = null;
- byte[] content = null;
- Bucket bucket = openBucket(context);
- DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue());
- switch (documentType){
- case Json : {
+ final Document<?> doc;
+ final byte[] content;
+ final Bucket bucket = openBucket(context);
+ final DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue());
+
+ switch (documentType) {
+ case Json: {
RawJsonDocument document = bucket.get(docId, RawJsonDocument.class);
- if(document != null){
+ if (document == null) {
+ doc = null;
+ content = null;
+ } else {
content = document.content().getBytes(StandardCharsets.UTF_8);
doc = document;
}
break;
}
- case Binary : {
+ case Binary: {
BinaryDocument document = bucket.get(docId, BinaryDocument.class);
- if(document != null){
+ if (document == null) {
+ doc = null;
+ content = null;
+ } else {
content = document.content().array();
doc = document;
}
break;
}
+ default: {
+ doc = null;
+ content = null;
+ }
}
- if(doc == null) {
- logger.warn("Document {} was not found in {}", new Object[]{docId, getTransitUrl(context)});
- if(inFile != null){
- inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), DocumentDoesNotExistException.class.getName());
- session.transfer(inFile, REL_FAILURE);
- }
+ if (doc == null) {
+ logger.error("Document {} was not found in {}; routing {} to failure", new Object[] {docId, getTransitUrl(context, docId), inFile});
+ inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), DocumentDoesNotExistException.class.getName());
+ session.transfer(inFile, REL_FAILURE);
return;
}
- if(inFile != null){
- session.transfer(inFile, REL_ORIGINAL);
- }
+ FlowFile outFile = session.create(inFile);
+ outFile = session.write(outFile, new OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream out) throws IOException {
+ out.write(content);
+ }
+ });
- FlowFile outFile = session.create();
- outFile = session.importFrom(new ByteArrayInputStream(content), outFile);
- Map<String, String> updatedAttrs = new HashMap<>();
+ final Map<String, String> updatedAttrs = new HashMap<>();
updatedAttrs.put(CouchbaseAttributes.Cluster.key(), context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue());
updatedAttrs.put(CouchbaseAttributes.Bucket.key(), context.getProperty(BUCKET_NAME).getValue());
updatedAttrs.put(CouchbaseAttributes.DocId.key(), docId);
updatedAttrs.put(CouchbaseAttributes.Cas.key(), String.valueOf(doc.cas()));
updatedAttrs.put(CouchbaseAttributes.Expiry.key(), String.valueOf(doc.expiry()));
outFile = session.putAllAttributes(outFile, updatedAttrs);
- session.getProvenanceReporter().receive(outFile, getTransitUrl(context));
- session.transfer(outFile, REL_SUCCESS);
- } catch (CouchbaseException e){
- String errMsg = String.format("Getting docuement %s from Couchbase Server using %s failed due to %s", docId, inFile, e);
+ final long fetchMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ session.getProvenanceReporter().fetch(outFile, getTransitUrl(context, docId), fetchMillis);
+ session.transfer(outFile, REL_SUCCESS);
+ session.transfer(inFile, REL_ORIGINAL);
+ } catch (final CouchbaseException e) {
+ String errMsg = String.format("Getting document %s from Couchbase Server using %s failed due to %s", docId, inFile, e);
handleCouchbaseException(context, session, logger, inFile, e, errMsg);
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/65bd8c0b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java
index 2aa803c..291c02c 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java
@@ -54,39 +54,36 @@ import com.couchbase.client.java.document.BinaryDocument;
import com.couchbase.client.java.document.Document;
import com.couchbase.client.java.document.RawJsonDocument;
-@Tags({ "nosql", "couchbase", "database", "put" })
+@Tags({"nosql", "couchbase", "database", "put"})
@CapabilityDescription("Put a document to Couchbase Server via Key/Value access.")
@SeeAlso({CouchbaseClusterControllerService.class})
@ReadsAttributes({
@ReadsAttribute(attribute = "uuid", description = "Used as a document id if 'Document Id' is not specified"),
- @ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id' expression.")
- })
+})
@WritesAttributes({
- @WritesAttribute(attribute="couchbase.cluster", description="Cluster where the document was stored."),
- @WritesAttribute(attribute="couchbase.bucket", description="Bucket where the document was stored."),
- @WritesAttribute(attribute="couchbase.doc.id", description="Id of the document."),
- @WritesAttribute(attribute="couchbase.doc.cas", description="CAS of the document."),
- @WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document."),
- @WritesAttribute(attribute="couchbase.exception", description="If Couchbase related error occurs the CouchbaseException class name will be captured here.")
- })
+ @WritesAttribute(attribute = "couchbase.cluster", description = "Cluster where the document was stored."),
+ @WritesAttribute(attribute = "couchbase.bucket", description = "Bucket where the document was stored."),
+ @WritesAttribute(attribute = "couchbase.doc.id", description = "Id of the document."),
+ @WritesAttribute(attribute = "couchbase.doc.cas", description = "CAS of the document."),
+ @WritesAttribute(attribute = "couchbase.doc.expiry", description = "Expiration of the document."),
+ @WritesAttribute(attribute = "couchbase.exception", description = "If Couchbase related error occurs the CouchbaseException class name will be captured here.")
+})
public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
- public static final PropertyDescriptor PERSIST_TO = new PropertyDescriptor
- .Builder().name("Persist To")
- .description("Durability constraint about disk persistence.")
- .required(true)
- .allowableValues(PersistTo.values())
- .defaultValue(PersistTo.NONE.toString())
- .build();
+ public static final PropertyDescriptor PERSIST_TO = new PropertyDescriptor.Builder().name("Persist To")
+ .description("Durability constraint about disk persistence.")
+ .required(true)
+ .allowableValues(PersistTo.values())
+ .defaultValue(PersistTo.NONE.toString())
+ .build();
- public static final PropertyDescriptor REPLICATE_TO = new PropertyDescriptor
- .Builder().name("Replicate To")
- .description("Durability constraint about replication.")
- .required(true)
- .allowableValues(ReplicateTo.values())
- .defaultValue(ReplicateTo.NONE.toString())
- .build();
+ public static final PropertyDescriptor REPLICATE_TO = new PropertyDescriptor.Builder().name("Replicate To")
+ .description("Durability constraint about replication.")
+ .required(true)
+ .allowableValues(ReplicateTo.values())
+ .defaultValue(ReplicateTo.NONE.toString())
+ .build();
@Override
protected void addSupportedProperties(List<PropertyDescriptor> descriptors) {
@@ -107,7 +104,7 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final ProcessorLog logger = getLogger();
FlowFile flowFile = session.get();
- if ( flowFile == null ) {
+ if (flowFile == null) {
return;
}
@@ -119,41 +116,42 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
}
});
- String docId = String.valueOf(flowFile.getAttribute(CoreAttributes.UUID.key()));
- if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){
+ String docId = flowFile.getAttribute(CoreAttributes.UUID.key());
+ if (!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())) {
docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
}
try {
Document<?> doc = null;
- DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue());
- switch (documentType){
- case Json : {
+ final DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue());
+ switch (documentType) {
+ case Json: {
doc = RawJsonDocument.create(docId, new String(content, StandardCharsets.UTF_8));
break;
}
- case Binary : {
- ByteBuf buf = Unpooled.copiedBuffer(content);
+ case Binary: {
+ final ByteBuf buf = Unpooled.copiedBuffer(content);
doc = BinaryDocument.create(docId, buf);
break;
}
}
- PersistTo persistTo = PersistTo.valueOf(context.getProperty(PERSIST_TO).getValue());
- ReplicateTo replicateTo = ReplicateTo.valueOf(context.getProperty(REPLICATE_TO).getValue());
+ final PersistTo persistTo = PersistTo.valueOf(context.getProperty(PERSIST_TO).getValue());
+ final ReplicateTo replicateTo = ReplicateTo.valueOf(context.getProperty(REPLICATE_TO).getValue());
doc = openBucket(context).upsert(doc, persistTo, replicateTo);
- Map<String, String> updatedAttrs = new HashMap<>();
+
+ final Map<String, String> updatedAttrs = new HashMap<>();
updatedAttrs.put(CouchbaseAttributes.Cluster.key(), context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue());
updatedAttrs.put(CouchbaseAttributes.Bucket.key(), context.getProperty(BUCKET_NAME).getValue());
updatedAttrs.put(CouchbaseAttributes.DocId.key(), docId);
updatedAttrs.put(CouchbaseAttributes.Cas.key(), String.valueOf(doc.cas()));
updatedAttrs.put(CouchbaseAttributes.Expiry.key(), String.valueOf(doc.expiry()));
+
flowFile = session.putAllAttributes(flowFile, updatedAttrs);
- session.getProvenanceReporter().send(flowFile, getTransitUrl(context));
+ session.getProvenanceReporter().send(flowFile, getTransitUrl(context, docId));
session.transfer(flowFile, REL_SUCCESS);
-
- } catch (CouchbaseException e) {
- String errMsg = String.format("Writing docuement %s to Couchbase Server using %s failed due to %s", docId, flowFile, e);
+ } catch (final CouchbaseException e) {
+ String errMsg = String.format("Writing document %s to Couchbase Server using %s failed due to %s", docId, flowFile, e);
handleCouchbaseException(context, session, logger, flowFile, e, errMsg);
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/65bd8c0b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java
index 108980c..3776826 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java
@@ -100,10 +100,11 @@ public class TestGetCouchbaseKey {
testRunner.setProperty(BUCKET_NAME, bucketName);
testRunner.setProperty(DOC_ID, docId);
+ testRunner.enqueue(new byte[0]);
testRunner.run();
- testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
testRunner.assertTransferCount(REL_SUCCESS, 1);
+ testRunner.assertTransferCount(REL_ORIGINAL, 1);
testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
@@ -145,7 +146,7 @@ public class TestGetCouchbaseKey {
}
@Test
- public void testDocIdExpWithNullFlowFile() throws Exception {
+ public void testDocIdExpWithEmptyFlowFile() throws Exception {
String docIdExp = "doc-s";
String docId = "doc-s";
@@ -157,10 +158,11 @@ public class TestGetCouchbaseKey {
testRunner.setProperty(DOC_ID, docIdExp);
+ testRunner.enqueue(new byte[0]);
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1);
- testRunner.assertTransferCount(REL_ORIGINAL, 0);
+ testRunner.assertTransferCount(REL_ORIGINAL, 1);
testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
@@ -179,11 +181,12 @@ public class TestGetCouchbaseKey {
setupMockBucket(bucket);
testRunner.setProperty(DOC_ID, docIdExp);
+ testRunner.enqueue(new byte[0]);
try {
testRunner.run();
fail("Exception should be thrown.");
- } catch (AssertionError e){
+ } catch (AssertionError e) {
Assert.assertTrue(e.getCause().getClass().equals(AttributeExpressionLanguageException.class));
}
@@ -210,7 +213,7 @@ public class TestGetCouchbaseKey {
try {
testRunner.run();
fail("Exception should be thrown.");
- } catch (AssertionError e){
+ } catch (AssertionError e) {
Assert.assertTrue(e.getCause().getClass().equals(AttributeExpressionLanguageException.class));
}
@@ -288,7 +291,7 @@ public class TestGetCouchbaseKey {
try {
testRunner.run();
fail("ProcessException should be thrown.");
- } catch (AssertionError e){
+ } catch (AssertionError e) {
Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
}
@@ -315,7 +318,7 @@ public class TestGetCouchbaseKey {
try {
testRunner.run();
fail("ProcessException should be thrown.");
- } catch (AssertionError e){
+ } catch (AssertionError e) {
Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
Assert.assertTrue(e.getCause().getCause().getClass().equals(AuthenticationException.class));
}