You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2015/10/01 21:43:14 UTC
[2/4] nifi git commit: nifi-992 Improvements based on code review.
nifi-992 Improvements based on code review.
- Removed checkstyle and contrib-check profile since it's inherit from
top-level pom.
- Consolidate DOC_ID and DOC_ID_EXP into a single DOC_ID property.
- Add capability description on GetCouchbaseKey.
- Fixed documentation spell misses.
- Handle Exceptions accordingly.
- Add 'retry' relationship.
Signed-off-by: Bryan Bende <bb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/72eb64e8
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/72eb64e8
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/72eb64e8
Branch: refs/heads/master
Commit: 72eb64e8a43a08a5be988a3826b0a116c57915ea
Parents: 2466a24
Author: ijokarumawak <ij...@gmail.com>
Authored: Wed Sep 30 00:58:39 2015 +0900
Committer: Bryan Bende <bb...@apache.org>
Committed: Tue Sep 29 13:48:13 2015 -0400
----------------------------------------------------------------------
.../nifi-couchbase-processors/pom.xml | 144 ----------
.../nifi/couchbase/CouchbaseAttributes.java | 4 +
.../couchbase/AbstractCouchbaseProcessor.java | 94 ++++---
.../couchbase/CouchbaseExceptionMappings.java | 128 +++++++++
.../couchbase/ErrorHandlingStrategy.java | 59 ++++
.../processors/couchbase/GetCouchbaseKey.java | 45 ++-
.../processors/couchbase/PutCouchbaseKey.java | 45 +--
.../couchbase/TestCouchbaseClusterService.java | 2 +-
.../couchbase/TestGetCouchbaseKey.java | 282 +++++++++++++++++--
.../couchbase/TestPutCouchbaseKey.java | 95 +++++--
10 files changed, 626 insertions(+), 272 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/72eb64e8/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml
index 33b0baa..257ef46 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml
@@ -61,148 +61,4 @@
<scope>test</scope>
</dependency>
</dependencies>
- <build>
- <pluginManagement>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-checkstyle-plugin</artifactId>
- <version>2.15</version>
- <dependencies>
- <dependency>
- <groupId>com.puppycrawl.tools</groupId>
- <artifactId>checkstyle</artifactId>
- <version>6.5</version>
- </dependency>
- </dependencies>
- </plugin>
- </plugins>
- </pluginManagement>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-checkstyle-plugin</artifactId>
- <configuration>
- <checkstyleRules>
- <module name="Checker">
- <property name="charset" value="UTF-8" />
- <property name="severity" value="warning" />
- <!-- Checks for whitespace -->
- <!-- See http://checkstyle.sf.net/config_whitespace.html -->
- <module name="FileTabCharacter">
- <property name="eachLine" value="true" />
- </module>
- <module name="TreeWalker">
- <module name="RegexpSinglelineJava">
- <property name="format" value="\s+$" />
- <property name="message" value="Line has trailing whitespace." />
- </module>
- <module name="RegexpSinglelineJava">
- <property name="format" value="[@]see\s+[{][@]link" />
- <property name="message" value="Javadoc @see does not need @link: pick one or the other." />
- </module>
- <module name="OuterTypeFilename" />
- <module name="LineLength">
- <!-- needs extra, because Eclipse formatter ignores the ending left
- brace -->
- <property name="max" value="200" />
- <property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://" />
- </module>
- <module name="AvoidStarImport" />
- <module name="UnusedImports">
- <property name="processJavadoc" value="true" />
- </module>
- <module name="NoLineWrap" />
- <module name="LeftCurly">
- <property name="maxLineLength" value="160" />
- </module>
- <module name="RightCurly" />
- <module name="RightCurly">
- <property name="option" value="alone" />
- <property name="tokens" value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, LITERAL_DO, STATIC_INIT, INSTANCE_INIT" />
- </module>
- <module name="SeparatorWrap">
- <property name="tokens" value="DOT" />
- <property name="option" value="nl" />
- </module>
- <module name="SeparatorWrap">
- <property name="tokens" value="COMMA" />
- <property name="option" value="EOL" />
- </module>
- <module name="PackageName">
- <property name="format" value="^[a-z]+(\.[a-z][a-zA-Z0-9]*)*$" />
- </module>
- <module name="MethodTypeParameterName">
- <property name="format" value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)" />
- </module>
- <module name="MethodParamPad" />
- <module name="OperatorWrap">
- <property name="option" value="NL" />
- <property name="tokens" value="BAND, BOR, BSR, BXOR, DIV, EQUAL, GE, GT, LAND, LE, LITERAL_INSTANCEOF, LOR, LT, MINUS, MOD, NOT_EQUAL, QUESTION, SL, SR, STAR " />
- </module>
- <module name="AnnotationLocation">
- <property name="tokens" value="CLASS_DEF, INTERFACE_DEF, ENUM_DEF, METHOD_DEF, CTOR_DEF" />
- </module>
- <module name="AnnotationLocation">
- <property name="tokens" value="VARIABLE_DEF" />
- <property name="allowSamelineMultipleAnnotations" value="true" />
- </module>
- <module name="NonEmptyAtclauseDescription" />
- <module name="JavadocMethod">
- <property name="allowMissingJavadoc" value="true" />
- <property name="allowMissingParamTags" value="true" />
- <property name="allowMissingThrowsTags" value="true" />
- <property name="allowMissingReturnTag" value="true" />
- <property name="allowedAnnotations" value="Override,Test,BeforeClass,AfterClass,Before,After" />
- <property name="allowThrowsTagsForSubclasses" value="true" />
- </module>
- <module name="SingleLineJavadoc" />
- </module>
- </module>
- </checkstyleRules>
- <violationSeverity>warning</violationSeverity>
- <includeTestSourceDirectory>true</includeTestSourceDirectory>
- </configuration>
- </plugin>
- </plugins>
- </build>
- <profiles>
- <profile>
- <!-- Checks style and licensing requirements. This is a good idea to run
- for contributions and for the release process. While it would be nice to
- run always these plugins can considerably slow the build and have proven
- to create unstable builds in our multi-module project and when building using
- multiple threads. The stability issues seen with Checkstyle in multi-module
- builds include false-positives and false negatives. -->
- <id>contrib-check</id>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.rat</groupId>
- <artifactId>apache-rat-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>check</goal>
- </goals>
- <phase>verify</phase>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-checkstyle-plugin</artifactId>
- <executions>
- <execution>
- <id>check-style</id>
- <goals>
- <goal>check</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </profile>
- </profiles>
</project>
http://git-wip-us.apache.org/repos/asf/nifi/blob/72eb64e8/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java
index a4d69fc..3bef8c5 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java
@@ -43,6 +43,10 @@ public enum CouchbaseAttributes implements FlowFileAttributeKey {
* The expiration of a related document.
*/
Expiry("couchbase.doc.expiry"),
+ /**
+ * The thrown CouchbaseException class.
+ */
+ Exception("couchbase.exception"),
;
private final String key;
http://git-wip-us.apache.org/repos/asf/nifi/blob/72eb64e8/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
index d370728..066b1ca 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
@@ -23,13 +23,19 @@ import java.util.List;
import java.util.Set;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.couchbase.CouchbaseAttributes;
import org.apache.nifi.couchbase.CouchbaseClusterControllerService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import com.couchbase.client.core.CouchbaseException;
import com.couchbase.client.java.Bucket;
/**
@@ -46,49 +52,45 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
.build();
public static final PropertyDescriptor DOC_ID = new PropertyDescriptor
- .Builder().name("Static Document Id")
- .description("A static, fixed Couchbase document id.")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor DOC_ID_EXP = new PropertyDescriptor
- .Builder().name("Document Id Expression")
- .description("An expression to construct the Couchbase document id."
- + " If 'Static Document Id' is specified, then 'Static Document Id' is used.")
- .required(false)
+ .Builder().name("Document Id")
+ .description("A static, fixed Couchbase document id."
+ + "Or an expression to construct the Couchbase document id.")
.expressionLanguageSupported(true)
- .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("All FlowFiles that are written to Couchbase Server are routed to this relationship.")
- .build();
+ .name("success")
+ .description("All FlowFiles that are written to Couchbase Server are routed to this relationship.")
+ .build();
public static final Relationship REL_ORIGINAL = new Relationship.Builder()
- .name("original")
- .description("The original input file will be routed to this destination when it has been successfully processed.")
- .build();
+ .name("original")
+ .description("The original input file will be routed to this destination when it has been successfully processed.")
+ .build();
+ public static final Relationship REL_RETRY = new Relationship.Builder()
+ .name("retry")
+ .description("All FlowFiles that cannot written to Couchbase Server but can be retried are routed to this relationship.")
+ .build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
- .name("failure")
- .description("All FlowFiles that cannot written to Couchbase Server are routed to this relationship.")
- .build();
+ .name("failure")
+ .description("All FlowFiles that cannot written to Couchbase Server and can't be retried are routed to this relationship.")
+ .build();
public static final PropertyDescriptor COUCHBASE_CLUSTER_SERVICE = new PropertyDescriptor
- .Builder().name("Couchbase Cluster Controller Service")
- .description("A Couchbase Cluster Controller Service which manages connections to a Couchbase cluster.")
- .required(true)
- .identifiesControllerService(CouchbaseClusterControllerService.class)
- .build();
+ .Builder().name("Couchbase Cluster Controller Service")
+ .description("A Couchbase Cluster Controller Service which manages connections to a Couchbase cluster.")
+ .required(true)
+ .identifiesControllerService(CouchbaseClusterControllerService.class)
+ .build();
public static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor
- .Builder().name("Bucket Name")
- .description("The name of bucket to access.")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .defaultValue("default")
- .build();
+ .Builder().name("Bucket Name")
+ .description("The name of bucket to access.")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .defaultValue("default")
+ .build();
private List<PropertyDescriptor> descriptors;
@@ -171,4 +173,32 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
.toString();
}
+ /**
+ * Handles the thrown CocuhbaseException accordingly.
+ * @param session a process session
+ * @param logger a logger
+ * @param inFile an input FlowFile
+ * @param e the thrown CouchbaseException
+ * @param errMsg a message to be logged
+ */
+ protected void handleCouchbaseException(final ProcessSession session,
+ final ProcessorLog logger, FlowFile inFile, CouchbaseException e,
+ String errMsg) {
+ logger.error(errMsg, e);
+ if(inFile != null){
+ ErrorHandlingStrategy strategy = CouchbaseExceptionMappings.getStrategy(e);
+ switch(strategy.result()) {
+ case ProcessException:
+ throw new ProcessException(errMsg, e);
+ case Failure:
+ inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), e.getClass().getName());
+ session.transfer(inFile, REL_FAILURE);
+ break;
+ case Retry:
+ inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), e.getClass().getName());
+ session.transfer(inFile, REL_RETRY);
+ break;
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/72eb64e8/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java
new file mode 100644
index 0000000..87ffabb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.couchbase;
+
+import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.ConfigurationError;
+import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Fatal;
+import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.InvalidInput;
+import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.TemporalClusterError;
+import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.TemporalFlowFileError;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.couchbase.client.core.BackpressureException;
+import com.couchbase.client.core.BucketClosedException;
+import com.couchbase.client.core.CouchbaseException;
+import com.couchbase.client.core.DocumentConcurrentlyModifiedException;
+import com.couchbase.client.core.DocumentMutationLostException;
+import com.couchbase.client.core.ReplicaNotConfiguredException;
+import com.couchbase.client.core.RequestCancelledException;
+import com.couchbase.client.core.ServiceNotAvailableException;
+import com.couchbase.client.core.config.ConfigurationException;
+import com.couchbase.client.core.endpoint.SSLException;
+import com.couchbase.client.core.endpoint.kv.AuthenticationException;
+import com.couchbase.client.core.env.EnvironmentException;
+import com.couchbase.client.core.state.NotConnectedException;
+import com.couchbase.client.java.error.BucketDoesNotExistException;
+import com.couchbase.client.java.error.CannotRetryException;
+import com.couchbase.client.java.error.CouchbaseOutOfMemoryException;
+import com.couchbase.client.java.error.DurabilityException;
+import com.couchbase.client.java.error.InvalidPasswordException;
+import com.couchbase.client.java.error.RequestTooBigException;
+import com.couchbase.client.java.error.TemporaryFailureException;
+import com.couchbase.client.java.error.TranscodingException;
+
+public class CouchbaseExceptionMappings {
+
+ private static final Map<Class<? extends CouchbaseException>, ErrorHandlingStrategy>mapping = new HashMap<>();
+
+ /*
+ * - Won't happen
+ * BucketAlreadyExistsException: never create a bucket
+ * CASMismatchException: cas-id and replace is not used yet
+ * DesignDocumentException: View is not used yet
+ * DocumentAlreadyExistsException: insert is not used yet
+ * DocumentDoesNotExistException: replace is not used yet
+ * FlushDisabledException: never call flush
+ * RepositoryMappingException: EntityDocument is not used
+ * TemporaryLockFailureException: we don't obtain locks
+ * ViewDoesNotExistException: View is not used yet
+ * NamedPreparedStatementException: N1QL is not used yet
+ * QueryExecutionException: N1QL is not used yet
+ */
+ static {
+ /*
+ * ConfigurationError
+ */
+ mapping.put(AuthenticationException.class, ConfigurationError);
+ mapping.put(BucketDoesNotExistException.class, ConfigurationError);
+ mapping.put(ConfigurationException.class, ConfigurationError);
+ mapping.put(InvalidPasswordException.class, ConfigurationError);
+ mapping.put(EnvironmentException.class, ConfigurationError);
+ // when Couchbase doesn't have enough replica
+ mapping.put(ReplicaNotConfiguredException.class, ConfigurationError);
+ // when a particular Service(KV, View, Query, DCP) isn't running in a cluster
+ mapping.put(ServiceNotAvailableException.class, ConfigurationError);
+ // SSL configuration error, such as key store mis configuration.
+ mapping.put(SSLException.class, ConfigurationError);
+
+ /*
+ * InvalidInput
+ */
+ mapping.put(RequestTooBigException.class, InvalidInput);
+ mapping.put(TranscodingException.class, InvalidInput);
+
+ /*
+ * Temporal Cluster Error
+ */
+ mapping.put(BackpressureException.class, TemporalClusterError);
+ mapping.put(CouchbaseOutOfMemoryException.class, TemporalClusterError);
+ mapping.put(TemporaryFailureException.class, TemporalClusterError);
+ // occurs when a connection gets lost
+ mapping.put(RequestCancelledException.class, TemporalClusterError);
+
+ /*
+ * Temporal FlowFile Error
+ */
+ mapping.put(DocumentConcurrentlyModifiedException.class, TemporalFlowFileError);
+ mapping.put(DocumentMutationLostException.class, TemporalFlowFileError);
+ mapping.put(DurabilityException.class, TemporalFlowFileError);
+
+ /*
+ * Fatal
+ */
+ mapping.put(BucketClosedException.class, Fatal);
+ mapping.put(CannotRetryException.class, Fatal);
+ mapping.put(NotConnectedException.class, Fatal);
+ }
+
+ /**
+ * Returns a registered error handling strategy.
+ * @param e the CouchbaseException
+ * @return a registered strategy, if it's not registered, then return Fatal
+ */
+ public static ErrorHandlingStrategy getStrategy(CouchbaseException e){
+ ErrorHandlingStrategy strategy = mapping.get(e.getClass());
+ if(strategy == null) {
+ // Treat unknown Exception as Fatal.
+ return ErrorHandlingStrategy.Fatal;
+ }
+ return strategy;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/72eb64e8/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/ErrorHandlingStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/ErrorHandlingStrategy.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/ErrorHandlingStrategy.java
new file mode 100644
index 0000000..75b8f46
--- /dev/null
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/ErrorHandlingStrategy.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.couchbase;
+
+import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Penalty.Penalize;
+import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Penalty.Yield;
+import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Result.Failure;
+import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Result.ProcessException;
+import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Result.Retry;
+
+
+public enum ErrorHandlingStrategy {
+
+ ConfigurationError(ProcessException, Yield),
+ InvalidInput(Failure, Penalize),
+ TemporalClusterError(Retry, Yield),
+ TemporalFlowFileError(Retry, Penalize),
+ Fatal(Failure, Yield);
+
+ private final Result result;
+ private final Penalty penalty;
+ private ErrorHandlingStrategy(Result result, Penalty penalty){
+ this.result = result;
+ this.penalty = penalty;
+ }
+
+ public enum Result {
+ ProcessException, Failure, Retry;
+ }
+
+ /**
+ * Indicating yield or penalize the processing when transfer the input FlowFile.
+ */
+ public enum Penalty {
+ Yield, Penalize;
+ }
+
+ public Result result(){
+ return this.result;
+ }
+
+ public Penalty penalty(){
+ return this.penalty;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/72eb64e8/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
index 6d9a476..8c15e29 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
@@ -45,24 +45,27 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.stream.io.StreamUtils;
+import com.couchbase.client.core.CouchbaseException;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.document.BinaryDocument;
import com.couchbase.client.java.document.Document;
import com.couchbase.client.java.document.RawJsonDocument;
+import com.couchbase.client.java.error.DocumentDoesNotExistException;
@Tags({ "nosql", "couchbase", "database", "get" })
-@CapabilityDescription("Get a document from Couchbase Server via Key/Value access.")
+@CapabilityDescription("Get a document from Couchbase Server via Key/Value access. This processor can be triggered by an incoming FlowFile, or it can be scheduled on a timer")
@SeeAlso({CouchbaseClusterControllerService.class})
@ReadsAttributes({
- @ReadsAttribute(attribute = "FlowFile content", description = "Used as a document id if none of 'Static Document Id' or 'Document Id Expression' is specified"),
- @ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id Excepression.")
+ @ReadsAttribute(attribute = "FlowFile content", description = "Used as a document id if 'Document Id' is not specified"),
+ @ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id' expression.")
})
@WritesAttributes({
@WritesAttribute(attribute="couchbase.cluster", description="Cluster where the document was retrieved from."),
@WritesAttribute(attribute="couchbase.bucket", description="Bucket where the document was retrieved from."),
@WritesAttribute(attribute="couchbase.doc.id", description="Id of the document."),
@WritesAttribute(attribute="couchbase.doc.cas", description="CAS of the document."),
- @WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document.")
+ @WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document."),
+ @WritesAttribute(attribute="couchbase.exception", description="If Couchbase related error occurs the CouchbaseException class name will be captured here.")
})
public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
@@ -70,13 +73,13 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
protected void addSupportedProperties(List<PropertyDescriptor> descriptors) {
descriptors.add(DOCUMENT_TYPE);
descriptors.add(DOC_ID);
- descriptors.add(DOC_ID_EXP);
}
@Override
protected void addSupportedRelationships(Set<Relationship> relationships) {
relationships.add(REL_SUCCESS);
relationships.add(REL_ORIGINAL);
+ relationships.add(REL_RETRY);
relationships.add(REL_FAILURE);
}
@@ -86,15 +89,9 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
FlowFile inFile = session.get();
String docId = null;
- if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){
- docId = context.getProperty(DOC_ID).getValue();
- } else {
- // Otherwise docId has to be extracted from inFile.
- if ( inFile == null ) {
- return;
- }
- if(!StringUtils.isEmpty(context.getProperty(DOC_ID_EXP).getValue())){
- docId = context.getProperty(DOC_ID_EXP).evaluateAttributeExpressions(inFile).getValue();
+ try {
+ if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){
+ docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(inFile).getValue();
} else {
final byte[] content = new byte[(int) inFile.getSize()];
session.read(inFile, new InputStreamCallback() {
@@ -105,11 +102,14 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
});
docId = new String(content, StandardCharsets.UTF_8);
}
+ } catch (Throwable t) {
+ throw new ProcessException("Please check 'Document Id' setting. Couldn't get document id from " + inFile);
}
if(StringUtils.isEmpty(docId)){
- logger.error("Couldn't get document id from from {}", new Object[]{inFile});
- session.transfer(inFile, REL_FAILURE);
+ if(inFile != null){
+ throw new ProcessException("Please check 'Document Id' setting. Couldn't get document id from " + inFile);
+ }
}
try {
@@ -137,8 +137,9 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
}
if(doc == null) {
- logger.info("Document {} was not found in {}", new Object[]{docId, getTransitUrl(context)});
+ logger.warn("Document {} was not found in {}", new Object[]{docId, getTransitUrl(context)});
if(inFile != null){
+ inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), DocumentDoesNotExistException.class.getName());
session.transfer(inFile, REL_FAILURE);
}
return;
@@ -160,13 +161,11 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
session.getProvenanceReporter().receive(outFile, getTransitUrl(context));
session.transfer(outFile, REL_SUCCESS);
- } catch (Throwable t){
- logger.error("Getting docuement {} from Couchbase Server using {} failed due to {}",
- new Object[]{docId, inFile, t}, t);
- if(inFile != null){
- session.transfer(inFile, REL_FAILURE);
- }
+ } catch (CouchbaseException e){
+ String errMsg = String.format("Getting docuement %s from Couchbase Server using %s failed due to %s", docId, inFile, e);
+ handleCouchbaseException(session, logger, inFile, e, errMsg);
}
}
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/72eb64e8/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java
index 6bfa480..8f41383 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java
@@ -45,6 +45,7 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.stream.io.StreamUtils;
+import com.couchbase.client.core.CouchbaseException;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.buffer.Unpooled;
import com.couchbase.client.java.PersistTo;
@@ -57,15 +58,16 @@ import com.couchbase.client.java.document.RawJsonDocument;
@CapabilityDescription("Put a document to Couchbase Server via Key/Value access.")
@SeeAlso({CouchbaseClusterControllerService.class})
@ReadsAttributes({
- @ReadsAttribute(attribute = "uuid", description = "Used as a document id if none of 'Static Document Id' or 'Document Id Expression' is specified"),
- @ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id Excepression.")
+ @ReadsAttribute(attribute = "uuid", description = "Used as a document id if 'Document Id' is not specified"),
+ @ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id' expression.")
})
@WritesAttributes({
@WritesAttribute(attribute="couchbase.cluster", description="Cluster where the document was stored."),
@WritesAttribute(attribute="couchbase.bucket", description="Bucket where the document was stored."),
@WritesAttribute(attribute="couchbase.doc.id", description="Id of the document."),
@WritesAttribute(attribute="couchbase.doc.cas", description="CAS of the document."),
- @WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document.")
+ @WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document."),
+ @WritesAttribute(attribute="couchbase.exception", description="If Couchbase related error occurs the CouchbaseException class name will be captured here.")
})
public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
@@ -90,7 +92,6 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
protected void addSupportedProperties(List<PropertyDescriptor> descriptors) {
descriptors.add(DOCUMENT_TYPE);
descriptors.add(DOC_ID);
- descriptors.add(DOC_ID_EXP);
descriptors.add(PERSIST_TO);
descriptors.add(REPLICATE_TO);
}
@@ -109,24 +110,25 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
return;
}
- try {
-
- final byte[] content = new byte[(int) flowFile.getSize()];
- session.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(final InputStream in) throws IOException {
- StreamUtils.fillBuffer(in, content, true);
- }
- });
-
+ String docId = null;
+ final byte[] content = new byte[(int) flowFile.getSize()];
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream in) throws IOException {
+ StreamUtils.fillBuffer(in, content, true);
+ }
+ });
- String docId = String.valueOf(flowFile.getAttribute(CoreAttributes.UUID.key()));
+ try {
+ docId = String.valueOf(flowFile.getAttribute(CoreAttributes.UUID.key()));
if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){
- docId = context.getProperty(DOC_ID).getValue();
- } else if(!StringUtils.isEmpty(context.getProperty(DOC_ID_EXP).getValue())){
- docId = context.getProperty(DOC_ID_EXP).evaluateAttributeExpressions(flowFile).getValue();
+ docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
}
+ } catch (Throwable t) {
+ throw new ProcessException("Please check 'Document Id' setting. Couldn't get document id from " + flowFile);
+ }
+ try {
Document<?> doc = null;
DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue());
switch (documentType){
@@ -141,7 +143,6 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
}
}
-
PersistTo persistTo = PersistTo.valueOf(context.getProperty(PERSIST_TO).getValue());
ReplicateTo replicateTo = ReplicateTo.valueOf(context.getProperty(REPLICATE_TO).getValue());
doc = openBucket(context).upsert(doc, persistTo, replicateTo);
@@ -155,9 +156,9 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
session.getProvenanceReporter().send(flowFile, getTransitUrl(context));
session.transfer(flowFile, REL_SUCCESS);
- } catch (Throwable t) {
- logger.error("Writing {} into Couchbase Server failed due to {}", new Object[]{flowFile, t}, t);
- session.transfer(flowFile, REL_FAILURE);
+ } catch (CouchbaseException e) {
+ String errMsg = String.format("Writing docuement %s to Couchbase Server using %s failed due to %s", docId, flowFile, e);
+ handleCouchbaseException(session, logger, flowFile, e, errMsg);
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/72eb64e8/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java
index d96b1c2..eb2220d 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java
@@ -45,7 +45,7 @@ public class TestCouchbaseClusterService {
@Test
public void testConnectionFailure() throws InitializationException {
- String connectionString = "couchbase://invalid-hostname";
+ String connectionString = "invalid-protocol://invalid-hostname";
CouchbaseClusterControllerService service = new CouchbaseClusterService();
testRunner.addControllerService(SERVICE_ID, service);
testRunner.setProperty(service, CouchbaseClusterService.CONNECTION_STRING, connectionString);
http://git-wip-us.apache.org/repos/asf/nifi/blob/72eb64e8/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java
index 4ea4dff..dca2ae3 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java
@@ -16,14 +16,16 @@
*/
package org.apache.nifi.processors.couchbase;
+import static org.apache.nifi.couchbase.CouchbaseAttributes.Exception;
import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.BUCKET_NAME;
import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.COUCHBASE_CLUSTER_SERVICE;
import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOCUMENT_TYPE;
import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID;
-import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID_EXP;
import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_FAILURE;
import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_ORIGINAL;
+import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_RETRY;
import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_SUCCESS;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -34,19 +36,28 @@ import java.util.Map;
import org.apache.nifi.couchbase.CouchbaseAttributes;
import org.apache.nifi.couchbase.CouchbaseClusterControllerService;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import com.couchbase.client.core.BackpressureException;
+import com.couchbase.client.core.CouchbaseException;
import com.couchbase.client.core.ServiceNotAvailableException;
+import com.couchbase.client.core.endpoint.kv.AuthenticationException;
+import com.couchbase.client.core.state.NotConnectedException;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.buffer.Unpooled;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.document.BinaryDocument;
import com.couchbase.client.java.document.RawJsonDocument;
+import com.couchbase.client.java.error.DocumentDoesNotExistException;
+import com.couchbase.client.java.error.DurabilityException;
+import com.couchbase.client.java.error.RequestTooBigException;
public class TestGetCouchbaseKey {
@@ -92,6 +103,7 @@ public class TestGetCouchbaseKey {
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
testRunner.assertTransferCount(REL_SUCCESS, 1);
+ testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
@@ -104,57 +116,110 @@ public class TestGetCouchbaseKey {
}
- /**
- * Use static document id even if doc id expression is set.
- */
@Test
- public void testStaticDocIdAndDocIdExp() throws Exception {
- String docId = "doc-a";
- String docIdExp = "${someProperty}";
+ public void testDocIdExp() throws Exception {
+ String docIdExp = "${'someProperty'}";
+ String somePropertyValue = "doc-p";
Bucket bucket = mock(Bucket.class);
String content = "{\"key\":\"value\"}";
- when(bucket.get(docId, RawJsonDocument.class)).thenReturn(RawJsonDocument.create(docId, content));
+ when(bucket.get(somePropertyValue, RawJsonDocument.class))
+ .thenReturn(RawJsonDocument.create(somePropertyValue, content));
setupMockBucket(bucket);
- testRunner.setProperty(DOC_ID, docId);
- testRunner.setProperty(DOC_ID_EXP, docIdExp);
+ testRunner.setProperty(DOC_ID, docIdExp);
+
+ byte[] inFileData = "input FlowFile data".getBytes(StandardCharsets.UTF_8);
+ Map<String, String> properties = new HashMap<>();
+ properties.put("someProperty", somePropertyValue);
+ testRunner.enqueue(inFileData, properties);
testRunner.run();
- testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
testRunner.assertTransferCount(REL_SUCCESS, 1);
+ testRunner.assertTransferCount(REL_ORIGINAL, 1);
+ testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
}
@Test
- public void testDocIdExp() throws Exception {
- String docIdExp = "${'someProperty'}";
- String somePropertyValue = "doc-p";
+ public void testDocIdExpWithNullFlowFile() throws Exception {
+ String docIdExp = "doc-s";
+ String docId = "doc-s";
Bucket bucket = mock(Bucket.class);
String content = "{\"key\":\"value\"}";
- when(bucket.get(somePropertyValue, RawJsonDocument.class))
- .thenReturn(RawJsonDocument.create(somePropertyValue, content));
+ when(bucket.get(docId, RawJsonDocument.class))
+ .thenReturn(RawJsonDocument.create(docId, content));
setupMockBucket(bucket);
- testRunner.setProperty(DOC_ID_EXP, docIdExp);
+ testRunner.setProperty(DOC_ID, docIdExp);
- byte[] inFileData = "input FlowFile data".getBytes(StandardCharsets.UTF_8);
- Map<String, String> properties = new HashMap<>();
- properties.put("someProperty", somePropertyValue);
- testRunner.enqueue(inFileData, properties);
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1);
- testRunner.assertTransferCount(REL_ORIGINAL, 1);
+ testRunner.assertTransferCount(REL_ORIGINAL, 0);
+ testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
}
@Test
+ public void testDocIdExpWithInvalidExpression() throws Exception {
+ String docIdExp = "${nonExistingFunction('doc-s')}";
+ String docId = "doc-s";
+
+ Bucket bucket = mock(Bucket.class);
+ String content = "{\"key\":\"value\"}";
+ when(bucket.get(docId, RawJsonDocument.class))
+ .thenReturn(RawJsonDocument.create(docId, content));
+ setupMockBucket(bucket);
+
+ testRunner.setProperty(DOC_ID, docIdExp);
+
+ try {
+ testRunner.run();
+ fail("ProcessException should be throws.");
+ } catch (AssertionError e){
+ Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
+ }
+
+ testRunner.assertTransferCount(REL_SUCCESS, 0);
+ testRunner.assertTransferCount(REL_ORIGINAL, 0);
+ testRunner.assertTransferCount(REL_RETRY, 0);
+ testRunner.assertTransferCount(REL_FAILURE, 0);
+ }
+
+ @Test
+ public void testDocIdExpWithInvalidExpressionOnFlowFile() throws Exception {
+ String docIdExp = "${nonExistingFunction(someProperty)}";
+
+ Bucket bucket = mock(Bucket.class);
+ setupMockBucket(bucket);
+
+ testRunner.setProperty(DOC_ID, docIdExp);
+
+ String inputFileDataStr = "input FlowFile data";
+ byte[] inFileData = inputFileDataStr.getBytes(StandardCharsets.UTF_8);
+ Map<String, String> properties = new HashMap<>();
+ properties.put("someProperty", "someValue");
+ testRunner.enqueue(inFileData, properties);
+ try {
+ testRunner.run();
+ fail("ProcessException should be throws.");
+ } catch (AssertionError e){
+ Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
+ }
+
+ testRunner.assertTransferCount(REL_SUCCESS, 0);
+ testRunner.assertTransferCount(REL_ORIGINAL, 0);
+ testRunner.assertTransferCount(REL_RETRY, 0);
+ testRunner.assertTransferCount(REL_FAILURE, 0);
+ }
+
+ @Test
public void testInputFlowFileContent() throws Exception {
Bucket bucket = mock(Bucket.class);
@@ -171,9 +236,12 @@ public class TestGetCouchbaseKey {
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertTransferCount(REL_ORIGINAL, 1);
+ testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
+ MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_ORIGINAL).get(0);
+ orgFile.assertContentEquals(inFileDataStr);
}
@Test
@@ -195,9 +263,12 @@ public class TestGetCouchbaseKey {
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertTransferCount(REL_ORIGINAL, 1);
+ testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
+ MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_ORIGINAL).get(0);
+ orgFile.assertContentEquals(inFileDataStr);
}
@@ -213,12 +284,175 @@ public class TestGetCouchbaseKey {
byte[] inFileData = inFileDataStr.getBytes(StandardCharsets.UTF_8);
testRunner.enqueue(inFileData);
+ try {
+ testRunner.run();
+ fail("ProcessException should be throws.");
+ } catch (AssertionError e){
+ Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
+ }
+
+ testRunner.assertTransferCount(REL_SUCCESS, 0);
+ testRunner.assertTransferCount(REL_ORIGINAL, 0);
+ testRunner.assertTransferCount(REL_RETRY, 0);
+ testRunner.assertTransferCount(REL_FAILURE, 0);
+ }
+
+ @Test
+ public void testCouchbaseConfigurationError() throws Exception {
+ String docIdExp = "doc-c";
+
+ Bucket bucket = mock(Bucket.class);
+ when(bucket.get(docIdExp, RawJsonDocument.class))
+ .thenThrow(new AuthenticationException());
+ setupMockBucket(bucket);
+
+ testRunner.setProperty(DOC_ID, docIdExp);
+
+ String inputFileDataStr = "input FlowFile data";
+ byte[] inFileData = inputFileDataStr.getBytes(StandardCharsets.UTF_8);
+ testRunner.enqueue(inFileData);
+ try {
+ testRunner.run();
+ fail("ProcessException should be throws.");
+ } catch (AssertionError e){
+ Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
+ Assert.assertTrue(e.getCause().getCause().getClass().equals(AuthenticationException.class));
+ }
+
+ testRunner.assertTransferCount(REL_SUCCESS, 0);
+ testRunner.assertTransferCount(REL_ORIGINAL, 0);
+ testRunner.assertTransferCount(REL_RETRY, 0);
+ testRunner.assertTransferCount(REL_FAILURE, 0);
+ }
+
+ @Test
+ public void testCouchbaseInvalidInputError() throws Exception {
+ String docIdExp = "doc-c";
+
+ Bucket bucket = mock(Bucket.class);
+ CouchbaseException exception = new RequestTooBigException();
+ when(bucket.get(docIdExp, RawJsonDocument.class))
+ .thenThrow(exception);
+ setupMockBucket(bucket);
+
+ testRunner.setProperty(DOC_ID, docIdExp);
+
+ String inputFileDataStr = "input FlowFile data";
+ byte[] inFileData = inputFileDataStr.getBytes(StandardCharsets.UTF_8);
+ testRunner.enqueue(inFileData);
+ testRunner.run();
+
+ testRunner.assertTransferCount(REL_SUCCESS, 0);
+ testRunner.assertTransferCount(REL_ORIGINAL, 0);
+ testRunner.assertTransferCount(REL_RETRY, 0);
+ testRunner.assertTransferCount(REL_FAILURE, 1);
+ MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0);
+ orgFile.assertContentEquals(inputFileDataStr);
+ orgFile.assertAttributeEquals(Exception.key(), exception.getClass().getName());
+ }
+
+ @Test
+ public void testCouchbaseTempClusterError() throws Exception {
+ String docIdExp = "doc-c";
+
+ Bucket bucket = mock(Bucket.class);
+ CouchbaseException exception = new BackpressureException();
+ when(bucket.get(docIdExp, RawJsonDocument.class))
+ .thenThrow(exception);
+ setupMockBucket(bucket);
+
+ testRunner.setProperty(DOC_ID, docIdExp);
+
+ String inputFileDataStr = "input FlowFile data";
+ byte[] inFileData = inputFileDataStr.getBytes(StandardCharsets.UTF_8);
+ testRunner.enqueue(inFileData);
+ testRunner.run();
+
+ testRunner.assertTransferCount(REL_SUCCESS, 0);
+ testRunner.assertTransferCount(REL_ORIGINAL, 0);
+ testRunner.assertTransferCount(REL_RETRY, 1);
+ testRunner.assertTransferCount(REL_FAILURE, 0);
+ MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_RETRY).get(0);
+ orgFile.assertContentEquals(inputFileDataStr);
+ orgFile.assertAttributeEquals(Exception.key(), exception.getClass().getName());
+ }
+
+
+ @Test
+ public void testCouchbaseTempFlowFileError() throws Exception {
+ String docIdExp = "doc-c";
+
+ Bucket bucket = mock(Bucket.class);
+ // There is no suitable CouchbaseException for temp flowfile error, currently.
+ CouchbaseException exception = new DurabilityException();
+ when(bucket.get(docIdExp, RawJsonDocument.class))
+ .thenThrow(exception);
+ setupMockBucket(bucket);
+
+ testRunner.setProperty(DOC_ID, docIdExp);
+
+ String inputFileDataStr = "input FlowFile data";
+ byte[] inFileData = inputFileDataStr.getBytes(StandardCharsets.UTF_8);
+ testRunner.enqueue(inFileData);
+ testRunner.run();
+
+ testRunner.assertTransferCount(REL_SUCCESS, 0);
+ testRunner.assertTransferCount(REL_ORIGINAL, 0);
+ testRunner.assertTransferCount(REL_RETRY, 1);
+ testRunner.assertTransferCount(REL_FAILURE, 0);
+ MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_RETRY).get(0);
+ orgFile.assertContentEquals(inputFileDataStr);
+ orgFile.assertAttributeEquals(Exception.key(), exception.getClass().getName());
+ }
+
+ @Test
+ public void testCouchbaseFatalError() throws Exception {
+ String docIdExp = "doc-c";
+
+ Bucket bucket = mock(Bucket.class);
+ CouchbaseException exception = new NotConnectedException();
+ when(bucket.get(docIdExp, RawJsonDocument.class))
+ .thenThrow(exception);
+ setupMockBucket(bucket);
+
+ testRunner.setProperty(DOC_ID, docIdExp);
+
+ String inputFileDataStr = "input FlowFile data";
+ byte[] inFileData = inputFileDataStr.getBytes(StandardCharsets.UTF_8);
+ testRunner.enqueue(inFileData);
+ testRunner.run();
+
+ testRunner.assertTransferCount(REL_SUCCESS, 0);
+ testRunner.assertTransferCount(REL_ORIGINAL, 0);
+ testRunner.assertTransferCount(REL_RETRY, 0);
+ testRunner.assertTransferCount(REL_FAILURE, 1);
+ MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0);
+ orgFile.assertContentEquals(inputFileDataStr);
+ orgFile.assertAttributeEquals(Exception.key(), exception.getClass().getName());
+ }
+
+ @Test
+ public void testDocumentNotFound() throws Exception {
+ String docIdExp = "doc-n";
+
+ Bucket bucket = mock(Bucket.class);
+ when(bucket.get(docIdExp, RawJsonDocument.class))
+ .thenReturn(null);
+ setupMockBucket(bucket);
+
+ testRunner.setProperty(DOC_ID, docIdExp);
+
+ String inputFileDataStr = "input FlowFile data";
+ byte[] inFileData = inputFileDataStr.getBytes(StandardCharsets.UTF_8);
+ testRunner.enqueue(inFileData);
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 0);
testRunner.assertTransferCount(REL_ORIGINAL, 0);
+ testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 1);
- MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0);
- outFile.assertContentEquals(inFileDataStr);
+ MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0);
+ orgFile.assertContentEquals(inputFileDataStr);
+ orgFile.assertAttributeEquals(Exception.key(), DocumentDoesNotExistException.class.getName());
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/72eb64e8/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java
index 3995528..0388e35 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java
@@ -16,13 +16,15 @@
*/
package org.apache.nifi.processors.couchbase;
+import static org.apache.nifi.couchbase.CouchbaseAttributes.Exception;
import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.BUCKET_NAME;
import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.COUCHBASE_CLUSTER_SERVICE;
import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID;
-import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID_EXP;
import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_FAILURE;
+import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_RETRY;
import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_SUCCESS;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
@@ -38,14 +40,18 @@ import java.util.Map;
import org.apache.nifi.couchbase.CouchbaseAttributes;
import org.apache.nifi.couchbase.CouchbaseClusterControllerService;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import com.couchbase.client.core.CouchbaseException;
+import com.couchbase.client.core.ServiceNotAvailableException;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.PersistTo;
import com.couchbase.client.java.ReplicateTo;
@@ -102,6 +108,7 @@ public class TestPutCouchbaseKey {
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
testRunner.assertTransferCount(REL_SUCCESS, 1);
+ testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
outFile.assertContentEquals(inFileData);
@@ -134,44 +141,44 @@ public class TestPutCouchbaseKey {
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
testRunner.assertTransferCount(REL_SUCCESS, 1);
+ testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
outFile.assertContentEquals(inFileData);
}
- /**
- * Use static document id even if doc id expression is set.
- */
@Test
- public void testStaticDocIdAndDocIdExp() throws Exception {
- String docId = "doc-a";
- String docIdExp = "${someProperty}";
+ public void testDocIdExp() throws Exception {
+ String docIdExp = "${'someProperty'}";
+ String somePropertyValue = "doc-p";
String inFileData = "{\"key\":\"value\"}";
byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8);
Bucket bucket = mock(Bucket.class);
when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE)))
- .thenReturn(RawJsonDocument.create(docId, inFileData));
+ .thenReturn(RawJsonDocument.create(somePropertyValue, inFileData));
setupMockBucket(bucket);
- testRunner.enqueue(inFileDataBytes);
- testRunner.setProperty(DOC_ID, docId);
- testRunner.setProperty(DOC_ID_EXP, docIdExp);
+ testRunner.setProperty(DOC_ID, docIdExp);
+
+ Map<String, String> properties = new HashMap<>();
+ properties.put("someProperty", somePropertyValue);
+ testRunner.enqueue(inFileDataBytes, properties);
testRunner.run();
verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE));
- testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
testRunner.assertTransferCount(REL_SUCCESS, 1);
+ testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
outFile.assertContentEquals(inFileData);
}
@Test
- public void testDocIdExp() throws Exception {
- String docIdExp = "${'someProperty'}";
+ public void testInvalidDocIdExp() throws Exception {
+ String docIdExp = "${invalid_function(someProperty)}";
String somePropertyValue = "doc-p";
String inFileData = "{\"key\":\"value\"}";
@@ -182,19 +189,21 @@ public class TestPutCouchbaseKey {
.thenReturn(RawJsonDocument.create(somePropertyValue, inFileData));
setupMockBucket(bucket);
- testRunner.setProperty(DOC_ID_EXP, docIdExp);
+ testRunner.setProperty(DOC_ID, docIdExp);
Map<String, String> properties = new HashMap<>();
properties.put("someProperty", somePropertyValue);
testRunner.enqueue(inFileDataBytes, properties);
- testRunner.run();
-
- verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE));
+ try {
+ testRunner.run();
+ fail("ProcessException should be throws.");
+ } catch (AssertionError e){
+ Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
+ }
- testRunner.assertTransferCount(REL_SUCCESS, 1);
+ testRunner.assertTransferCount(REL_SUCCESS, 0);
+ testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 0);
- MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
- outFile.assertContentEquals(inFileData);
}
@Test
@@ -219,6 +228,7 @@ public class TestPutCouchbaseKey {
assertEquals(uuid, capture.getValue().id());
testRunner.assertTransferCount(REL_SUCCESS, 1);
+ testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
outFile.assertContentEquals(inFileData);
@@ -235,20 +245,53 @@ public class TestPutCouchbaseKey {
Bucket bucket = mock(Bucket.class);
when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.ONE)))
- .thenThrow(new DurabilityException());
+ .thenThrow(new ServiceNotAvailableException());
setupMockBucket(bucket);
testRunner.enqueue(inFileDataBytes);
testRunner.setProperty(DOC_ID, docId);
testRunner.setProperty(PutCouchbaseKey.REPLICATE_TO, ReplicateTo.ONE.toString());
- testRunner.run();
+ try {
+ testRunner.run();
+ fail("ProcessException should be throws.");
+ } catch (AssertionError e){
+ Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
+ }
verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.ONE));
testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
testRunner.assertTransferCount(REL_SUCCESS, 0);
- testRunner.assertTransferCount(REL_FAILURE, 1);
- MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0);
- outFile.assertContentEquals(inFileData);
+ testRunner.assertTransferCount(REL_RETRY, 0);
+ testRunner.assertTransferCount(REL_FAILURE, 0);
+ }
+
+ @Test
+ public void testCouchbaseTempFlowFileError() throws Exception {
+
+ String docId = "doc-a";
+
+ String inFileData = "{\"key\":\"value\"}";
+ byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8);
+
+ Bucket bucket = mock(Bucket.class);
+ CouchbaseException exception = new DurabilityException();
+ when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.ONE)))
+ .thenThrow(exception);
+ setupMockBucket(bucket);
+
+ testRunner.enqueue(inFileDataBytes);
+ testRunner.setProperty(DOC_ID, docId);
+ testRunner.setProperty(PutCouchbaseKey.REPLICATE_TO, ReplicateTo.ONE.toString());
+ testRunner.run();
+
+ verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.ONE));
+
+ testRunner.assertTransferCount(REL_SUCCESS, 0);
+ testRunner.assertTransferCount(REL_RETRY, 1);
+ testRunner.assertTransferCount(REL_FAILURE, 0);
+ MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_RETRY).get(0);
+ orgFile.assertContentEquals(inFileData);
+ orgFile.assertAttributeEquals(Exception.key(), exception.getClass().getName());
}
}