You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2015/10/01 21:43:14 UTC

[2/4] nifi git commit: nifi-992 Improvements based on code review.

nifi-992 Improvements based on code review.

- Removed checkstyle and contrib-check profile since it's inherit from
  top-level pom.
- Consolidate DOC_ID and DOC_ID_EXP into a single DOC_ID property.
- Add capability description on GetCouchbaseKey.
- Fixed documentation spell misses.
- Handle Exceptions accordingly.
- Add 'retry' relationship.

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/72eb64e8
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/72eb64e8
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/72eb64e8

Branch: refs/heads/master
Commit: 72eb64e8a43a08a5be988a3826b0a116c57915ea
Parents: 2466a24
Author: ijokarumawak <ij...@gmail.com>
Authored: Wed Sep 30 00:58:39 2015 +0900
Committer: Bryan Bende <bb...@apache.org>
Committed: Tue Sep 29 13:48:13 2015 -0400

----------------------------------------------------------------------
 .../nifi-couchbase-processors/pom.xml           | 144 ----------
 .../nifi/couchbase/CouchbaseAttributes.java     |   4 +
 .../couchbase/AbstractCouchbaseProcessor.java   |  94 ++++---
 .../couchbase/CouchbaseExceptionMappings.java   | 128 +++++++++
 .../couchbase/ErrorHandlingStrategy.java        |  59 ++++
 .../processors/couchbase/GetCouchbaseKey.java   |  45 ++-
 .../processors/couchbase/PutCouchbaseKey.java   |  45 +--
 .../couchbase/TestCouchbaseClusterService.java  |   2 +-
 .../couchbase/TestGetCouchbaseKey.java          | 282 +++++++++++++++++--
 .../couchbase/TestPutCouchbaseKey.java          |  95 +++++--
 10 files changed, 626 insertions(+), 272 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/72eb64e8/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml
index 33b0baa..257ef46 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml
@@ -61,148 +61,4 @@
             <scope>test</scope>
 		</dependency>
     </dependencies>
-	<build>
-		<pluginManagement>
-			<plugins>
-				<plugin>
-					<groupId>org.apache.maven.plugins</groupId>
-					<artifactId>maven-checkstyle-plugin</artifactId>
-					<version>2.15</version>
-					<dependencies>
-						<dependency>
-							<groupId>com.puppycrawl.tools</groupId>
-							<artifactId>checkstyle</artifactId>
-							<version>6.5</version>
-						</dependency>
-					</dependencies>
-				</plugin>
-			</plugins>
-		</pluginManagement>
-		<plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-checkstyle-plugin</artifactId>
-                <configuration>
-                    <checkstyleRules>
-                        <module name="Checker">
-                            <property name="charset" value="UTF-8" />
-                            <property name="severity" value="warning" />
-                            <!-- Checks for whitespace -->
-                            <!-- See http://checkstyle.sf.net/config_whitespace.html -->
-                            <module name="FileTabCharacter">
-                                <property name="eachLine" value="true" />
-                            </module>
-                            <module name="TreeWalker">
-                                <module name="RegexpSinglelineJava">
-                                    <property name="format" value="\s+$" />
-                                    <property name="message" value="Line has trailing whitespace." />
-                                </module>
-                                <module name="RegexpSinglelineJava">
-                                    <property name="format" value="[@]see\s+[{][@]link" />
-                                    <property name="message" value="Javadoc @see does not need @link: pick one or the other." />
-                                </module>
-                                <module name="OuterTypeFilename" />
-                                <module name="LineLength">
-                                    <!-- needs extra, because Eclipse formatter ignores the ending left
-                                        brace -->
-                                    <property name="max" value="200" />
-                                    <property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://" />
-                                </module>
-                                <module name="AvoidStarImport" />
-                                <module name="UnusedImports">
-                                    <property name="processJavadoc" value="true" />
-                                </module>
-                                 <module name="NoLineWrap" />
-                                <module name="LeftCurly">
-                                    <property name="maxLineLength" value="160" />
-                                </module>
-                                <module name="RightCurly" />
-                                <module name="RightCurly">
-                                    <property name="option" value="alone" />
-                                    <property name="tokens" value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, LITERAL_DO, STATIC_INIT, INSTANCE_INIT" />
-                                </module>
-                                <module name="SeparatorWrap">
-                                    <property name="tokens" value="DOT" />
-                                    <property name="option" value="nl" />
-                                </module>
-                                <module name="SeparatorWrap">
-                                    <property name="tokens" value="COMMA" />
-                                    <property name="option" value="EOL" />
-                                </module>
-                                <module name="PackageName">
-                                    <property name="format" value="^[a-z]+(\.[a-z][a-zA-Z0-9]*)*$" />
-                                </module>
-                                <module name="MethodTypeParameterName">
-                                    <property name="format" value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)" />
-                                </module>
-                                <module name="MethodParamPad" />
-                                <module name="OperatorWrap">
-                                    <property name="option" value="NL" />
-                                    <property name="tokens" value="BAND, BOR, BSR, BXOR, DIV, EQUAL, GE, GT, LAND, LE, LITERAL_INSTANCEOF, LOR, LT, MINUS, MOD, NOT_EQUAL, QUESTION, SL, SR, STAR " />
-                                </module>
-                                 <module name="AnnotationLocation">
-                                    <property name="tokens" value="CLASS_DEF, INTERFACE_DEF, ENUM_DEF, METHOD_DEF, CTOR_DEF" />
-                                </module>
-                                <module name="AnnotationLocation">
-                                    <property name="tokens" value="VARIABLE_DEF" />
-                                    <property name="allowSamelineMultipleAnnotations" value="true" />
-                                </module>
-                                <module name="NonEmptyAtclauseDescription" />
-                                <module name="JavadocMethod">
-                                    <property name="allowMissingJavadoc" value="true" />
-                                    <property name="allowMissingParamTags" value="true" />
-                                    <property name="allowMissingThrowsTags" value="true" />
-                                    <property name="allowMissingReturnTag" value="true" />
-                                    <property name="allowedAnnotations" value="Override,Test,BeforeClass,AfterClass,Before,After" />
-                                    <property name="allowThrowsTagsForSubclasses" value="true" />
-                                </module>
-                                <module name="SingleLineJavadoc" />
-                             </module>
-                        </module>
-                    </checkstyleRules>
-                    <violationSeverity>warning</violationSeverity>
-                    <includeTestSourceDirectory>true</includeTestSourceDirectory>
-                </configuration>
-	 		</plugin>
- 		</plugins>
-	</build>
-    <profiles>
-        <profile>
-            <!-- Checks style and licensing requirements. This is a good idea to run
-                for contributions and for the release process. While it would be nice to
-                run always these plugins can considerably slow the build and have proven
-                to create unstable builds in our multi-module project and when building using
-                multiple threads. The stability issues seen with Checkstyle in multi-module
-                builds include false-positives and false negatives. -->
-            <id>contrib-check</id>
-            <build>
-                <plugins>
-                    <plugin>
-                        <groupId>org.apache.rat</groupId>
-                        <artifactId>apache-rat-plugin</artifactId>
-                        <executions>
-                            <execution>
-                                <goals>
-                                    <goal>check</goal>
-                                </goals>
-                                <phase>verify</phase>
-                            </execution>
-                        </executions>
-                    </plugin>
-                    <plugin>
-                        <groupId>org.apache.maven.plugins</groupId>
-                        <artifactId>maven-checkstyle-plugin</artifactId>
-                        <executions>
-                            <execution>
-                                <id>check-style</id>
-                                <goals>
-                                    <goal>check</goal>
-                                </goals>
-                            </execution>
-                        </executions>
-                    </plugin>
-                </plugins>
-            </build>
-        </profile>
-    </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/72eb64e8/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java
index a4d69fc..3bef8c5 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java
@@ -43,6 +43,10 @@ public enum CouchbaseAttributes implements FlowFileAttributeKey {
      * The expiration of a related document.
      */
     Expiry("couchbase.doc.expiry"),
+    /**
+     * The thrown CouchbaseException class.
+     */
+    Exception("couchbase.exception"),
     ;
 
     private final String key;

http://git-wip-us.apache.org/repos/asf/nifi/blob/72eb64e8/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
index d370728..066b1ca 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
@@ -23,13 +23,19 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.couchbase.CouchbaseAttributes;
 import org.apache.nifi.couchbase.CouchbaseClusterControllerService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 
+import com.couchbase.client.core.CouchbaseException;
 import com.couchbase.client.java.Bucket;
 
 /**
@@ -46,49 +52,45 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
             .build();
 
     public static final PropertyDescriptor DOC_ID = new PropertyDescriptor
-            .Builder().name("Static Document Id")
-            .description("A static, fixed Couchbase document id.")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor DOC_ID_EXP = new PropertyDescriptor
-            .Builder().name("Document Id Expression")
-            .description("An expression to construct the Couchbase document id."
-                    + " If 'Static Document Id' is specified, then 'Static Document Id' is used.")
-            .required(false)
+            .Builder().name("Document Id")
+            .description("A static, fixed Couchbase document id."
+                    + "Or an expression to construct the Couchbase document id.")
             .expressionLanguageSupported(true)
-            .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("All FlowFiles that are written to Couchbase Server are routed to this relationship.")
-            .build();
+        .name("success")
+        .description("All FlowFiles that are written to Couchbase Server are routed to this relationship.")
+        .build();
     public static final Relationship REL_ORIGINAL = new Relationship.Builder()
-            .name("original")
-            .description("The original input file will be routed to this destination when it has been successfully processed.")
-            .build();
+        .name("original")
+        .description("The original input file will be routed to this destination when it has been successfully processed.")
+        .build();
+    public static final Relationship REL_RETRY = new Relationship.Builder()
+        .name("retry")
+        .description("All FlowFiles that cannot written to Couchbase Server but can be retried are routed to this relationship.")
+        .build();
     public static final Relationship REL_FAILURE = new Relationship.Builder()
-            .name("failure")
-            .description("All FlowFiles that cannot written to Couchbase Server are routed to this relationship.")
-            .build();
+        .name("failure")
+        .description("All FlowFiles that cannot written to Couchbase Server and can't be retried are routed to this relationship.")
+        .build();
 
     public static final PropertyDescriptor COUCHBASE_CLUSTER_SERVICE = new PropertyDescriptor
-            .Builder().name("Couchbase Cluster Controller Service")
-            .description("A Couchbase Cluster Controller Service which manages connections to a Couchbase cluster.")
-            .required(true)
-            .identifiesControllerService(CouchbaseClusterControllerService.class)
-            .build();
+        .Builder().name("Couchbase Cluster Controller Service")
+        .description("A Couchbase Cluster Controller Service which manages connections to a Couchbase cluster.")
+        .required(true)
+        .identifiesControllerService(CouchbaseClusterControllerService.class)
+        .build();
 
     public static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor
-            .Builder().name("Bucket Name")
-            .description("The name of bucket to access.")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .defaultValue("default")
-            .build();
+        .Builder().name("Bucket Name")
+        .description("The name of bucket to access.")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .defaultValue("default")
+        .build();
 
     private List<PropertyDescriptor> descriptors;
 
@@ -171,4 +173,32 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
             .toString();
     }
 
+    /**
+     * Handles the thrown CocuhbaseException accordingly.
+     * @param session a process session
+     * @param logger a logger
+     * @param inFile an input FlowFile
+     * @param e the thrown CouchbaseException
+     * @param errMsg a message to be logged
+     */
+    protected void handleCouchbaseException(final ProcessSession session,
+            final ProcessorLog logger, FlowFile inFile, CouchbaseException e,
+            String errMsg) {
+        logger.error(errMsg, e);
+        if(inFile != null){
+            ErrorHandlingStrategy strategy = CouchbaseExceptionMappings.getStrategy(e);
+            switch(strategy.result()) {
+            case ProcessException:
+                throw new ProcessException(errMsg, e);
+            case Failure:
+                inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), e.getClass().getName());
+                session.transfer(inFile, REL_FAILURE);
+                break;
+            case Retry:
+                inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), e.getClass().getName());
+                session.transfer(inFile, REL_RETRY);
+                break;
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/72eb64e8/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java
new file mode 100644
index 0000000..87ffabb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.couchbase;
+
+import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.ConfigurationError;
+import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Fatal;
+import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.InvalidInput;
+import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.TemporalClusterError;
+import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.TemporalFlowFileError;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.couchbase.client.core.BackpressureException;
+import com.couchbase.client.core.BucketClosedException;
+import com.couchbase.client.core.CouchbaseException;
+import com.couchbase.client.core.DocumentConcurrentlyModifiedException;
+import com.couchbase.client.core.DocumentMutationLostException;
+import com.couchbase.client.core.ReplicaNotConfiguredException;
+import com.couchbase.client.core.RequestCancelledException;
+import com.couchbase.client.core.ServiceNotAvailableException;
+import com.couchbase.client.core.config.ConfigurationException;
+import com.couchbase.client.core.endpoint.SSLException;
+import com.couchbase.client.core.endpoint.kv.AuthenticationException;
+import com.couchbase.client.core.env.EnvironmentException;
+import com.couchbase.client.core.state.NotConnectedException;
+import com.couchbase.client.java.error.BucketDoesNotExistException;
+import com.couchbase.client.java.error.CannotRetryException;
+import com.couchbase.client.java.error.CouchbaseOutOfMemoryException;
+import com.couchbase.client.java.error.DurabilityException;
+import com.couchbase.client.java.error.InvalidPasswordException;
+import com.couchbase.client.java.error.RequestTooBigException;
+import com.couchbase.client.java.error.TemporaryFailureException;
+import com.couchbase.client.java.error.TranscodingException;
+
+public class CouchbaseExceptionMappings {
+
+    private static final Map<Class<? extends CouchbaseException>, ErrorHandlingStrategy>mapping = new HashMap<>();
+
+    /*
+     * - Won't happen
+     * BucketAlreadyExistsException: never create a bucket
+     * CASMismatchException: cas-id and replace is not used yet
+     * DesignDocumentException: View is not used yet
+     * DocumentAlreadyExistsException: insert is not used yet
+     * DocumentDoesNotExistException: replace is not used yet
+     * FlushDisabledException: never call flush
+     * RepositoryMappingException: EntityDocument is not used
+     * TemporaryLockFailureException: we don't obtain locks
+     * ViewDoesNotExistException: View is not used yet
+     * NamedPreparedStatementException: N1QL is not used yet
+     * QueryExecutionException: N1QL is not used yet
+     */
+    static {
+        /*
+         * ConfigurationError
+         */
+        mapping.put(AuthenticationException.class, ConfigurationError);
+        mapping.put(BucketDoesNotExistException.class, ConfigurationError);
+        mapping.put(ConfigurationException.class, ConfigurationError);
+        mapping.put(InvalidPasswordException.class, ConfigurationError);
+        mapping.put(EnvironmentException.class, ConfigurationError);
+        // when Couchbase doesn't have enough replica
+        mapping.put(ReplicaNotConfiguredException.class, ConfigurationError);
+        // when a particular Service(KV, View, Query, DCP) isn't running in a cluster
+        mapping.put(ServiceNotAvailableException.class, ConfigurationError);
+        // SSL configuration error, such as key store mis configuration.
+        mapping.put(SSLException.class, ConfigurationError);
+
+        /*
+         * InvalidInput
+         */
+        mapping.put(RequestTooBigException.class, InvalidInput);
+        mapping.put(TranscodingException.class, InvalidInput);
+
+        /*
+         * Temporal Cluster Error
+         */
+        mapping.put(BackpressureException.class, TemporalClusterError);
+        mapping.put(CouchbaseOutOfMemoryException.class, TemporalClusterError);
+        mapping.put(TemporaryFailureException.class, TemporalClusterError);
+        // occurs when a connection gets lost
+        mapping.put(RequestCancelledException.class, TemporalClusterError);
+
+        /*
+         * Temporal FlowFile Error
+         */
+        mapping.put(DocumentConcurrentlyModifiedException.class, TemporalFlowFileError);
+        mapping.put(DocumentMutationLostException.class, TemporalFlowFileError);
+        mapping.put(DurabilityException.class, TemporalFlowFileError);
+
+        /*
+         * Fatal
+         */
+        mapping.put(BucketClosedException.class, Fatal);
+        mapping.put(CannotRetryException.class, Fatal);
+        mapping.put(NotConnectedException.class, Fatal);
+    }
+
+    /**
+     * Returns a registered error handling strategy.
+     * @param e the CouchbaseException
+     * @return a registered strategy, if it's not registered, then return Fatal
+     */
+    public static ErrorHandlingStrategy getStrategy(CouchbaseException e){
+        ErrorHandlingStrategy strategy = mapping.get(e.getClass());
+        if(strategy == null) {
+            // Treat unknown Exception as Fatal.
+            return ErrorHandlingStrategy.Fatal;
+        }
+        return strategy;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/72eb64e8/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/ErrorHandlingStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/ErrorHandlingStrategy.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/ErrorHandlingStrategy.java
new file mode 100644
index 0000000..75b8f46
--- /dev/null
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/ErrorHandlingStrategy.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.couchbase;
+
+import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Penalty.Penalize;
+import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Penalty.Yield;
+import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Result.Failure;
+import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Result.ProcessException;
+import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Result.Retry;
+
+
+public enum ErrorHandlingStrategy {
+
+    ConfigurationError(ProcessException, Yield),
+    InvalidInput(Failure, Penalize),
+    TemporalClusterError(Retry, Yield),
+    TemporalFlowFileError(Retry, Penalize),
+    Fatal(Failure, Yield);
+
+    private final Result result;
+    private final Penalty penalty;
+    private ErrorHandlingStrategy(Result result, Penalty penalty){
+        this.result = result;
+        this.penalty = penalty;
+    }
+
+    public enum Result {
+        ProcessException, Failure, Retry;
+    }
+
+    /**
+     * Indicating yield or penalize the processing when transfer the input FlowFile.
+     */
+    public enum Penalty {
+        Yield, Penalize;
+    }
+
+    public Result result(){
+        return this.result;
+    }
+
+    public Penalty penalty(){
+        return this.penalty;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/72eb64e8/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
index 6d9a476..8c15e29 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
@@ -45,24 +45,27 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.stream.io.StreamUtils;
 
+import com.couchbase.client.core.CouchbaseException;
 import com.couchbase.client.java.Bucket;
 import com.couchbase.client.java.document.BinaryDocument;
 import com.couchbase.client.java.document.Document;
 import com.couchbase.client.java.document.RawJsonDocument;
+import com.couchbase.client.java.error.DocumentDoesNotExistException;
 
 @Tags({ "nosql", "couchbase", "database", "get" })
-@CapabilityDescription("Get a document from Couchbase Server via Key/Value access.")
+@CapabilityDescription("Get a document from Couchbase Server via Key/Value access. This processor can be triggered by an incoming FlowFile, or it can be scheduled on a timer")
 @SeeAlso({CouchbaseClusterControllerService.class})
 @ReadsAttributes({
-    @ReadsAttribute(attribute = "FlowFile content", description = "Used as a document id if none of 'Static Document Id' or 'Document Id Expression' is specified"),
-    @ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id Excepression.")
+    @ReadsAttribute(attribute = "FlowFile content", description = "Used as a document id if 'Document Id' is not specified"),
+    @ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id' expression.")
     })
 @WritesAttributes({
     @WritesAttribute(attribute="couchbase.cluster", description="Cluster where the document was retrieved from."),
     @WritesAttribute(attribute="couchbase.bucket", description="Bucket where the document was retrieved from."),
     @WritesAttribute(attribute="couchbase.doc.id", description="Id of the document."),
     @WritesAttribute(attribute="couchbase.doc.cas", description="CAS of the document."),
-    @WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document.")
+    @WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document."),
+    @WritesAttribute(attribute="couchbase.exception", description="If Couchbase related error occurs the CouchbaseException class name will be captured here.")
     })
 public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
 
@@ -70,13 +73,13 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
     protected void addSupportedProperties(List<PropertyDescriptor> descriptors) {
         descriptors.add(DOCUMENT_TYPE);
         descriptors.add(DOC_ID);
-        descriptors.add(DOC_ID_EXP);
     }
 
     @Override
     protected void addSupportedRelationships(Set<Relationship> relationships) {
         relationships.add(REL_SUCCESS);
         relationships.add(REL_ORIGINAL);
+        relationships.add(REL_RETRY);
         relationships.add(REL_FAILURE);
     }
 
@@ -86,15 +89,9 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
         FlowFile inFile = session.get();
 
         String docId = null;
-        if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){
-            docId = context.getProperty(DOC_ID).getValue();
-        } else {
-            // Otherwise docId has to be extracted from inFile.
-            if ( inFile == null ) {
-                return;
-            }
-            if(!StringUtils.isEmpty(context.getProperty(DOC_ID_EXP).getValue())){
-                docId = context.getProperty(DOC_ID_EXP).evaluateAttributeExpressions(inFile).getValue();
+        try {
+            if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){
+                docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(inFile).getValue();
             } else {
                 final byte[] content = new byte[(int) inFile.getSize()];
                 session.read(inFile, new InputStreamCallback() {
@@ -105,11 +102,14 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
                 });
                 docId = new String(content, StandardCharsets.UTF_8);
             }
+        } catch (Throwable t) {
+            throw new ProcessException("Please check 'Document Id' setting. Couldn't get document id from " + inFile);
         }
 
         if(StringUtils.isEmpty(docId)){
-            logger.error("Couldn't get document id from from {}", new Object[]{inFile});
-            session.transfer(inFile, REL_FAILURE);
+            if(inFile != null){
+                throw new ProcessException("Please check 'Document Id' setting. Couldn't get document id from " + inFile);
+            }
         }
 
         try {
@@ -137,8 +137,9 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
             }
 
             if(doc == null) {
-                logger.info("Document {} was not found in {}", new Object[]{docId, getTransitUrl(context)});
+                logger.warn("Document {} was not found in {}", new Object[]{docId, getTransitUrl(context)});
                 if(inFile != null){
+                    inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), DocumentDoesNotExistException.class.getName());
                     session.transfer(inFile, REL_FAILURE);
                 }
                 return;
@@ -160,13 +161,11 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
             session.getProvenanceReporter().receive(outFile, getTransitUrl(context));
             session.transfer(outFile, REL_SUCCESS);
 
-        } catch (Throwable t){
-            logger.error("Getting docuement {} from Couchbase Server using {} failed due to {}",
-                    new Object[]{docId, inFile, t}, t);
-            if(inFile != null){
-                session.transfer(inFile, REL_FAILURE);
-            }
+        } catch (CouchbaseException e){
+            String errMsg = String.format("Getting docuement %s from Couchbase Server using %s failed due to %s", docId, inFile, e);
+            handleCouchbaseException(session, logger, inFile, e, errMsg);
         }
     }
 
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/72eb64e8/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java
index 6bfa480..8f41383 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java
@@ -45,6 +45,7 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.stream.io.StreamUtils;
 
+import com.couchbase.client.core.CouchbaseException;
 import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
 import com.couchbase.client.deps.io.netty.buffer.Unpooled;
 import com.couchbase.client.java.PersistTo;
@@ -57,15 +58,16 @@ import com.couchbase.client.java.document.RawJsonDocument;
 @CapabilityDescription("Put a document to Couchbase Server via Key/Value access.")
 @SeeAlso({CouchbaseClusterControllerService.class})
 @ReadsAttributes({
-    @ReadsAttribute(attribute = "uuid", description = "Used as a document id if none of 'Static Document Id' or 'Document Id Expression' is specified"),
-    @ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id Excepression.")
+    @ReadsAttribute(attribute = "uuid", description = "Used as a document id if 'Document Id' is not specified"),
+    @ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id' expression.")
     })
 @WritesAttributes({
     @WritesAttribute(attribute="couchbase.cluster", description="Cluster where the document was stored."),
     @WritesAttribute(attribute="couchbase.bucket", description="Bucket where the document was stored."),
     @WritesAttribute(attribute="couchbase.doc.id", description="Id of the document."),
     @WritesAttribute(attribute="couchbase.doc.cas", description="CAS of the document."),
-    @WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document.")
+    @WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document."),
+    @WritesAttribute(attribute="couchbase.exception", description="If Couchbase related error occurs the CouchbaseException class name will be captured here.")
     })
 public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
 
@@ -90,7 +92,6 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
     protected void addSupportedProperties(List<PropertyDescriptor> descriptors) {
         descriptors.add(DOCUMENT_TYPE);
         descriptors.add(DOC_ID);
-        descriptors.add(DOC_ID_EXP);
         descriptors.add(PERSIST_TO);
         descriptors.add(REPLICATE_TO);
     }
@@ -109,24 +110,25 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
             return;
         }
 
-        try {
-
-            final byte[] content = new byte[(int) flowFile.getSize()];
-            session.read(flowFile, new InputStreamCallback() {
-                @Override
-                public void process(final InputStream in) throws IOException {
-                    StreamUtils.fillBuffer(in, content, true);
-                }
-            });
-
+        String docId = null;
+        final byte[] content = new byte[(int) flowFile.getSize()];
+        session.read(flowFile, new InputStreamCallback() {
+            @Override
+            public void process(final InputStream in) throws IOException {
+                StreamUtils.fillBuffer(in, content, true);
+            }
+        });
 
-            String docId = String.valueOf(flowFile.getAttribute(CoreAttributes.UUID.key()));
+        try {
+            docId = String.valueOf(flowFile.getAttribute(CoreAttributes.UUID.key()));
             if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){
-                docId = context.getProperty(DOC_ID).getValue();
-            } else if(!StringUtils.isEmpty(context.getProperty(DOC_ID_EXP).getValue())){
-                docId = context.getProperty(DOC_ID_EXP).evaluateAttributeExpressions(flowFile).getValue();
+                docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
             }
+        } catch (Throwable t) {
+            throw new ProcessException("Please check 'Document Id' setting. Couldn't get document id from " + flowFile);
+        }
 
+        try {
             Document<?> doc = null;
             DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue());
             switch (documentType){
@@ -141,7 +143,6 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
                 }
             }
 
-
             PersistTo persistTo = PersistTo.valueOf(context.getProperty(PERSIST_TO).getValue());
             ReplicateTo replicateTo = ReplicateTo.valueOf(context.getProperty(REPLICATE_TO).getValue());
             doc = openBucket(context).upsert(doc, persistTo, replicateTo);
@@ -155,9 +156,9 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
             session.getProvenanceReporter().send(flowFile, getTransitUrl(context));
             session.transfer(flowFile, REL_SUCCESS);
 
-        } catch (Throwable t) {
-            logger.error("Writing {} into Couchbase Server failed due to {}", new Object[]{flowFile, t}, t);
-            session.transfer(flowFile, REL_FAILURE);
+        } catch (CouchbaseException e) {
+            String errMsg = String.format("Writing docuement %s to Couchbase Server using %s failed due to %s", docId, flowFile, e);
+            handleCouchbaseException(session, logger, flowFile, e, errMsg);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/72eb64e8/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java
index d96b1c2..eb2220d 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java
@@ -45,7 +45,7 @@ public class TestCouchbaseClusterService {
 
     @Test
     public void testConnectionFailure() throws InitializationException {
-        String connectionString = "couchbase://invalid-hostname";
+        String connectionString = "invalid-protocol://invalid-hostname";
         CouchbaseClusterControllerService service = new CouchbaseClusterService();
         testRunner.addControllerService(SERVICE_ID, service);
         testRunner.setProperty(service, CouchbaseClusterService.CONNECTION_STRING, connectionString);

http://git-wip-us.apache.org/repos/asf/nifi/blob/72eb64e8/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java
index 4ea4dff..dca2ae3 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java
@@ -16,14 +16,16 @@
  */
 package org.apache.nifi.processors.couchbase;
 
+import static org.apache.nifi.couchbase.CouchbaseAttributes.Exception;
 import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.BUCKET_NAME;
 import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.COUCHBASE_CLUSTER_SERVICE;
 import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOCUMENT_TYPE;
 import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID;
-import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID_EXP;
 import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_FAILURE;
 import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_ORIGINAL;
+import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_RETRY;
 import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_SUCCESS;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -34,19 +36,28 @@ import java.util.Map;
 
 import org.apache.nifi.couchbase.CouchbaseAttributes;
 import org.apache.nifi.couchbase.CouchbaseClusterControllerService;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.couchbase.client.core.BackpressureException;
+import com.couchbase.client.core.CouchbaseException;
 import com.couchbase.client.core.ServiceNotAvailableException;
+import com.couchbase.client.core.endpoint.kv.AuthenticationException;
+import com.couchbase.client.core.state.NotConnectedException;
 import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
 import com.couchbase.client.deps.io.netty.buffer.Unpooled;
 import com.couchbase.client.java.Bucket;
 import com.couchbase.client.java.document.BinaryDocument;
 import com.couchbase.client.java.document.RawJsonDocument;
+import com.couchbase.client.java.error.DocumentDoesNotExistException;
+import com.couchbase.client.java.error.DurabilityException;
+import com.couchbase.client.java.error.RequestTooBigException;
 
 
 public class TestGetCouchbaseKey {
@@ -92,6 +103,7 @@ public class TestGetCouchbaseKey {
 
         testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
         testRunner.assertTransferCount(REL_SUCCESS, 1);
+        testRunner.assertTransferCount(REL_RETRY, 0);
         testRunner.assertTransferCount(REL_FAILURE, 0);
         MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
         outFile.assertContentEquals(content);
@@ -104,57 +116,110 @@ public class TestGetCouchbaseKey {
     }
 
 
-    /**
-     * Use static document id even if doc id expression is set.
-     */
     @Test
-    public void testStaticDocIdAndDocIdExp() throws Exception {
-        String docId = "doc-a";
-        String docIdExp = "${someProperty}";
+    public void testDocIdExp() throws Exception {
+        String docIdExp = "${'someProperty'}";
+        String somePropertyValue = "doc-p";
 
         Bucket bucket = mock(Bucket.class);
         String content = "{\"key\":\"value\"}";
-        when(bucket.get(docId, RawJsonDocument.class)).thenReturn(RawJsonDocument.create(docId, content));
+        when(bucket.get(somePropertyValue, RawJsonDocument.class))
+            .thenReturn(RawJsonDocument.create(somePropertyValue, content));
         setupMockBucket(bucket);
 
-        testRunner.setProperty(DOC_ID, docId);
-        testRunner.setProperty(DOC_ID_EXP, docIdExp);
+        testRunner.setProperty(DOC_ID, docIdExp);
+
+        byte[] inFileData = "input FlowFile data".getBytes(StandardCharsets.UTF_8);
+        Map<String, String> properties = new HashMap<>();
+        properties.put("someProperty", somePropertyValue);
+        testRunner.enqueue(inFileData, properties);
         testRunner.run();
 
-        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
         testRunner.assertTransferCount(REL_SUCCESS, 1);
+        testRunner.assertTransferCount(REL_ORIGINAL, 1);
+        testRunner.assertTransferCount(REL_RETRY, 0);
         testRunner.assertTransferCount(REL_FAILURE, 0);
         MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
         outFile.assertContentEquals(content);
     }
 
     @Test
-    public void testDocIdExp() throws Exception {
-        String docIdExp = "${'someProperty'}";
-        String somePropertyValue = "doc-p";
+    public void testDocIdExpWithNullFlowFile() throws Exception {
+        String docIdExp = "doc-s";
+        String docId = "doc-s";
 
         Bucket bucket = mock(Bucket.class);
         String content = "{\"key\":\"value\"}";
-        when(bucket.get(somePropertyValue, RawJsonDocument.class))
-            .thenReturn(RawJsonDocument.create(somePropertyValue, content));
+        when(bucket.get(docId, RawJsonDocument.class))
+            .thenReturn(RawJsonDocument.create(docId, content));
         setupMockBucket(bucket);
 
-        testRunner.setProperty(DOC_ID_EXP, docIdExp);
+        testRunner.setProperty(DOC_ID, docIdExp);
 
-        byte[] inFileData = "input FlowFile data".getBytes(StandardCharsets.UTF_8);
-        Map<String, String> properties = new HashMap<>();
-        properties.put("someProperty", somePropertyValue);
-        testRunner.enqueue(inFileData, properties);
         testRunner.run();
 
         testRunner.assertTransferCount(REL_SUCCESS, 1);
-        testRunner.assertTransferCount(REL_ORIGINAL, 1);
+        testRunner.assertTransferCount(REL_ORIGINAL, 0);
+        testRunner.assertTransferCount(REL_RETRY, 0);
         testRunner.assertTransferCount(REL_FAILURE, 0);
         MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
         outFile.assertContentEquals(content);
     }
 
     @Test
+    public void testDocIdExpWithInvalidExpression() throws Exception {
+        String docIdExp = "${nonExistingFunction('doc-s')}";
+        String docId = "doc-s";
+
+        Bucket bucket = mock(Bucket.class);
+        String content = "{\"key\":\"value\"}";
+        when(bucket.get(docId, RawJsonDocument.class))
+            .thenReturn(RawJsonDocument.create(docId, content));
+        setupMockBucket(bucket);
+
+        testRunner.setProperty(DOC_ID, docIdExp);
+
+        try {
+            testRunner.run();
+            fail("ProcessException should be throws.");
+        } catch (AssertionError e){
+            Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
+        }
+
+        testRunner.assertTransferCount(REL_SUCCESS, 0);
+        testRunner.assertTransferCount(REL_ORIGINAL, 0);
+        testRunner.assertTransferCount(REL_RETRY, 0);
+        testRunner.assertTransferCount(REL_FAILURE, 0);
+    }
+
+    @Test
+    public void testDocIdExpWithInvalidExpressionOnFlowFile() throws Exception {
+        String docIdExp = "${nonExistingFunction(someProperty)}";
+
+        Bucket bucket = mock(Bucket.class);
+        setupMockBucket(bucket);
+
+        testRunner.setProperty(DOC_ID, docIdExp);
+
+        String inputFileDataStr = "input FlowFile data";
+        byte[] inFileData = inputFileDataStr.getBytes(StandardCharsets.UTF_8);
+        Map<String, String> properties = new HashMap<>();
+        properties.put("someProperty", "someValue");
+        testRunner.enqueue(inFileData, properties);
+        try {
+            testRunner.run();
+            fail("ProcessException should be throws.");
+        } catch (AssertionError e){
+            Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
+        }
+
+        testRunner.assertTransferCount(REL_SUCCESS, 0);
+        testRunner.assertTransferCount(REL_ORIGINAL, 0);
+        testRunner.assertTransferCount(REL_RETRY, 0);
+        testRunner.assertTransferCount(REL_FAILURE, 0);
+    }
+
+    @Test
     public void testInputFlowFileContent() throws Exception {
 
         Bucket bucket = mock(Bucket.class);
@@ -171,9 +236,12 @@ public class TestGetCouchbaseKey {
 
         testRunner.assertTransferCount(REL_SUCCESS, 1);
         testRunner.assertTransferCount(REL_ORIGINAL, 1);
+        testRunner.assertTransferCount(REL_RETRY, 0);
         testRunner.assertTransferCount(REL_FAILURE, 0);
         MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
         outFile.assertContentEquals(content);
+        MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_ORIGINAL).get(0);
+        orgFile.assertContentEquals(inFileDataStr);
     }
 
     @Test
@@ -195,9 +263,12 @@ public class TestGetCouchbaseKey {
 
         testRunner.assertTransferCount(REL_SUCCESS, 1);
         testRunner.assertTransferCount(REL_ORIGINAL, 1);
+        testRunner.assertTransferCount(REL_RETRY, 0);
         testRunner.assertTransferCount(REL_FAILURE, 0);
         MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
         outFile.assertContentEquals(content);
+        MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_ORIGINAL).get(0);
+        orgFile.assertContentEquals(inFileDataStr);
     }
 
 
@@ -213,12 +284,175 @@ public class TestGetCouchbaseKey {
 
         byte[] inFileData = inFileDataStr.getBytes(StandardCharsets.UTF_8);
         testRunner.enqueue(inFileData);
+        try {
+            testRunner.run();
+            fail("ProcessException should be throws.");
+        } catch (AssertionError e){
+            Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
+        }
+
+        testRunner.assertTransferCount(REL_SUCCESS, 0);
+        testRunner.assertTransferCount(REL_ORIGINAL, 0);
+        testRunner.assertTransferCount(REL_RETRY, 0);
+        testRunner.assertTransferCount(REL_FAILURE, 0);
+    }
+
+    @Test
+    public void testCouchbaseConfigurationError() throws Exception {
+        String docIdExp = "doc-c";
+
+        Bucket bucket = mock(Bucket.class);
+        when(bucket.get(docIdExp, RawJsonDocument.class))
+            .thenThrow(new AuthenticationException());
+        setupMockBucket(bucket);
+
+        testRunner.setProperty(DOC_ID, docIdExp);
+
+        String inputFileDataStr = "input FlowFile data";
+        byte[] inFileData = inputFileDataStr.getBytes(StandardCharsets.UTF_8);
+        testRunner.enqueue(inFileData);
+        try {
+            testRunner.run();
+            fail("ProcessException should be throws.");
+        } catch (AssertionError e){
+            Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
+            Assert.assertTrue(e.getCause().getCause().getClass().equals(AuthenticationException.class));
+        }
+
+        testRunner.assertTransferCount(REL_SUCCESS, 0);
+        testRunner.assertTransferCount(REL_ORIGINAL, 0);
+        testRunner.assertTransferCount(REL_RETRY, 0);
+        testRunner.assertTransferCount(REL_FAILURE, 0);
+    }
+
+    @Test
+    public void testCouchbaseInvalidInputError() throws Exception {
+        String docIdExp = "doc-c";
+
+        Bucket bucket = mock(Bucket.class);
+        CouchbaseException exception = new RequestTooBigException();
+        when(bucket.get(docIdExp, RawJsonDocument.class))
+            .thenThrow(exception);
+        setupMockBucket(bucket);
+
+        testRunner.setProperty(DOC_ID, docIdExp);
+
+        String inputFileDataStr = "input FlowFile data";
+        byte[] inFileData = inputFileDataStr.getBytes(StandardCharsets.UTF_8);
+        testRunner.enqueue(inFileData);
+        testRunner.run();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 0);
+        testRunner.assertTransferCount(REL_ORIGINAL, 0);
+        testRunner.assertTransferCount(REL_RETRY, 0);
+        testRunner.assertTransferCount(REL_FAILURE, 1);
+        MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0);
+        orgFile.assertContentEquals(inputFileDataStr);
+        orgFile.assertAttributeEquals(Exception.key(), exception.getClass().getName());
+    }
+
+    @Test
+    public void testCouchbaseTempClusterError() throws Exception {
+        String docIdExp = "doc-c";
+
+        Bucket bucket = mock(Bucket.class);
+        CouchbaseException exception = new BackpressureException();
+        when(bucket.get(docIdExp, RawJsonDocument.class))
+            .thenThrow(exception);
+        setupMockBucket(bucket);
+
+        testRunner.setProperty(DOC_ID, docIdExp);
+
+        String inputFileDataStr = "input FlowFile data";
+        byte[] inFileData = inputFileDataStr.getBytes(StandardCharsets.UTF_8);
+        testRunner.enqueue(inFileData);
+        testRunner.run();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 0);
+        testRunner.assertTransferCount(REL_ORIGINAL, 0);
+        testRunner.assertTransferCount(REL_RETRY, 1);
+        testRunner.assertTransferCount(REL_FAILURE, 0);
+        MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_RETRY).get(0);
+        orgFile.assertContentEquals(inputFileDataStr);
+        orgFile.assertAttributeEquals(Exception.key(), exception.getClass().getName());
+    }
+
+
+    @Test
+    public void testCouchbaseTempFlowFileError() throws Exception {
+        String docIdExp = "doc-c";
+
+        Bucket bucket = mock(Bucket.class);
+        // There is no suitable CouchbaseException for temp flowfile error, currently.
+        CouchbaseException exception = new DurabilityException();
+        when(bucket.get(docIdExp, RawJsonDocument.class))
+            .thenThrow(exception);
+        setupMockBucket(bucket);
+
+        testRunner.setProperty(DOC_ID, docIdExp);
+
+        String inputFileDataStr = "input FlowFile data";
+        byte[] inFileData = inputFileDataStr.getBytes(StandardCharsets.UTF_8);
+        testRunner.enqueue(inFileData);
+        testRunner.run();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 0);
+        testRunner.assertTransferCount(REL_ORIGINAL, 0);
+        testRunner.assertTransferCount(REL_RETRY, 1);
+        testRunner.assertTransferCount(REL_FAILURE, 0);
+        MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_RETRY).get(0);
+        orgFile.assertContentEquals(inputFileDataStr);
+        orgFile.assertAttributeEquals(Exception.key(), exception.getClass().getName());
+    }
+
+    @Test
+    public void testCouchbaseFatalError() throws Exception {
+        String docIdExp = "doc-c";
+
+        Bucket bucket = mock(Bucket.class);
+        CouchbaseException exception = new NotConnectedException();
+        when(bucket.get(docIdExp, RawJsonDocument.class))
+            .thenThrow(exception);
+        setupMockBucket(bucket);
+
+        testRunner.setProperty(DOC_ID, docIdExp);
+
+        String inputFileDataStr = "input FlowFile data";
+        byte[] inFileData = inputFileDataStr.getBytes(StandardCharsets.UTF_8);
+        testRunner.enqueue(inFileData);
+        testRunner.run();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 0);
+        testRunner.assertTransferCount(REL_ORIGINAL, 0);
+        testRunner.assertTransferCount(REL_RETRY, 0);
+        testRunner.assertTransferCount(REL_FAILURE, 1);
+        MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0);
+        orgFile.assertContentEquals(inputFileDataStr);
+        orgFile.assertAttributeEquals(Exception.key(), exception.getClass().getName());
+    }
+
+    @Test
+    public void testDocumentNotFound() throws Exception {
+        String docIdExp = "doc-n";
+
+        Bucket bucket = mock(Bucket.class);
+        when(bucket.get(docIdExp, RawJsonDocument.class))
+            .thenReturn(null);
+        setupMockBucket(bucket);
+
+        testRunner.setProperty(DOC_ID, docIdExp);
+
+        String inputFileDataStr = "input FlowFile data";
+        byte[] inFileData = inputFileDataStr.getBytes(StandardCharsets.UTF_8);
+        testRunner.enqueue(inFileData);
         testRunner.run();
 
         testRunner.assertTransferCount(REL_SUCCESS, 0);
         testRunner.assertTransferCount(REL_ORIGINAL, 0);
+        testRunner.assertTransferCount(REL_RETRY, 0);
         testRunner.assertTransferCount(REL_FAILURE, 1);
-        MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0);
-        outFile.assertContentEquals(inFileDataStr);
+        MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0);
+        orgFile.assertContentEquals(inputFileDataStr);
+        orgFile.assertAttributeEquals(Exception.key(), DocumentDoesNotExistException.class.getName());
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/72eb64e8/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java
index 3995528..0388e35 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java
@@ -16,13 +16,15 @@
  */
 package org.apache.nifi.processors.couchbase;
 
+import static org.apache.nifi.couchbase.CouchbaseAttributes.Exception;
 import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.BUCKET_NAME;
 import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.COUCHBASE_CLUSTER_SERVICE;
 import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID;
-import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID_EXP;
 import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_FAILURE;
+import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_RETRY;
 import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_SUCCESS;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
@@ -38,14 +40,18 @@ import java.util.Map;
 import org.apache.nifi.couchbase.CouchbaseAttributes;
 import org.apache.nifi.couchbase.CouchbaseClusterControllerService;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
+import com.couchbase.client.core.CouchbaseException;
+import com.couchbase.client.core.ServiceNotAvailableException;
 import com.couchbase.client.java.Bucket;
 import com.couchbase.client.java.PersistTo;
 import com.couchbase.client.java.ReplicateTo;
@@ -102,6 +108,7 @@ public class TestPutCouchbaseKey {
 
         testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
         testRunner.assertTransferCount(REL_SUCCESS, 1);
+        testRunner.assertTransferCount(REL_RETRY, 0);
         testRunner.assertTransferCount(REL_FAILURE, 0);
         MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
         outFile.assertContentEquals(inFileData);
@@ -134,44 +141,44 @@ public class TestPutCouchbaseKey {
 
         testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
         testRunner.assertTransferCount(REL_SUCCESS, 1);
+        testRunner.assertTransferCount(REL_RETRY, 0);
         testRunner.assertTransferCount(REL_FAILURE, 0);
         MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
         outFile.assertContentEquals(inFileData);
     }
 
-    /**
-     * Use static document id even if doc id expression is set.
-     */
     @Test
-    public void testStaticDocIdAndDocIdExp() throws Exception {
-        String docId = "doc-a";
-        String docIdExp = "${someProperty}";
+    public void testDocIdExp() throws Exception {
+        String docIdExp = "${'someProperty'}";
+        String somePropertyValue = "doc-p";
 
         String inFileData = "{\"key\":\"value\"}";
         byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8);
 
         Bucket bucket = mock(Bucket.class);
         when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE)))
-            .thenReturn(RawJsonDocument.create(docId, inFileData));
+            .thenReturn(RawJsonDocument.create(somePropertyValue, inFileData));
         setupMockBucket(bucket);
 
-        testRunner.enqueue(inFileDataBytes);
-        testRunner.setProperty(DOC_ID, docId);
-        testRunner.setProperty(DOC_ID_EXP, docIdExp);
+        testRunner.setProperty(DOC_ID, docIdExp);
+
+        Map<String, String> properties = new HashMap<>();
+        properties.put("someProperty", somePropertyValue);
+        testRunner.enqueue(inFileDataBytes, properties);
         testRunner.run();
 
         verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE));
 
-        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
         testRunner.assertTransferCount(REL_SUCCESS, 1);
+        testRunner.assertTransferCount(REL_RETRY, 0);
         testRunner.assertTransferCount(REL_FAILURE, 0);
         MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
         outFile.assertContentEquals(inFileData);
     }
 
     @Test
-    public void testDocIdExp() throws Exception {
-        String docIdExp = "${'someProperty'}";
+    public void testInvalidDocIdExp() throws Exception {
+        String docIdExp = "${invalid_function(someProperty)}";
         String somePropertyValue = "doc-p";
 
         String inFileData = "{\"key\":\"value\"}";
@@ -182,19 +189,21 @@ public class TestPutCouchbaseKey {
             .thenReturn(RawJsonDocument.create(somePropertyValue, inFileData));
         setupMockBucket(bucket);
 
-        testRunner.setProperty(DOC_ID_EXP, docIdExp);
+        testRunner.setProperty(DOC_ID, docIdExp);
 
         Map<String, String> properties = new HashMap<>();
         properties.put("someProperty", somePropertyValue);
         testRunner.enqueue(inFileDataBytes, properties);
-        testRunner.run();
-
-        verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE));
+        try {
+            testRunner.run();
+            fail("ProcessException should be throws.");
+        } catch (AssertionError e){
+            Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
+        }
 
-        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        testRunner.assertTransferCount(REL_SUCCESS, 0);
+        testRunner.assertTransferCount(REL_RETRY, 0);
         testRunner.assertTransferCount(REL_FAILURE, 0);
-        MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
-        outFile.assertContentEquals(inFileData);
     }
 
     @Test
@@ -219,6 +228,7 @@ public class TestPutCouchbaseKey {
         assertEquals(uuid, capture.getValue().id());
 
         testRunner.assertTransferCount(REL_SUCCESS, 1);
+        testRunner.assertTransferCount(REL_RETRY, 0);
         testRunner.assertTransferCount(REL_FAILURE, 0);
         MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
         outFile.assertContentEquals(inFileData);
@@ -235,20 +245,53 @@ public class TestPutCouchbaseKey {
 
         Bucket bucket = mock(Bucket.class);
         when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.ONE)))
-            .thenThrow(new DurabilityException());
+            .thenThrow(new ServiceNotAvailableException());
         setupMockBucket(bucket);
 
         testRunner.enqueue(inFileDataBytes);
         testRunner.setProperty(DOC_ID, docId);
         testRunner.setProperty(PutCouchbaseKey.REPLICATE_TO, ReplicateTo.ONE.toString());
-        testRunner.run();
+        try {
+            testRunner.run();
+            fail("ProcessException should be throws.");
+        } catch (AssertionError e){
+            Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
+        }
 
         verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.ONE));
 
         testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
         testRunner.assertTransferCount(REL_SUCCESS, 0);
-        testRunner.assertTransferCount(REL_FAILURE, 1);
-        MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0);
-        outFile.assertContentEquals(inFileData);
+        testRunner.assertTransferCount(REL_RETRY, 0);
+        testRunner.assertTransferCount(REL_FAILURE, 0);
+    }
+
+    @Test
+    public void testCouchbaseTempFlowFileError() throws Exception {
+
+        String docId = "doc-a";
+
+        String inFileData = "{\"key\":\"value\"}";
+        byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8);
+
+        Bucket bucket = mock(Bucket.class);
+        CouchbaseException exception = new DurabilityException();
+        when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.ONE)))
+            .thenThrow(exception);
+        setupMockBucket(bucket);
+
+        testRunner.enqueue(inFileDataBytes);
+        testRunner.setProperty(DOC_ID, docId);
+        testRunner.setProperty(PutCouchbaseKey.REPLICATE_TO, ReplicateTo.ONE.toString());
+        testRunner.run();
+
+        verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.ONE));
+
+        testRunner.assertTransferCount(REL_SUCCESS, 0);
+        testRunner.assertTransferCount(REL_RETRY, 1);
+        testRunner.assertTransferCount(REL_FAILURE, 0);
+        MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_RETRY).get(0);
+        orgFile.assertContentEquals(inFileData);
+        orgFile.assertAttributeEquals(Exception.key(), exception.getClass().getName());
     }
 }