You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2018/06/15 13:20:52 UTC

[1/3] nifi git commit: NIFI-5054: Add Couchbase user authentication

Repository: nifi
Updated Branches:
  refs/heads/master 2277b9411 -> 52d6b9cfa


NIFI-5054: Add Couchbase user authentication


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f05c5e6e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f05c5e6e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f05c5e6e

Branch: refs/heads/master
Commit: f05c5e6ea0b48783f425ad25ecc80bacfe096314
Parents: 2277b94
Author: Koji Kawamura <ij...@apache.org>
Authored: Fri Jun 1 16:55:26 2018 +0900
Committer: Matt Gilman <ma...@gmail.com>
Committed: Fri Jun 15 08:48:46 2018 -0400

----------------------------------------------------------------------
 .../nifi/couchbase/CouchbaseClusterService.java | 89 ++++++++++++++++++--
 .../nifi-couchbase-services-api/pom.xml         |  2 +-
 2 files changed, 83 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/f05c5e6e/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterService.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterService.java
index 613b9a3..9802c2e 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterService.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterService.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.couchbase;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -28,14 +29,18 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnDisabled;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.reporting.InitializationException;
 
 import com.couchbase.client.core.CouchbaseException;
 import com.couchbase.client.java.Bucket;
 import com.couchbase.client.java.CouchbaseCluster;
+import org.apache.nifi.util.StringUtils;
 
 /**
  * Provides a centralized Couchbase connection and bucket passwords management.
@@ -43,14 +48,43 @@ import com.couchbase.client.java.CouchbaseCluster;
 @CapabilityDescription("Provides a centralized Couchbase connection and bucket passwords management."
         + " Bucket passwords can be specified via dynamic properties.")
 @Tags({ "nosql", "couchbase", "database", "connection" })
-@DynamicProperty(name = "Bucket Password for BUCKET_NAME", value = "bucket password", description = "Specify bucket password if necessary.")
+@DynamicProperty(name = "Bucket Password for BUCKET_NAME", value = "bucket password",
+        description = "Specify bucket password if necessary." +
+                " Couchbase Server 5.0 or later should use 'User Name' and 'User Password' instead.")
 public class CouchbaseClusterService extends AbstractControllerService implements CouchbaseClusterControllerService {
 
     public static final PropertyDescriptor CONNECTION_STRING = new PropertyDescriptor
-            .Builder().name("Connection String")
+            .Builder()
+            .name("Connection String")
             .description("The hostnames or ip addresses of the bootstraping nodes and optional parameters."
                     + " Syntax) couchbase://node1,node2,nodeN?param1=value1&param2=value2&paramN=valueN")
             .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor USER_NAME = new PropertyDescriptor
+            .Builder()
+            .name("user-name")
+            .displayName("User Name")
+            .description("The user name to authenticate NiFi as a Couchbase client." +
+                    " This configuration can be used against Couchbase Server 5.0 or later" +
+                    " supporting Roll-Based Access Control.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor USER_PASSWORD = new PropertyDescriptor
+            .Builder()
+            .name("user-password")
+            .displayName("User Password")
+            .description("The user password to authenticate NiFi as a Couchbase client." +
+                    " This configuration can be used against Couchbase Server 5.0 or later" +
+                    " supporting Roll-Based Access Control.")
+            .required(false)
+            .sensitive(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
@@ -59,13 +93,15 @@ public class CouchbaseClusterService extends AbstractControllerService implement
     static {
         final List<PropertyDescriptor> props = new ArrayList<>();
         props.add(CONNECTION_STRING);
+        props.add(USER_NAME);
+        props.add(USER_PASSWORD);
 
         properties = Collections.unmodifiableList(props);
     }
 
     private static final String DYNAMIC_PROP_BUCKET_PASSWORD = "Bucket Password for ";
-    private static final Map<String, String> bucketPasswords = new HashMap<>();
 
+    private Map<String, String> bucketPasswords;
     private volatile CouchbaseCluster cluster;
 
     @Override
@@ -76,18 +112,45 @@ public class CouchbaseClusterService extends AbstractControllerService implement
     @Override
     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(
             String propertyDescriptorName) {
-        if(propertyDescriptorName.startsWith(DYNAMIC_PROP_BUCKET_PASSWORD)){
+        if (propertyDescriptorName.startsWith(DYNAMIC_PROP_BUCKET_PASSWORD)) {
             return new PropertyDescriptor
                     .Builder().name(propertyDescriptorName)
                     .description("Bucket password.")
                     .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                     .dynamic(true)
                     .sensitive(true)
+                    .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
                     .build();
         }
         return null;
     }
 
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext context) {
+        final Collection<ValidationResult> results = new ArrayList<>();
+
+        final boolean isUserNameSet = context.getProperty(USER_NAME).isSet();
+        final boolean isUserPasswordSet = context.getProperty(USER_PASSWORD).isSet();
+        if ((isUserNameSet && !isUserPasswordSet) || (!isUserNameSet && isUserPasswordSet)) {
+            results.add(new ValidationResult.Builder()
+                    .subject("User Name and Password")
+                    .explanation("Both User Name and Password are required to use.")
+                    .build());
+        }
+
+        final boolean isBucketPasswordSet = context.getProperties().keySet().stream()
+                .anyMatch(p -> p.isDynamic() && p.getName().startsWith(DYNAMIC_PROP_BUCKET_PASSWORD));
+
+        if (isUserNameSet && isUserPasswordSet && isBucketPasswordSet) {
+            results.add(new ValidationResult.Builder()
+                    .subject("Authentication methods")
+                    .explanation("Different authentication methods can not be used at the same time," +
+                            " Use either one of User Name and Password, or Bucket Password.")
+                    .build());
+        }
+
+        return results;
+    }
 
     /**
      * Establish a connection to a Couchbase cluster.
@@ -97,15 +160,23 @@ public class CouchbaseClusterService extends AbstractControllerService implement
     @OnEnabled
     public void onConfigured(final ConfigurationContext context) throws InitializationException {
 
+        bucketPasswords = new HashMap<>();
         for(PropertyDescriptor p : context.getProperties().keySet()){
             if(p.isDynamic() && p.getName().startsWith(DYNAMIC_PROP_BUCKET_PASSWORD)){
                 String bucketName = p.getName().substring(DYNAMIC_PROP_BUCKET_PASSWORD.length());
-                String password = context.getProperty(p).getValue();
+                String password = context.getProperty(p).evaluateAttributeExpressions().getValue();
                 bucketPasswords.put(bucketName, password);
             }
         }
+
+        final String userName = context.getProperty(USER_NAME).evaluateAttributeExpressions().getValue();
+        final String userPassword = context.getProperty(USER_PASSWORD).evaluateAttributeExpressions().getValue();
+
         try {
-            cluster = CouchbaseCluster.fromConnectionString(context.getProperty(CONNECTION_STRING).getValue());
+            cluster = CouchbaseCluster.fromConnectionString(context.getProperty(CONNECTION_STRING).evaluateAttributeExpressions().getValue());
+            if (!StringUtils.isEmpty(userName) && !StringUtils.isEmpty(userPassword)) {
+                cluster.authenticate(userName, userPassword);
+            }
         } catch(CouchbaseException e) {
             throw new InitializationException(e);
         }
@@ -113,7 +184,11 @@ public class CouchbaseClusterService extends AbstractControllerService implement
 
     @Override
     public Bucket openBucket(String bucketName){
-        return cluster.openBucket(bucketName, bucketPasswords.get(bucketName));
+        if (bucketPasswords.containsKey(bucketName)) {
+            return cluster.openBucket(bucketName, bucketPasswords.get(bucketName));
+        }
+
+        return cluster.openBucket(bucketName);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/f05c5e6e/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/pom.xml b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/pom.xml
index a6340cf..0f60a8a 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/pom.xml
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/pom.xml
@@ -33,7 +33,7 @@
         <dependency>
             <groupId>com.couchbase.client</groupId>
             <artifactId>java-client</artifactId>
-            <version>2.2.0</version>
+            <version>2.5.8</version>
         </dependency>
     </dependencies>
 </project>


[2/3] nifi git commit: NIFI-5257: Expand Couchbase Server integration

Posted by mc...@apache.org.
http://git-wip-us.apache.org/repos/asf/nifi/blob/52d6b9cf/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java
index 154be53..34271a7 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java
@@ -16,36 +16,6 @@
  */
 package org.apache.nifi.processors.couchbase;
 
-import static org.apache.nifi.couchbase.CouchbaseAttributes.Exception;
-import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.BUCKET_NAME;
-import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.COUCHBASE_CLUSTER_SERVICE;
-import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOCUMENT_TYPE;
-import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID;
-import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_FAILURE;
-import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_ORIGINAL;
-import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_RETRY;
-import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_SUCCESS;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageException;
-import org.apache.nifi.couchbase.CouchbaseAttributes;
-import org.apache.nifi.couchbase.CouchbaseClusterControllerService;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
 import com.couchbase.client.core.BackpressureException;
 import com.couchbase.client.core.CouchbaseException;
 import com.couchbase.client.core.ServiceNotAvailableException;
@@ -59,6 +29,38 @@ import com.couchbase.client.java.document.RawJsonDocument;
 import com.couchbase.client.java.error.DocumentDoesNotExistException;
 import com.couchbase.client.java.error.DurabilityException;
 import com.couchbase.client.java.error.RequestTooBigException;
+import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageException;
+import org.apache.nifi.couchbase.CouchbaseClusterControllerService;
+import org.apache.nifi.couchbase.DocumentType;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.BUCKET_NAME;
+import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.COUCHBASE_CLUSTER_SERVICE;
+import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.DOCUMENT_TYPE;
+import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID;
+import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_FAILURE;
+import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_ORIGINAL;
+import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_RETRY;
+import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_SUCCESS;
+import static org.apache.nifi.processors.couchbase.CouchbaseAttributes.Exception;
+import static org.apache.nifi.processors.couchbase.GetCouchbaseKey.PUT_VALUE_TO_ATTRIBUTE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 
 public class TestGetCouchbaseKey {
@@ -80,6 +82,7 @@ public class TestGetCouchbaseKey {
         CouchbaseClusterControllerService service = mock(CouchbaseClusterControllerService.class);
         when(service.getIdentifier()).thenReturn(SERVICE_ID);
         when(service.openBucket(anyString())).thenReturn(bucket);
+        when(bucket.name()).thenReturn("bucket-1");
         testRunner.addControllerService(SERVICE_ID, service);
         testRunner.enableControllerService(service);
         testRunner.setProperty(COUCHBASE_CLUSTER_SERVICE, SERVICE_ID);
@@ -248,6 +251,57 @@ public class TestGetCouchbaseKey {
     }
 
     @Test
+    public void testPutToAttribute() throws Exception {
+
+        Bucket bucket = mock(Bucket.class);
+        String inFileDataStr = "doc-in";
+        String content = "some-value";
+        when(bucket.get(inFileDataStr, RawJsonDocument.class))
+            .thenReturn(RawJsonDocument.create(inFileDataStr, content));
+        setupMockBucket(bucket);
+
+        byte[] inFileData = inFileDataStr.getBytes(StandardCharsets.UTF_8);
+        testRunner.setProperty(PUT_VALUE_TO_ATTRIBUTE, "targetAttribute");
+        testRunner.enqueue(inFileData);
+        testRunner.run();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        // Result is put to Attribute, so no need to pass it to original.
+        testRunner.assertTransferCount(REL_ORIGINAL, 0);
+        testRunner.assertTransferCount(REL_RETRY, 0);
+        testRunner.assertTransferCount(REL_FAILURE, 0);
+        MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        outFile.assertContentEquals(inFileDataStr);
+        outFile.assertAttributeEquals("targetAttribute", content);
+
+        assertEquals(1, testRunner.getProvenanceEvents().size());
+        assertEquals(ProvenanceEventType.FETCH, testRunner.getProvenanceEvents().get(0).getEventType());
+    }
+
+    @Test
+    public void testPutToAttributeNoTargetAttribute() throws Exception {
+
+        Bucket bucket = mock(Bucket.class);
+        String inFileDataStr = "doc-in";
+        String content = "some-value";
+        when(bucket.get(inFileDataStr, RawJsonDocument.class))
+            .thenReturn(RawJsonDocument.create(inFileDataStr, content));
+        setupMockBucket(bucket);
+
+        byte[] inFileData = inFileDataStr.getBytes(StandardCharsets.UTF_8);
+        testRunner.setProperty(PUT_VALUE_TO_ATTRIBUTE, "${expressionReturningNoValue}");
+        testRunner.enqueue(inFileData);
+        testRunner.run();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 0);
+        testRunner.assertTransferCount(REL_ORIGINAL, 0);
+        testRunner.assertTransferCount(REL_RETRY, 0);
+        testRunner.assertTransferCount(REL_FAILURE, 1);
+        MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0);
+        outFile.assertContentEquals(inFileDataStr);
+    }
+
+    @Test
     public void testBinaryDocument() throws Exception {
 
         Bucket bucket = mock(Bucket.class);
@@ -274,6 +328,32 @@ public class TestGetCouchbaseKey {
         orgFile.assertContentEquals(inFileDataStr);
     }
 
+    @Test
+    public void testBinaryDocumentToAttribute() throws Exception {
+
+        Bucket bucket = mock(Bucket.class);
+        String inFileDataStr = "doc-in";
+        String content = "binary";
+        ByteBuf buf = Unpooled.copiedBuffer(content.getBytes(StandardCharsets.UTF_8));
+        when(bucket.get(inFileDataStr, BinaryDocument.class))
+            .thenReturn(BinaryDocument.create(inFileDataStr, buf));
+        setupMockBucket(bucket);
+
+        byte[] inFileData = inFileDataStr.getBytes(StandardCharsets.UTF_8);
+        testRunner.enqueue(inFileData);
+        testRunner.setProperty(DOCUMENT_TYPE, DocumentType.Binary.toString());
+        testRunner.setProperty(PUT_VALUE_TO_ATTRIBUTE, "targetAttribute");
+        testRunner.run();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        testRunner.assertTransferCount(REL_ORIGINAL, 0);
+        testRunner.assertTransferCount(REL_RETRY, 0);
+        testRunner.assertTransferCount(REL_FAILURE, 0);
+        MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        outFile.assertContentEquals(inFileDataStr);
+        outFile.assertAttributeEquals("targetAttribute", "binary");
+    }
+
 
     @Test
     public void testCouchbaseFailure() throws Exception {

http://git-wip-us.apache.org/repos/asf/nifi/blob/52d6b9cf/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java
index 1599bf4..ce9baa7 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java
@@ -16,9 +16,10 @@
  */
 package org.apache.nifi.processors.couchbase;
 
-import static org.apache.nifi.couchbase.CouchbaseAttributes.Exception;
-import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.BUCKET_NAME;
-import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.COUCHBASE_CLUSTER_SERVICE;
+import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.BUCKET_NAME;
+import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.COUCHBASE_CLUSTER_SERVICE;
+import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.DOCUMENT_TYPE;
+import static org.apache.nifi.processors.couchbase.CouchbaseAttributes.Exception;
 import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID;
 import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_FAILURE;
 import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_RETRY;
@@ -37,9 +38,11 @@ import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 
+import com.couchbase.client.deps.io.netty.buffer.Unpooled;
+import com.couchbase.client.java.document.BinaryDocument;
 import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageException;
-import org.apache.nifi.couchbase.CouchbaseAttributes;
 import org.apache.nifi.couchbase.CouchbaseClusterControllerService;
+import org.apache.nifi.couchbase.DocumentType;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.reporting.InitializationException;
@@ -79,6 +82,7 @@ public class TestPutCouchbaseKey {
         CouchbaseClusterControllerService service = mock(CouchbaseClusterControllerService.class);
         when(service.getIdentifier()).thenReturn(SERVICE_ID);
         when(service.openBucket(anyString())).thenReturn(bucket);
+        when(bucket.name()).thenReturn("bucket-1");
         testRunner.addControllerService(SERVICE_ID, service);
         testRunner.enableControllerService(service);
         testRunner.setProperty(COUCHBASE_CLUSTER_SERVICE, SERVICE_ID);
@@ -120,6 +124,42 @@ public class TestPutCouchbaseKey {
     }
 
     @Test
+    public void testBinaryDoc() throws Exception {
+        String bucketName = "bucket-1";
+        String docId = "doc-a";
+        int expiry = 100;
+        long cas = 200L;
+
+        String inFileData = "12345";
+        byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8);
+
+        Bucket bucket = mock(Bucket.class);
+        when(bucket.upsert(any(BinaryDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE)))
+                .thenReturn(BinaryDocument.create(docId, expiry, Unpooled.copiedBuffer(inFileData.getBytes(StandardCharsets.UTF_8)), cas));
+        setupMockBucket(bucket);
+
+        testRunner.enqueue(inFileDataBytes);
+        testRunner.setProperty(BUCKET_NAME, bucketName);
+        testRunner.setProperty(DOC_ID, docId);
+        testRunner.setProperty(DOCUMENT_TYPE, DocumentType.Binary.name());
+        testRunner.run();
+
+        verify(bucket, times(1)).upsert(any(BinaryDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE));
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        testRunner.assertTransferCount(REL_RETRY, 0);
+        testRunner.assertTransferCount(REL_FAILURE, 0);
+        MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        outFile.assertContentEquals(inFileData);
+        outFile.assertAttributeEquals(CouchbaseAttributes.Cluster.key(), SERVICE_ID);
+        outFile.assertAttributeEquals(CouchbaseAttributes.Bucket.key(), bucketName);
+        outFile.assertAttributeEquals(CouchbaseAttributes.DocId.key(), docId);
+        outFile.assertAttributeEquals(CouchbaseAttributes.Cas.key(), String.valueOf(cas));
+        outFile.assertAttributeEquals(CouchbaseAttributes.Expiry.key(), String.valueOf(expiry));
+    }
+
+    @Test
     public void testDurabilityConstraint() throws Exception {
         String docId = "doc-a";
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/52d6b9cf/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api-nar/pom.xml b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api-nar/pom.xml
index d74500f..3c1a173 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api-nar/pom.xml
@@ -33,6 +33,12 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <version>1.7.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-couchbase-services-api</artifactId>
             <version>1.7.0-SNAPSHOT</version>
         </dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/52d6b9cf/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterControllerService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterControllerService.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterControllerService.java
index 83e52be..7969c50 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterControllerService.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterControllerService.java
@@ -31,6 +31,6 @@ public interface CouchbaseClusterControllerService extends ControllerService {
      * @param bucketName the bucket name to access
      * @return a connected bucket instance
      */
-    public Bucket openBucket(String bucketName);
+    Bucket openBucket(String bucketName);
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/52d6b9cf/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/couchbase/CouchbaseConfigurationProperties.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/couchbase/CouchbaseConfigurationProperties.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/couchbase/CouchbaseConfigurationProperties.java
new file mode 100644
index 0000000..e62112e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/couchbase/CouchbaseConfigurationProperties.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.couchbase;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+
+public class CouchbaseConfigurationProperties {
+
+    public static final PropertyDescriptor COUCHBASE_CLUSTER_SERVICE = new PropertyDescriptor.Builder()
+            .name("cluster-controller-service")
+            .displayName("Couchbase Cluster Controller Service")
+            .description("A Couchbase Cluster Controller Service which manages connections to a Couchbase cluster.")
+            .required(true)
+            .identifiesControllerService(CouchbaseClusterControllerService.class)
+            .build();
+
+    public static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor.Builder()
+            .name("bucket-name")
+            .displayName("Bucket Name")
+            .description("The name of bucket to access.")
+            .required(true)
+            .addValidator(Validator.VALID)
+            .defaultValue("default")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor DOCUMENT_TYPE = new PropertyDescriptor.Builder()
+            .name("document-type")
+            .displayName("Document Type")
+            .description("The type of contents.")
+            .required(true)
+            .allowableValues(DocumentType.values())
+            .defaultValue(DocumentType.Json.toString())
+            .build();
+
+    public static final PropertyDescriptor LOOKUP_SUB_DOC_PATH = new PropertyDescriptor.Builder()
+            .name("lookup-sub-doc-path")
+            .displayName("Lookup Sub-Document Path")
+            .description("The Sub-Document lookup path within the target JSON document.")
+            .required(false)
+            .addValidator(Validator.VALID)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/52d6b9cf/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/couchbase/DocumentType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/couchbase/DocumentType.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/couchbase/DocumentType.java
new file mode 100644
index 0000000..77653cb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/couchbase/DocumentType.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.couchbase;
+
+
+/**
+ * Supported Couchbase document types.
+ *
+ * In order to handle a variety type of document classes such as JsonDocument,
+ * JsonLongDocument or JsonStringDocument, Couchbase processors use
+ * RawJsonDocument for Json type.
+ *
+ * The distinction between Json and Binary exists because BinaryDocument doesn't
+ * set Json flag when it stored on Couchbase Server even if the content byte
+ * array represents a Json string, and it can't be retrieved as a Json document.
+ */
+public enum DocumentType {
+
+    Json,
+    Binary
+
+}


[3/3] nifi git commit: NIFI-5257: Expand Couchbase Server integration

Posted by mc...@apache.org.
NIFI-5257: Expand Couchbase Server integration

- Added CouchbaseMapCacheClient.
- Added CouchbaseKeyValueLookupService.
- Added CouchbaseRecordLookupService.
- Added 'Put Value to Attribute' to GetCouchbaseKey.
- Fixed Get/PutCouchbaseKey relationship descriptions.
- Stop sharing Relationship.Builders.

This closes #2750


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/52d6b9cf
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/52d6b9cf
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/52d6b9cf

Branch: refs/heads/master
Commit: 52d6b9cfa218e52e037a43d94b413953d7a63c00
Parents: f05c5e6
Author: Koji Kawamura <ij...@apache.org>
Authored: Fri Jun 1 16:57:43 2018 +0900
Committer: Matt Gilman <ma...@gmail.com>
Committed: Fri Jun 15 09:14:30 2018 -0400

----------------------------------------------------------------------
 .../nifi-couchbase-processors/pom.xml           |  21 ++
 .../AbstractCouchbaseLookupService.java         |  75 +++++++
 .../nifi/couchbase/CouchbaseAttributes.java     |  63 ------
 .../CouchbaseKeyValueLookupService.java         |  83 +++++++
 .../nifi/couchbase/CouchbaseMapCacheClient.java | 221 +++++++++++++++++++
 .../couchbase/CouchbaseRecordLookupService.java | 139 ++++++++++++
 .../apache/nifi/couchbase/CouchbaseUtils.java   |  58 +++++
 .../couchbase/AbstractCouchbaseProcessor.java   |  64 ++----
 .../couchbase/CouchbaseAttributes.java          |  63 ++++++
 .../nifi/processors/couchbase/DocumentType.java |  36 ---
 .../couchbase/ErrorHandlingStrategy.java        |   6 +-
 .../processors/couchbase/GetCouchbaseKey.java   | 124 ++++++++---
 .../processors/couchbase/PutCouchbaseKey.java   |  37 ++--
 ...org.apache.nifi.controller.ControllerService |   5 +-
 .../additionalDetails.html                      |  41 ++++
 .../couchbase/TestCouchbaseClusterService.java  |  68 ++++++
 .../couchbase/TestCouchbaseMapCacheClient.java  |  73 ++++++
 .../nifi/couchbase/TestCouchbaseUtils.java      |  97 ++++++++
 .../couchbase/TestCouchbaseClusterService.java  |  58 -----
 .../couchbase/TestGetCouchbaseKey.java          | 140 +++++++++---
 .../couchbase/TestPutCouchbaseKey.java          |  48 +++-
 .../nifi-couchbase-services-api-nar/pom.xml     |   6 +
 .../CouchbaseClusterControllerService.java      |   2 +-
 .../CouchbaseConfigurationProperties.java       |  60 +++++
 .../org/apache/nifi/couchbase/DocumentType.java |  36 +++
 25 files changed, 1334 insertions(+), 290 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/52d6b9cf/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml
index cabe98e..a5a5ab8 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml
@@ -42,12 +42,33 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-distributed-cache-client-service-api</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
             <version>3.7</version>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-lookup-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.7.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mock</artifactId>
             <version>1.7.0-SNAPSHOT</version>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/nifi/blob/52d6b9cf/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/AbstractCouchbaseLookupService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/AbstractCouchbaseLookupService.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/AbstractCouchbaseLookupService.java
new file mode 100644
index 0000000..94bbdc6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/AbstractCouchbaseLookupService.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.couchbase;
+
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.reporting.InitializationException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.BUCKET_NAME;
+import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.COUCHBASE_CLUSTER_SERVICE;
+
+public class AbstractCouchbaseLookupService extends AbstractControllerService {
+
+    protected static final String KEY = "key";
+    protected static final Set<String> REQUIRED_KEYS = Collections.unmodifiableSet(Stream.of(KEY).collect(Collectors.toSet()));
+
+    protected List<PropertyDescriptor> properties;
+    protected volatile CouchbaseClusterControllerService couchbaseClusterService;
+    protected volatile String bucketName;
+
+    @Override
+    protected void init(final ControllerServiceInitializationContext context) throws InitializationException {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(COUCHBASE_CLUSTER_SERVICE);
+        properties.add(BUCKET_NAME);
+        addProperties(properties);
+        this.properties = Collections.unmodifiableList(properties);
+    }
+
+    protected void addProperties(List<PropertyDescriptor> properties) {
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) throws InitializationException {
+
+        couchbaseClusterService = context.getProperty(COUCHBASE_CLUSTER_SERVICE)
+                .asControllerService(CouchbaseClusterControllerService.class);
+        bucketName = context.getProperty(BUCKET_NAME).evaluateAttributeExpressions().getValue();
+    }
+
+    public Set<String> getRequiredKeys() {
+        return REQUIRED_KEYS;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/52d6b9cf/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java
deleted file mode 100644
index 3bef8c5..0000000
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.couchbase;
-
-import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
-
-/**
- * Couchbase related attribute keys.
- */
-public enum CouchbaseAttributes implements FlowFileAttributeKey {
-
-    /**
-     * A reference to the related cluster.
-     */
-    Cluster("couchbase.cluster"),
-    /**
-     * A related bucket name.
-     */
-    Bucket("couchbase.bucket"),
-    /**
-     * The id of a related document.
-     */
-    DocId("couchbase.doc.id"),
-    /**
-     * The CAS value of a related document.
-     */
-    Cas("couchbase.doc.cas"),
-    /**
-     * The expiration of a related document.
-     */
-    Expiry("couchbase.doc.expiry"),
-    /**
-     * The thrown CouchbaseException class.
-     */
-    Exception("couchbase.exception"),
-    ;
-
-    private final String key;
-
-    private CouchbaseAttributes(final String key) {
-        this.key = key;
-    }
-
-    @Override
-    public String key() {
-        return key;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/52d6b9cf/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseKeyValueLookupService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseKeyValueLookupService.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseKeyValueLookupService.java
new file mode 100644
index 0000000..d2640a6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseKeyValueLookupService.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.couchbase;
+
+import com.couchbase.client.core.CouchbaseException;
+import com.couchbase.client.java.Bucket;
+import com.couchbase.client.java.error.DocumentDoesNotExistException;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.lookup.StringLookupService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.StringUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.LOOKUP_SUB_DOC_PATH;
+
+@Tags({"lookup", "enrich", "key", "value", "couchbase"})
+@CapabilityDescription("Lookup a string value from Couchbase Server associated with the specified key."
+        + " The coordinates that are passed to the lookup must contain the key 'key'.")
+public class CouchbaseKeyValueLookupService extends AbstractCouchbaseLookupService implements StringLookupService {
+
+    private volatile String subDocPath;
+
+    @Override
+    protected void addProperties(List<PropertyDescriptor> properties) {
+        properties.add(LOOKUP_SUB_DOC_PATH);
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) throws InitializationException {
+        super.onEnabled(context);
+        subDocPath = context.getProperty(LOOKUP_SUB_DOC_PATH).evaluateAttributeExpressions().getValue();
+    }
+
+    @Override
+    public Optional<String> lookup(Map<String, Object> coordinates) throws LookupFailureException {
+
+        try {
+            final Bucket bucket = couchbaseClusterService.openBucket(bucketName);
+            final Optional<String> docId = Optional.ofNullable(coordinates.get(KEY)).map(Object::toString);
+
+            if (!StringUtils.isBlank(subDocPath)) {
+                return docId.map(key -> {
+                    try {
+                        return bucket.lookupIn(key).get(subDocPath).execute();
+                    } catch (DocumentDoesNotExistException e) {
+                        getLogger().debug("Document was not found for {}", new Object[]{key});
+                        return null;
+                    }
+                }).map(fragment -> fragment.content(0)).map(Object::toString);
+
+            } else {
+                return docId.map(key -> CouchbaseUtils.getStringContent(bucket, key));
+            }
+        } catch (CouchbaseException e) {
+            throw new LookupFailureException("Failed to lookup from Couchbase using this coordinates: " + coordinates);
+        }
+
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/52d6b9cf/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseMapCacheClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseMapCacheClient.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseMapCacheClient.java
new file mode 100644
index 0000000..703f4e5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseMapCacheClient.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.couchbase;
+
+import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
+import com.couchbase.client.deps.io.netty.buffer.Unpooled;
+import com.couchbase.client.java.Bucket;
+import com.couchbase.client.java.document.BinaryDocument;
+import com.couchbase.client.java.document.Document;
+import com.couchbase.client.java.error.CASMismatchException;
+import com.couchbase.client.java.error.DocumentAlreadyExistsException;
+import com.couchbase.client.java.error.DocumentDoesNotExistException;
+import com.couchbase.client.java.query.Delete;
+import com.couchbase.client.java.query.N1qlQuery;
+import com.couchbase.client.java.query.N1qlQueryResult;
+import com.couchbase.client.java.query.Statement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.couchbase.client.java.query.dsl.functions.PatternMatchingFunctions.regexpContains;
+import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.BUCKET_NAME;
+import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.COUCHBASE_CLUSTER_SERVICE;
+
+// TODO: Doc
+@Tags({"distributed", "cache", "map", "cluster", "couchbase"})
+@CapabilityDescription("Provides the ability to communicate with a Couchbase Server cluster as a DistributedMapCacheServer." +
+        " This can be used in order to share a Map between nodes in a NiFi cluster." +
+        " Couchbase Server cluster can provide a high available and persistent cache storage.")
+public class CouchbaseMapCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient<Long> {
+
+    private static final Logger logger = LoggerFactory.getLogger(CouchbaseMapCacheClient.class);
+
+    private CouchbaseClusterControllerService clusterService;
+    private Bucket bucket;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(COUCHBASE_CLUSTER_SERVICE);
+        descriptors.add(BUCKET_NAME);
+        return descriptors;
+    }
+
+    @OnEnabled
+    public void configure(final ConfigurationContext context) {
+        clusterService = context.getProperty(COUCHBASE_CLUSTER_SERVICE).asControllerService(CouchbaseClusterControllerService.class);
+        final String bucketName = context.getProperty(BUCKET_NAME).evaluateAttributeExpressions().getValue();
+        bucket = clusterService.openBucket(bucketName);
+    }
+
+    private <V> Document toDocument(String docId, V value, Serializer<V> valueSerializer) throws IOException {
+        return toDocument(docId, value, valueSerializer, 0);
+    }
+
+    private <V> Document toDocument(String docId, V value, Serializer<V> valueSerializer, long revision) throws IOException {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        valueSerializer.serialize(value, bos);
+        final ByteBuf byteBuf = Unpooled.wrappedBuffer(bos.toByteArray());
+        return BinaryDocument.create(docId, byteBuf, revision);
+    }
+
+    private <K> String toDocumentId(K key, Serializer<K> keySerializer) throws IOException {
+        final String docId;
+        if (key instanceof String) {
+            docId = (String) key;
+        } else {
+            // Coerce conversion from byte[] to String, this may generate unreadable String or exceed max key size.
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            keySerializer.serialize(key, bos);
+            final byte[] keyBytes = bos.toByteArray();
+            docId = new String(keyBytes);
+
+        }
+        return docId;
+    }
+
+    @Override
+    public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+        final String docId = toDocumentId(key, keySerializer);
+        final Document doc = toDocument(docId, value, valueSerializer);
+        try {
+            bucket.insert(doc);
+            return true;
+        } catch (DocumentAlreadyExistsException e) {
+            return false;
+        }
+    }
+
+    @Override
+    public <K, V> AtomicCacheEntry<K, V, Long> fetch(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
+        final String docId = toDocumentId(key, keySerializer);
+        final BinaryDocument doc = bucket.get(BinaryDocument.create(docId));
+        if (doc == null) {
+            return null;
+        }
+        final V value = deserialize(doc, valueDeserializer);
+        return new AtomicCacheEntry<>(key, value, doc.cas());
+    }
+
+    @Override
+    public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException {
+        final V existing = get(key, keySerializer, valueDeserializer);
+        if (existing != null) {
+            return existing;
+        }
+
+        // If there's no existing value, put this value.
+        if (!putIfAbsent(key, value, keySerializer, valueSerializer)) {
+            // If putting this value failed, it's possible that other client has put different doc, so return that.
+            return get(key, keySerializer, valueDeserializer);
+        }
+
+        // If successfully put this value, return this.
+        return value;
+    }
+
+    @Override
+    public <K, V> boolean replace(AtomicCacheEntry<K, V, Long> entry, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+        final Long revision = entry.getRevision().orElse(0L);
+        final String docId = toDocumentId(entry.getKey(), keySerializer);
+        final Document doc = toDocument(docId, entry.getValue(), valueSerializer, revision);
+        try {
+            if (revision < 0) {
+                // If the document does not exist yet, try to create one.
+                try {
+                    bucket.insert(doc);
+                    return true;
+                } catch (DocumentAlreadyExistsException e) {
+                    return false;
+                }
+            }
+            bucket.replace(doc);
+            return true;
+        } catch (DocumentDoesNotExistException|CASMismatchException e) {
+            return false;
+        }
+    }
+
+    @Override
+    public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException {
+        return bucket.exists(toDocumentId(key, keySerializer));
+    }
+
+    @Override
+    public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+        final String docId = toDocumentId(key, keySerializer);
+        final Document doc = toDocument(docId, value, valueSerializer);
+        bucket.upsert(doc);
+    }
+
+    @Override
+    public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
+        final String docId = toDocumentId(key, keySerializer);
+        final BinaryDocument doc = bucket.get(BinaryDocument.create(docId));
+        return deserialize(doc, valueDeserializer);
+    }
+
+    private <V> V deserialize(BinaryDocument doc, Deserializer<V> valueDeserializer) throws IOException {
+        if (doc == null) {
+            return null;
+        }
+        final ByteBuf byteBuf = doc.content();
+        final byte[] bytes = new byte[byteBuf.readableBytes()];
+        byteBuf.readBytes(bytes);
+        byteBuf.release();
+        return valueDeserializer.deserialize(bytes);
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public <K> boolean remove(K key, Serializer<K> serializer) throws IOException {
+        try {
+            bucket.remove(toDocumentId(key, serializer));
+            return true;
+        } catch (DocumentDoesNotExistException e) {
+            return false;
+        }
+    }
+
+    @Override
+    public long removeByPattern(String regex) throws IOException {
+        Statement statement = Delete.deleteFromCurrentBucket().where(regexpContains("meta().id", regex));
+        final N1qlQueryResult result = bucket.query(N1qlQuery.simple(statement));
+        if (logger.isDebugEnabled()) {
+            logger.debug("Deleted documents using regex {}, result={}", regex, result);
+        }
+        return result.info().mutationCount();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/52d6b9cf/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseRecordLookupService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseRecordLookupService.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseRecordLookupService.java
new file mode 100644
index 0000000..e12eac9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseRecordLookupService.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.couchbase;
+
+import com.couchbase.client.core.CouchbaseException;
+import com.couchbase.client.deps.io.netty.buffer.ByteBufInputStream;
+import com.couchbase.client.java.Bucket;
+import com.couchbase.client.java.document.BinaryDocument;
+import com.couchbase.client.java.document.RawJsonDocument;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.lookup.RecordLookupService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.Tuple;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.DOCUMENT_TYPE;
+
+@Tags({"lookup", "enrich", "couchbase"})
+@CapabilityDescription("Lookup a record from Couchbase Server associated with the specified key."
+        + " The coordinates that are passed to the lookup must contain the key 'key'.")
+public class CouchbaseRecordLookupService extends AbstractCouchbaseLookupService implements RecordLookupService {
+
+    private volatile RecordReaderFactory readerFactory;
+    private volatile DocumentType documentType;
+
+    private static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("The Record Reader to use for parsing fetched document from Couchbase Server.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    @Override
+    protected void addProperties(List<PropertyDescriptor> properties) {
+        properties.add(DOCUMENT_TYPE);
+        properties.add(RECORD_READER);
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) throws InitializationException {
+        super.onEnabled(context);
+        readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue());
+    }
+
+    @Override
+    public Optional<Record> lookup(Map<String, Object> coordinates) throws LookupFailureException {
+
+        final Bucket bucket = couchbaseClusterService.openBucket(bucketName);
+        final Optional<String> docId = Optional.ofNullable(coordinates.get(KEY)).map(Object::toString);
+
+        final Optional<InputStream> inputStream;
+        try {
+            switch (documentType) {
+
+                case Binary:
+                    inputStream = docId
+                            .map(key -> bucket.get(key, BinaryDocument.class))
+                            .map(doc -> new ByteBufInputStream(doc.content()));
+                    break;
+
+                case Json:
+                    inputStream= docId
+                            .map(key -> bucket.get(key, RawJsonDocument.class))
+                            .map(doc -> new ByteArrayInputStream(doc.content().getBytes(StandardCharsets.UTF_8)));
+                    break;
+
+                default:
+                    return Optional.empty();
+            }
+        } catch (CouchbaseException e) {
+            throw new LookupFailureException("Failed to lookup from Couchbase using this coordinates: " + coordinates);
+        }
+
+        final Optional<Tuple<Exception, RecordReader>> errOrReader = inputStream.map(in -> {
+            try {
+                // Pass coordinates to initiate RecordReader, so that the reader can resolve schema dynamically.
+                // This allow using the same RecordReader service with different schemas if RecordReader is configured to
+                // access schema based on Expression Language.
+                final Map<String, String> recordReaderVariables = new HashMap<>(coordinates.size());
+                coordinates.keySet().forEach(k -> {
+                    final Object value = coordinates.get(k);
+                    if (value != null) {
+                        recordReaderVariables.put(k, value.toString());
+                    }
+                });
+                return new Tuple<>(null, readerFactory.createRecordReader(recordReaderVariables, in, getLogger()));
+            } catch (Exception e) {
+                return new Tuple<>(e, null);
+            }
+        });
+
+        if (!errOrReader.isPresent()) {
+            return Optional.empty();
+        }
+
+        final Exception exception = errOrReader.get().getKey();
+        if (exception != null) {
+            throw new LookupFailureException(String.format("Failed to lookup with %s", coordinates), exception);
+        }
+
+        try {
+            return Optional.ofNullable(errOrReader.get().getValue().nextRecord());
+        } catch (Exception e) {
+            throw new LookupFailureException(String.format("Failed to read Record when looking up with %s", coordinates), e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/52d6b9cf/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseUtils.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseUtils.java
new file mode 100644
index 0000000..b4488b8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseUtils.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.couchbase;
+
+import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
+import com.couchbase.client.java.Bucket;
+import com.couchbase.client.java.document.LegacyDocument;
+
+import java.nio.charset.StandardCharsets;
+
+public class CouchbaseUtils {
+
+    /**
+     * A convenient method to retrieve String value when Document type is unknown.
+     * This method uses LegacyDocument to get, then tries to convert content based on its class.
+     * @param bucket the bucket to get a document
+     * @param id the id of the target document
+     * @return String representation of the stored value, or null if not found
+     */
+    public static String getStringContent(Bucket bucket, String id) {
+        final LegacyDocument doc = bucket.get(LegacyDocument.create(id));
+        if (doc == null) {
+            return null;
+        }
+        final Object content = doc.content();
+        return getStringContent(content);
+    }
+
+    public static String getStringContent(Object content) {
+        if (content instanceof String) {
+            return (String) content;
+        } else if (content instanceof byte[]) {
+            return new String((byte[]) content, StandardCharsets.UTF_8);
+        } else if (content instanceof ByteBuf) {
+            final ByteBuf byteBuf = (ByteBuf) content;
+            byte[] bytes = new byte[byteBuf.readableBytes()];
+            byteBuf.readBytes(bytes);
+            byteBuf.release();
+            return new String(bytes, StandardCharsets.UTF_8);
+        }
+        return content.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/52d6b9cf/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
index 4a04c16..3d903d6 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.couchbase.CouchbaseAttributes;
 import org.apache.nifi.couchbase.CouchbaseClusterControllerService;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
@@ -39,54 +38,27 @@ import org.apache.nifi.processor.util.StandardValidators;
 import com.couchbase.client.core.CouchbaseException;
 import com.couchbase.client.java.Bucket;
 
+import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.BUCKET_NAME;
+import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.COUCHBASE_CLUSTER_SERVICE;
+
 /**
- * Provides common functionalities for Couchbase processors.
+ * Provides common functionality for Couchbase processors.
  */
 public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
 
-    public static final PropertyDescriptor DOCUMENT_TYPE = new PropertyDescriptor.Builder().name("Document Type")
-        .description("The type of contents.")
-        .required(true)
-        .allowableValues(DocumentType.values())
-        .defaultValue(DocumentType.Json.toString())
-        .build();
-
-    public static final PropertyDescriptor DOC_ID = new PropertyDescriptor.Builder().name("Document Id")
+    static final PropertyDescriptor DOC_ID = new PropertyDescriptor.Builder()
+        .name("document-id")
+        .displayName("Document Id")
         .description("A static, fixed Couchbase document id, or an expression to construct the Couchbase document id.")
         .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .build();
 
 
-    public static final Relationship REL_SUCCESS = new Relationship.Builder()
-        .name("success")
-        .description("All FlowFiles that are written to Couchbase Server are routed to this relationship.")
-        .build();
-    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
-        .name("original")
-        .description("The original input file will be routed to this destination when it has been successfully processed.")
-        .build();
-    public static final Relationship REL_RETRY = new Relationship.Builder()
-        .name("retry")
-        .description("All FlowFiles that cannot written to Couchbase Server but can be retried are routed to this relationship.")
-        .build();
-    public static final Relationship REL_FAILURE = new Relationship.Builder()
-        .name("failure")
-        .description("All FlowFiles that cannot written to Couchbase Server and can't be retried are routed to this relationship.")
-        .build();
-
-    public static final PropertyDescriptor COUCHBASE_CLUSTER_SERVICE = new PropertyDescriptor.Builder().name("Couchbase Cluster Controller Service")
-        .description("A Couchbase Cluster Controller Service which manages connections to a Couchbase cluster.")
-        .required(true)
-        .identifiesControllerService(CouchbaseClusterControllerService.class)
-        .build();
-
-    public static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor.Builder().name("Bucket Name")
-        .description("The name of bucket to access.")
-        .required(true)
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .defaultValue("default")
-        .build();
+    static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").build();
+    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").build();
+    static final Relationship REL_RETRY = new Relationship.Builder().name("retry").build();
+    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").build();
 
     private List<PropertyDescriptor> descriptors;
 
@@ -129,7 +101,11 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
 
     @Override
     public final Set<Relationship> getRelationships() {
-        return this.relationships;
+        return filterRelationships(this.relationships);
+    }
+
+    protected Set<Relationship> filterRelationships(Set<Relationship> rels) {
+        return rels;
     }
 
     @Override
@@ -155,17 +131,17 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
      * @return a bucket instance
      */
     protected final Bucket openBucket(final ProcessContext context) {
-        return getClusterService(context).openBucket(context.getProperty(BUCKET_NAME).getValue());
+        return getClusterService(context).openBucket(context.getProperty(BUCKET_NAME).evaluateAttributeExpressions().getValue());
     }
 
     /**
      * Generate a transit url.
      *
-     * @param context a process context
+     * @param bucket the target bucket
      * @return a transit url based on the bucket name and the CouchbaseClusterControllerService name
      */
-    protected String getTransitUrl(final ProcessContext context, final String docId) {
-        return "couchbase://" + context.getProperty(BUCKET_NAME).getValue() + "/" + docId;
+    protected String getTransitUrl(final Bucket bucket, final String docId) {
+        return "couchbase://" + bucket.name() + "/" + docId;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/52d6b9cf/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseAttributes.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseAttributes.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseAttributes.java
new file mode 100644
index 0000000..0a979e1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseAttributes.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.couchbase;
+
+import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
+
+/**
+ * Couchbase related attribute keys.
+ */
+public enum CouchbaseAttributes implements FlowFileAttributeKey {
+
+    /**
+     * A reference to the related cluster.
+     */
+    Cluster("couchbase.cluster"),
+    /**
+     * A related bucket name.
+     */
+    Bucket("couchbase.bucket"),
+    /**
+     * The id of a related document.
+     */
+    DocId("couchbase.doc.id"),
+    /**
+     * The CAS value of a related document.
+     */
+    Cas("couchbase.doc.cas"),
+    /**
+     * The expiration of a related document.
+     */
+    Expiry("couchbase.doc.expiry"),
+    /**
+     * The thrown CouchbaseException class.
+     */
+    Exception("couchbase.exception"),
+    ;
+
+    private final String key;
+
+    private CouchbaseAttributes(final String key) {
+        this.key = key;
+    }
+
+    @Override
+    public String key() {
+        return key;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/52d6b9cf/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/DocumentType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/DocumentType.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/DocumentType.java
deleted file mode 100644
index 81dd465..0000000
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/DocumentType.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.couchbase;
-
-
-/**
- * Supported Couchbase document types.
- *
- * In order to handle a variety type of document classes such as JsonDocument,
- * JsonLongDocument or JsonStringDocument, Couchbase processors use
- * RawJsonDocument for Json type.
- *
- * The distinction between Json and Binary exists because BinaryDocument doesn't
- * set Json flag when it stored on Couchbase Server even if the content byte
- * array represents a Json string, and it can't be retrieved as a Json document.
- */
-public enum DocumentType {
-
-    Json,
-    Binary
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/52d6b9cf/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/ErrorHandlingStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/ErrorHandlingStrategy.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/ErrorHandlingStrategy.java
index bae35d5..4c1dd20 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/ErrorHandlingStrategy.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/ErrorHandlingStrategy.java
@@ -55,20 +55,20 @@ public enum ErrorHandlingStrategy {
 
     private final Result result;
     private final Penalty penalty;
-    private ErrorHandlingStrategy(Result result, Penalty penalty){
+    ErrorHandlingStrategy(Result result, Penalty penalty){
         this.result = result;
         this.penalty = penalty;
     }
 
     public enum Result {
-        ProcessException, Failure, Retry;
+        ProcessException, Failure, Retry
     }
 
     /**
      * Indicating yield or penalize the processing when transfer the input FlowFile.
      */
     public enum Penalty {
-        Yield, Penalize, None;
+        Yield, Penalize, None
     }
 
     public Result result(){

http://git-wip-us.apache.org/repos/asf/nifi/blob/52d6b9cf/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
index 2692abc..2519e7d 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
@@ -18,16 +18,16 @@ package org.apache.nifi.processors.couchbase;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SystemResource;
@@ -36,8 +36,12 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.couchbase.CouchbaseAttributes;
+import org.apache.nifi.couchbase.CouchbaseUtils;
+import org.apache.nifi.couchbase.DocumentType;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -45,6 +49,7 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.stream.io.StreamUtils;
 
 import com.couchbase.client.core.CouchbaseException;
@@ -54,6 +59,10 @@ import com.couchbase.client.java.document.Document;
 import com.couchbase.client.java.document.RawJsonDocument;
 import com.couchbase.client.java.error.DocumentDoesNotExistException;
 
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.COUCHBASE_CLUSTER_SERVICE;
+import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.DOCUMENT_TYPE;
+
 @Tags({"nosql", "couchbase", "database", "get"})
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Get a document from Couchbase Server via Key/Value access. The ID of the document to fetch may be supplied by setting the <Document Id> property. "
@@ -70,18 +79,49 @@ import com.couchbase.client.java.error.DocumentDoesNotExistException;
 @SystemResourceConsideration(resource = SystemResource.MEMORY)
 public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
 
+    public static final PropertyDescriptor PUT_VALUE_TO_ATTRIBUTE = new PropertyDescriptor.Builder()
+        .name("put-to-attribute")
+        .displayName("Put Value to Attribute")
+        .description("If set, the retrieved value will be put into an attribute of the FlowFile instead of a the content of the FlowFile." +
+                " The attribute key to put to is determined by evaluating value of this property.")
+        .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .build();
+
+    private volatile boolean putToAttribute = false;
+
     @Override
     protected void addSupportedProperties(final List<PropertyDescriptor> descriptors) {
         descriptors.add(DOCUMENT_TYPE);
         descriptors.add(DOC_ID);
+        descriptors.add(PUT_VALUE_TO_ATTRIBUTE);
     }
 
     @Override
     protected void addSupportedRelationships(final Set<Relationship> relationships) {
-        relationships.add(REL_SUCCESS);
-        relationships.add(REL_ORIGINAL);
-        relationships.add(REL_RETRY);
-        relationships.add(REL_FAILURE);
+        relationships.add(new Relationship.Builder().name(REL_ORIGINAL.getName())
+                .description("The original input FlowFile is routed to this relationship" +
+                        " when the value is retrieved from Couchbase Server and routed to 'success'.").build());
+        relationships.add(new Relationship.Builder().name(REL_SUCCESS.getName())
+                .description("Values retrieved from Couchbase Server are written as outgoing FlowFiles content" +
+                        " or put into an attribute of the incoming FlowFile and routed to this relationship.").build());
+        relationships.add(new Relationship.Builder().name(REL_RETRY.getName())
+                .description("All FlowFiles failed to fetch from Couchbase Server but can be retried are routed to this relationship.").build());
+        relationships.add(new Relationship.Builder().name(REL_FAILURE.getName())
+                .description("All FlowFiles failed to fetch from Couchbase Server and not retry-able are routed to this relationship.").build());
+    }
+
+    @Override
+    protected Set<Relationship> filterRelationships(Set<Relationship> rels) {
+        // If destination is attribute, then success == original.
+        return rels.stream().filter(rel -> !REL_ORIGINAL.equals(rel) || !putToAttribute).collect(Collectors.toSet());
+    }
+
+    @Override
+    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+        if (PUT_VALUE_TO_ATTRIBUTE.equals(descriptor)) {
+            putToAttribute = !isEmpty(newValue);
+        }
     }
 
     @Override
@@ -107,72 +147,84 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
             docId = new String(content, StandardCharsets.UTF_8);
         }
 
-        if (StringUtils.isEmpty(docId)) {
+        if (isEmpty(docId)) {
             throw new ProcessException("Please check 'Document Id' setting. Couldn't get document id from " + inFile);
         }
 
+        String putTargetAttr = null;
+        if (context.getProperty(PUT_VALUE_TO_ATTRIBUTE).isSet()) {
+            putTargetAttr = context.getProperty(PUT_VALUE_TO_ATTRIBUTE).evaluateAttributeExpressions(inFile).getValue();
+            if (isEmpty(putTargetAttr)) {
+                inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), "InvalidPutTargetAttributeName");
+                session.transfer(inFile, REL_FAILURE);
+                return;
+            }
+        }
+
         try {
-            final Document<?> doc;
-            final byte[] content;
             final Bucket bucket = openBucket(context);
             final DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue());
+            Document<?> doc = null;
+            // A function to write a document into outgoing FlowFile content.
+            OutputStreamCallback outputStreamCallback = null;
+            final Map<String, String> updatedAttrs = new HashMap<>();
 
             switch (documentType) {
                 case Json: {
                     RawJsonDocument document = bucket.get(docId, RawJsonDocument.class);
-                    if (document == null) {
-                        doc = null;
-                        content = null;
-                    } else {
-                        content = document.content().getBytes(StandardCharsets.UTF_8);
+                    if (document != null) {
+                        outputStreamCallback = out -> {
+                            final byte[] content = document.content().getBytes(StandardCharsets.UTF_8);
+                            out.write(content);
+                            updatedAttrs.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+                        };
                         doc = document;
                     }
                     break;
                 }
                 case Binary: {
                     BinaryDocument document = bucket.get(docId, BinaryDocument.class);
-                    if (document == null) {
-                        doc = null;
-                        content = null;
-                    } else {
-                        content = document.content().array();
+                    if (document != null) {
+                        outputStreamCallback = out -> {
+                            // Write to OutputStream without copying any to heap.
+                            final ByteBuf byteBuf = document.content();
+                            byteBuf.getBytes(byteBuf.readerIndex(), out, byteBuf.readableBytes());
+                            byteBuf.release();
+                        };
                         doc = document;
                     }
                     break;
                 }
-                default: {
-                    doc = null;
-                    content = null;
-                }
             }
 
             if (doc == null) {
-                logger.error("Document {} was not found in {}; routing {} to failure", new Object[] {docId, getTransitUrl(context, docId), inFile});
+                logger.warn("Document {} was not found in {}; routing {} to failure", new Object[] {docId, getTransitUrl(bucket, docId), inFile});
                 inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), DocumentDoesNotExistException.class.getName());
                 session.transfer(inFile, REL_FAILURE);
                 return;
             }
 
-            FlowFile outFile = session.create(inFile);
-            outFile = session.write(outFile, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream out) throws IOException {
-                    out.write(content);
-                }
-            });
+            FlowFile outFile;
+            if (putToAttribute) {
+                outFile = inFile;
+                updatedAttrs.put(putTargetAttr, CouchbaseUtils.getStringContent(doc.content()));
+            } else {
+                outFile = session.create(inFile);
+                outFile = session.write(outFile, outputStreamCallback);
+                session.transfer(inFile, REL_ORIGINAL);
+            }
 
-            final Map<String, String> updatedAttrs = new HashMap<>();
             updatedAttrs.put(CouchbaseAttributes.Cluster.key(), context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue());
-            updatedAttrs.put(CouchbaseAttributes.Bucket.key(), context.getProperty(BUCKET_NAME).getValue());
+            updatedAttrs.put(CouchbaseAttributes.Bucket.key(), bucket.name());
             updatedAttrs.put(CouchbaseAttributes.DocId.key(), docId);
             updatedAttrs.put(CouchbaseAttributes.Cas.key(), String.valueOf(doc.cas()));
             updatedAttrs.put(CouchbaseAttributes.Expiry.key(), String.valueOf(doc.expiry()));
             outFile = session.putAllAttributes(outFile, updatedAttrs);
 
             final long fetchMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-            session.getProvenanceReporter().fetch(outFile, getTransitUrl(context, docId), fetchMillis);
+            session.getProvenanceReporter().fetch(outFile, getTransitUrl(bucket, docId), fetchMillis);
             session.transfer(outFile, REL_SUCCESS);
-            session.transfer(inFile, REL_ORIGINAL);
+
         } catch (final CouchbaseException e) {
             String errMsg = String.format("Getting document %s from Couchbase Server using %s failed due to %s", docId, inFile, e);
             handleCouchbaseException(context, session, logger, inFile, e, errMsg);

http://git-wip-us.apache.org/repos/asf/nifi/blob/52d6b9cf/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java
index 6cfe8f5..e8fdbe3 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java
@@ -24,6 +24,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.couchbase.client.java.Bucket;
+import com.couchbase.client.java.document.ByteArrayDocument;
 import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -35,7 +37,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.couchbase.CouchbaseAttributes;
+import org.apache.nifi.couchbase.DocumentType;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
@@ -47,14 +49,14 @@ import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.stream.io.StreamUtils;
 
 import com.couchbase.client.core.CouchbaseException;
-import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
-import com.couchbase.client.deps.io.netty.buffer.Unpooled;
 import com.couchbase.client.java.PersistTo;
 import com.couchbase.client.java.ReplicateTo;
-import com.couchbase.client.java.document.BinaryDocument;
 import com.couchbase.client.java.document.Document;
 import com.couchbase.client.java.document.RawJsonDocument;
 
+import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.COUCHBASE_CLUSTER_SERVICE;
+import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.DOCUMENT_TYPE;
+
 @Tags({"nosql", "couchbase", "database", "put"})
 @CapabilityDescription("Put a document to Couchbase Server via Key/Value access.")
 @InputRequirement(Requirement.INPUT_REQUIRED)
@@ -73,14 +75,18 @@ import com.couchbase.client.java.document.RawJsonDocument;
 public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
 
 
-    public static final PropertyDescriptor PERSIST_TO = new PropertyDescriptor.Builder().name("Persist To")
+    public static final PropertyDescriptor PERSIST_TO = new PropertyDescriptor.Builder()
+        .name("persist-to")
+        .displayName("Persist To")
         .description("Durability constraint about disk persistence.")
         .required(true)
         .allowableValues(PersistTo.values())
         .defaultValue(PersistTo.NONE.toString())
         .build();
 
-    public static final PropertyDescriptor REPLICATE_TO = new PropertyDescriptor.Builder().name("Replicate To")
+    public static final PropertyDescriptor REPLICATE_TO = new PropertyDescriptor.Builder()
+        .name("replicate-to")
+        .displayName("Replicate To")
         .description("Durability constraint about replication.")
         .required(true)
         .allowableValues(ReplicateTo.values())
@@ -97,9 +103,12 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
 
     @Override
     protected void addSupportedRelationships(Set<Relationship> relationships) {
-        relationships.add(REL_SUCCESS);
-        relationships.add(REL_RETRY);
-        relationships.add(REL_FAILURE);
+        relationships.add(new Relationship.Builder().name(REL_SUCCESS.getName())
+                .description("All FlowFiles that are written to Couchbase Server are routed to this relationship.").build());
+        relationships.add(new Relationship.Builder().name(REL_RETRY.getName())
+                .description("All FlowFiles failed to be written to Couchbase Server but can be retried are routed to this relationship.").build());
+        relationships.add(new Relationship.Builder().name(REL_FAILURE.getName())
+                .description("All FlowFiles failed to be written to Couchbase Server and not retry-able are routed to this relationship.").build());
     }
 
     @Override
@@ -132,25 +141,25 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
                     break;
                 }
                 case Binary: {
-                    final ByteBuf buf = Unpooled.copiedBuffer(content);
-                    doc = BinaryDocument.create(docId, buf);
+                    doc = ByteArrayDocument.create(docId, content);
                     break;
                 }
             }
 
             final PersistTo persistTo = PersistTo.valueOf(context.getProperty(PERSIST_TO).getValue());
             final ReplicateTo replicateTo = ReplicateTo.valueOf(context.getProperty(REPLICATE_TO).getValue());
-            doc = openBucket(context).upsert(doc, persistTo, replicateTo);
+            final Bucket bucket = openBucket(context);
+            doc = bucket.upsert(doc, persistTo, replicateTo);
 
             final Map<String, String> updatedAttrs = new HashMap<>();
             updatedAttrs.put(CouchbaseAttributes.Cluster.key(), context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue());
-            updatedAttrs.put(CouchbaseAttributes.Bucket.key(), context.getProperty(BUCKET_NAME).getValue());
+            updatedAttrs.put(CouchbaseAttributes.Bucket.key(), bucket.name());
             updatedAttrs.put(CouchbaseAttributes.DocId.key(), docId);
             updatedAttrs.put(CouchbaseAttributes.Cas.key(), String.valueOf(doc.cas()));
             updatedAttrs.put(CouchbaseAttributes.Expiry.key(), String.valueOf(doc.expiry()));
 
             flowFile = session.putAllAttributes(flowFile, updatedAttrs);
-            session.getProvenanceReporter().send(flowFile, getTransitUrl(context, docId));
+            session.getProvenanceReporter().send(flowFile, getTransitUrl(bucket, docId));
             session.transfer(flowFile, REL_SUCCESS);
         } catch (final CouchbaseException e) {
             String errMsg = String.format("Writing document %s to Couchbase Server using %s failed due to %s", docId, flowFile, e);

http://git-wip-us.apache.org/repos/asf/nifi/blob/52d6b9cf/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index e5e3ea7..f33bf6c 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -12,4 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-org.apache.nifi.couchbase.CouchbaseClusterService
\ No newline at end of file
+org.apache.nifi.couchbase.CouchbaseClusterService
+org.apache.nifi.couchbase.CouchbaseMapCacheClient
+org.apache.nifi.couchbase.CouchbaseKeyValueLookupService
+org.apache.nifi.couchbase.CouchbaseRecordLookupService

http://git-wip-us.apache.org/repos/asf/nifi/blob/52d6b9cf/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/docs/org.apache.nifi.couchbase.CouchbaseMapCacheClient/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/docs/org.apache.nifi.couchbase.CouchbaseMapCacheClient/additionalDetails.html b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/docs/org.apache.nifi.couchbase.CouchbaseMapCacheClient/additionalDetails.html
new file mode 100644
index 0000000..d8c5011
--- /dev/null
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/docs/org.apache.nifi.couchbase.CouchbaseMapCacheClient/additionalDetails.html
@@ -0,0 +1,41 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>CouchbaseMapCacheClient</title>
+    <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+<h2>CouchbaseMapCacheClient</h2>
+
+<h3>Requirements</h3>
+
+<h4>Couchbase Server 4.0 or higher is required for some operation using N1QL</h4>
+
+Following cache operations require N1QL query, thus you need to deploy Couchbase Server 4.0 or higher for those operations. However, as of this writing (May 2017) there are only few processors using these operations. Most cache APIs are implemented using document id lookup and should work with older version of Couchbase Server.
+
+<ul>
+    <li>removeByPattern(String regex): This cache API removes entries by regex. Execute query like:
+        <pre>delete from `cache-bucket-name` where REGEX_CONTAINS(meta().id, "^key.*")</pre>
+    </li>
+</ul>
+
+In order to make N1QL work correctly you need to create a <a href="https://developer.couchbase.com/documentation/server/current/n1ql/n1ql-language-reference/createprimaryindex.html">Primary index</a> or an index covering N1QL queries performed by CouchbaseMapCacheClient. Please refer Couchbase Server documentations for how to create those.
+
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/52d6b9cf/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/couchbase/TestCouchbaseClusterService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/couchbase/TestCouchbaseClusterService.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/couchbase/TestCouchbaseClusterService.java
new file mode 100644
index 0000000..834607a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/couchbase/TestCouchbaseClusterService.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.couchbase;
+
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class TestCouchbaseClusterService {
+
+    private static final String SERVICE_ID = "couchbaseClusterService";
+    private TestRunner testRunner;
+
+    public static class SampleProcessor extends AbstractProcessor {
+        @Override
+        public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+        }
+    }
+
+    @Before
+    public void init() throws Exception {
+        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
+        System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.PutCouchbaseKey", "debug");
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.couchbase.CouchbaseClusterService", "debug");
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.couchbase.TestCouchbaseClusterService", "debug");
+
+        testRunner = TestRunners.newTestRunner(SampleProcessor.class);
+        testRunner.setValidateExpressionUsage(false);
+    }
+
+    @Test
+    public void testConnectionFailure() throws InitializationException {
+        String connectionString = "invalid-protocol://invalid-hostname";
+        CouchbaseClusterControllerService service = new CouchbaseClusterService();
+        testRunner.addControllerService(SERVICE_ID, service);
+        testRunner.setProperty(service, CouchbaseClusterService.CONNECTION_STRING, connectionString);
+        try {
+            testRunner.enableControllerService(service);
+            Assert.fail("The service shouldn't be enabled when it couldn't connect to a cluster.");
+        } catch (AssertionError e) {
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/52d6b9cf/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/couchbase/TestCouchbaseMapCacheClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/couchbase/TestCouchbaseMapCacheClient.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/couchbase/TestCouchbaseMapCacheClient.java
new file mode 100644
index 0000000..b442e40
--- /dev/null
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/couchbase/TestCouchbaseMapCacheClient.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.couchbase;
+
+import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
+import com.couchbase.client.deps.io.netty.buffer.Unpooled;
+import com.couchbase.client.java.Bucket;
+import com.couchbase.client.java.document.BinaryDocument;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.apache.nifi.util.MockControllerServiceInitializationContext;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.BUCKET_NAME;
+import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.COUCHBASE_CLUSTER_SERVICE;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestCouchbaseMapCacheClient {
+
+    private final Serializer<String> stringSerializer = (value, output) -> output.write(value.getBytes(StandardCharsets.UTF_8));
+    private final Deserializer<String> stringDeserializer = input -> new String(input, StandardCharsets.UTF_8);
+
+    // TODO: Add more tests
+
+    @Test
+    public void testGet() throws Exception {
+        final CouchbaseMapCacheClient client = new CouchbaseMapCacheClient();
+        final CouchbaseClusterControllerService couchbaseService = mock(CouchbaseClusterControllerService.class);
+        final Bucket bucket = mock(Bucket.class);
+
+        final MockControllerServiceInitializationContext serviceInitializationContext
+                = new MockControllerServiceInitializationContext(couchbaseService, "couchbaseService");
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(COUCHBASE_CLUSTER_SERVICE, "couchbaseService");
+        properties.put(BUCKET_NAME, "bucketA");
+
+        final ByteBuf contents = Unpooled.copiedBuffer("value".getBytes(StandardCharsets.UTF_8));
+        final BinaryDocument doc = BinaryDocument.create("key", contents);
+        when(couchbaseService.openBucket(eq("bucketA"))).thenReturn(bucket);
+        when(bucket.get(any(BinaryDocument.class))).thenReturn(doc);
+
+        final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext);
+        client.configure(context);
+        final String cacheEntry = client.get("key", stringSerializer, stringDeserializer);
+
+        assertEquals("value", cacheEntry);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/52d6b9cf/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/couchbase/TestCouchbaseUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/couchbase/TestCouchbaseUtils.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/couchbase/TestCouchbaseUtils.java
new file mode 100644
index 0000000..f110ad4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/couchbase/TestCouchbaseUtils.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.couchbase;
+
+import com.couchbase.client.deps.io.netty.buffer.Unpooled;
+import com.couchbase.client.java.Bucket;
+import com.couchbase.client.java.CouchbaseCluster;
+import com.couchbase.client.java.document.BinaryDocument;
+import com.couchbase.client.java.document.ByteArrayDocument;
+import com.couchbase.client.java.document.JsonArrayDocument;
+import com.couchbase.client.java.document.JsonBooleanDocument;
+import com.couchbase.client.java.document.JsonDocument;
+import com.couchbase.client.java.document.JsonDoubleDocument;
+import com.couchbase.client.java.document.JsonLongDocument;
+import com.couchbase.client.java.document.JsonStringDocument;
+import com.couchbase.client.java.document.LegacyDocument;
+import com.couchbase.client.java.document.RawJsonDocument;
+import com.couchbase.client.java.document.StringDocument;
+import com.couchbase.client.java.document.json.JsonArray;
+import com.couchbase.client.java.document.json.JsonObject;
+import com.couchbase.client.java.error.TranscodingException;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestCouchbaseUtils {
+
+    @Ignore("This test method requires a live Couchbase Server instance")
+    @Test
+    public void testDocumentTypesAndStringConversion() {
+        final CouchbaseCluster cluster = CouchbaseCluster.fromConnectionString("couchbase://192.168.99.100:8091");
+        final Bucket bucket = cluster.openBucket("b1", "b1password");
+
+        bucket.upsert(JsonDocument.create("JsonDocument", JsonObject.create().put("one", 1)));
+        bucket.upsert(JsonArrayDocument.create("JsonArray", JsonArray.create().add(1).add(2).add(3)));
+        bucket.upsert(JsonDoubleDocument.create("JsonDouble", 0.123));
+        bucket.upsert(JsonStringDocument.create("JsonString", "value"));
+        bucket.upsert(JsonBooleanDocument.create("JsonBoolean", true));
+        bucket.upsert(JsonLongDocument.create("JsonLong", 123L));
+
+        bucket.upsert(RawJsonDocument.create("RawJsonDocument", "value"));
+        bucket.upsert(StringDocument.create("StringDocument", "value"));
+
+        bucket.upsert(BinaryDocument.create("BinaryDocument", Unpooled.copiedBuffer("value".getBytes(StandardCharsets.UTF_8))));
+        bucket.upsert(ByteArrayDocument.create("ByteArrayDocument", "value".getBytes(StandardCharsets.UTF_8)));
+
+        final String[][] expectations = {
+                {"JsonDocument", "String", "{\"one\":1}"},
+                {"JsonArray", "String", "[1,2,3]"},
+                {"JsonDouble", "String", "0.123"},
+                {"JsonString", "String", "\"value\""},
+                {"JsonBoolean", "String", "true"},
+                {"JsonLong", "String", "123"},
+                {"RawJsonDocument", "String", "value"},
+                {"StringDocument", "String", "value"},
+                {"BinaryDocument", "byte[]", "value"},
+                {"ByteArrayDocument", "byte[]", "value"},
+        };
+
+        for (String[] expectation : expectations) {
+            final LegacyDocument document = bucket.get(LegacyDocument.create(expectation[0]));
+            assertEquals(expectation[1], document.content().getClass().getSimpleName());
+            assertEquals(expectation[2], CouchbaseUtils.getStringContent(document.content()));
+        }
+
+        final BinaryDocument binaryDocument = bucket.get(BinaryDocument.create("BinaryDocument"));
+        final String stringFromByteBuff = CouchbaseUtils.getStringContent(binaryDocument.content());
+        assertEquals("value", stringFromByteBuff);
+
+        try {
+            bucket.get(BinaryDocument.create("JsonDocument"));
+            fail("Getting a JSON document as a BinaryDocument fails");
+        } catch (TranscodingException e) {
+            assertTrue(e.getMessage().contains("Flags (0x2000000) indicate non-binary document for id JsonDocument"));
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/52d6b9cf/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java
deleted file mode 100644
index 703bc5f..0000000
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.couchbase;
-
-import org.apache.nifi.couchbase.CouchbaseClusterControllerService;
-import org.apache.nifi.couchbase.CouchbaseClusterService;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-
-public class TestCouchbaseClusterService {
-
-    private static final String SERVICE_ID = "couchbaseClusterService";
-    private TestRunner testRunner;
-
-    @Before
-    public void init() throws Exception {
-        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
-        System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
-        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.PutCouchbaseKey", "debug");
-        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.couchbase.CouchbaseClusterService", "debug");
-        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.couchbase.TestCouchbaseClusterService", "debug");
-
-        testRunner = TestRunners.newTestRunner(PutCouchbaseKey.class);
-    }
-
-    @Test
-    public void testConnectionFailure() throws InitializationException {
-        String connectionString = "invalid-protocol://invalid-hostname";
-        CouchbaseClusterControllerService service = new CouchbaseClusterService();
-        testRunner.addControllerService(SERVICE_ID, service);
-        testRunner.setProperty(service, CouchbaseClusterService.CONNECTION_STRING, connectionString);
-        try {
-            testRunner.enableControllerService(service);
-            Assert.fail("The service shouldn't be enabled when it couldn't connect to a cluster.");
-        } catch (AssertionError e) {
-        }
-    }
-
-}