You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2015/10/01 21:43:15 UTC

[3/4] nifi git commit: nifi-992 Improvements based on code review part II.

nifi-992 Improvements based on code review part II.

- Penalize or Yield based on ErrorHandlingStrategy.Penalty
- Add Retry relationship to PutCouchbaseKey
- Remove unnecessary try/catch and let the framework handle it
- Change CouchbaseException relation mapping for Fatal from Failure to Retry

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/033a1553
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/033a1553
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/033a1553

Branch: refs/heads/master
Commit: 033a1553abe7c83d4d9c49b753911355a881dcc5
Parents: 72eb64e
Author: ijokarumawak <ij...@gmail.com>
Authored: Thu Oct 1 15:05:29 2015 +0900
Committer: Bryan Bende <bb...@apache.org>
Committed: Thu Oct 1 14:13:20 2015 -0400

----------------------------------------------------------------------
 .../couchbase/AbstractCouchbaseProcessor.java   | 16 ++++++++++-
 .../couchbase/ErrorHandlingStrategy.java        | 28 +++++++++++++++++---
 .../processors/couchbase/GetCouchbaseKey.java   | 28 +++++++++-----------
 .../processors/couchbase/PutCouchbaseKey.java   | 14 ++++------
 .../couchbase/TestGetCouchbaseKey.java          | 19 ++++++-------
 .../couchbase/TestPutCouchbaseKey.java          | 13 ++++++---
 6 files changed, 76 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/033a1553/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
index 066b1ca..b879041 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
@@ -175,18 +175,32 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
 
     /**
      * Handles the thrown CocuhbaseException accordingly.
+     * @param context a process context
      * @param session a process session
      * @param logger a logger
      * @param inFile an input FlowFile
      * @param e the thrown CouchbaseException
      * @param errMsg a message to be logged
      */
-    protected void handleCouchbaseException(final ProcessSession session,
+    protected void handleCouchbaseException(final ProcessContext context, final ProcessSession session,
             final ProcessorLog logger, FlowFile inFile, CouchbaseException e,
             String errMsg) {
         logger.error(errMsg, e);
         if(inFile != null){
             ErrorHandlingStrategy strategy = CouchbaseExceptionMappings.getStrategy(e);
+            switch(strategy.penalty()) {
+            case Penalize:
+                if(logger.isDebugEnabled()) logger.debug("Penalized: {}", new Object[]{inFile});
+                inFile = session.penalize(inFile);
+                break;
+            case Yield:
+                if(logger.isDebugEnabled()) logger.debug("Yielded context: {}", new Object[]{inFile});
+                context.yield();
+                break;
+            case None:
+                break;
+            }
+
             switch(strategy.result()) {
             case ProcessException:
                 throw new ProcessException(errMsg, e);

http://git-wip-us.apache.org/repos/asf/nifi/blob/033a1553/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/ErrorHandlingStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/ErrorHandlingStrategy.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/ErrorHandlingStrategy.java
index 75b8f46..bae35d5 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/ErrorHandlingStrategy.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/ErrorHandlingStrategy.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processors.couchbase;
 
+import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Penalty.None;
 import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Penalty.Penalize;
 import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Penalty.Yield;
 import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Result.Failure;
@@ -25,11 +26,32 @@ import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Result.
 
 public enum ErrorHandlingStrategy {
 
+    /**
+     * The processor configuration has to be fixed. In order NOT to call the
+     * failing processor frequently, it should be yielded.
+     */
     ConfigurationError(ProcessException, Yield),
-    InvalidInput(Failure, Penalize),
+    /**
+     * The input FlowFile will be sent to the failure relationship for further
+     * processing without penalizing. Basically, the FlowFile shouldn't be sent
+     * to this processor again unless the issue has been solved.
+     */
+    InvalidInput(Failure, None),
+    /**
+     * The Couchbase cluster is in an unhealthy state. Retrying may be successful,
+     * but it should be yielded for a while.
+     */
     TemporalClusterError(Retry, Yield),
+    /**
+     * The FlowFile was not processed successfully due to some temporary error
+     * related to this specific FlowFile or document. Retrying may be successful,
+     * but it should be penalized for a while.
+     */
     TemporalFlowFileError(Retry, Penalize),
-    Fatal(Failure, Yield);
+    /**
+     * The error can't be recovered without DataFlow Manager intervention.
+     */
+    Fatal(Retry, Yield);
 
     private final Result result;
     private final Penalty penalty;
@@ -46,7 +68,7 @@ public enum ErrorHandlingStrategy {
      * Indicating yield or penalize the processing when transfer the input FlowFile.
      */
     public enum Penalty {
-        Yield, Penalize;
+        Yield, Penalize, None;
     }
 
     public Result result(){

http://git-wip-us.apache.org/repos/asf/nifi/blob/033a1553/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
index 8c15e29..4aa9677 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
@@ -89,21 +89,17 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
         FlowFile inFile = session.get();
 
         String docId = null;
-        try {
-            if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){
-                docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(inFile).getValue();
-            } else {
-                final byte[] content = new byte[(int) inFile.getSize()];
-                session.read(inFile, new InputStreamCallback() {
-                    @Override
-                    public void process(final InputStream in) throws IOException {
-                        StreamUtils.fillBuffer(in, content, true);
-                    }
-                });
-                docId = new String(content, StandardCharsets.UTF_8);
-            }
-        } catch (Throwable t) {
-            throw new ProcessException("Please check 'Document Id' setting. Couldn't get document id from " + inFile);
+        if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){
+            docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(inFile).getValue();
+        } else {
+            final byte[] content = new byte[(int) inFile.getSize()];
+            session.read(inFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    StreamUtils.fillBuffer(in, content, true);
+                }
+            });
+            docId = new String(content, StandardCharsets.UTF_8);
         }
 
         if(StringUtils.isEmpty(docId)){
@@ -163,7 +159,7 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
 
         } catch (CouchbaseException e){
             String errMsg = String.format("Getting docuement %s from Couchbase Server using %s failed due to %s", docId, inFile, e);
-            handleCouchbaseException(session, logger, inFile, e, errMsg);
+            handleCouchbaseException(context, session, logger, inFile, e, errMsg);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/033a1553/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java
index 8f41383..2aa803c 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java
@@ -99,6 +99,7 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
     @Override
     protected void addSupportedRelationships(Set<Relationship> relationships) {
         relationships.add(REL_SUCCESS);
+        relationships.add(REL_RETRY);
         relationships.add(REL_FAILURE);
     }
 
@@ -110,7 +111,6 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
             return;
         }
 
-        String docId = null;
         final byte[] content = new byte[(int) flowFile.getSize()];
         session.read(flowFile, new InputStreamCallback() {
             @Override
@@ -119,13 +119,9 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
             }
         });
 
-        try {
-            docId = String.valueOf(flowFile.getAttribute(CoreAttributes.UUID.key()));
-            if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){
-                docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
-            }
-        } catch (Throwable t) {
-            throw new ProcessException("Please check 'Document Id' setting. Couldn't get document id from " + flowFile);
+        String docId = String.valueOf(flowFile.getAttribute(CoreAttributes.UUID.key()));
+        if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){
+            docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
         }
 
         try {
@@ -158,7 +154,7 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
 
         } catch (CouchbaseException e) {
             String errMsg = String.format("Writing docuement %s to Couchbase Server using %s failed due to %s", docId, flowFile, e);
-            handleCouchbaseException(session, logger, flowFile, e, errMsg);
+            handleCouchbaseException(context, session, logger, flowFile, e, errMsg);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/033a1553/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java
index dca2ae3..108980c 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java
@@ -34,6 +34,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageException;
 import org.apache.nifi.couchbase.CouchbaseAttributes;
 import org.apache.nifi.couchbase.CouchbaseClusterControllerService;
 import org.apache.nifi.processor.exception.ProcessException;
@@ -181,9 +182,9 @@ public class TestGetCouchbaseKey {
 
         try {
             testRunner.run();
-            fail("ProcessException should be throws.");
+            fail("Exception should be thrown.");
         } catch (AssertionError e){
-            Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
+            Assert.assertTrue(e.getCause().getClass().equals(AttributeExpressionLanguageException.class));
         }
 
         testRunner.assertTransferCount(REL_SUCCESS, 0);
@@ -208,9 +209,9 @@ public class TestGetCouchbaseKey {
         testRunner.enqueue(inFileData, properties);
         try {
             testRunner.run();
-            fail("ProcessException should be throws.");
+            fail("Exception should be thrown.");
         } catch (AssertionError e){
-            Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
+            Assert.assertTrue(e.getCause().getClass().equals(AttributeExpressionLanguageException.class));
         }
 
         testRunner.assertTransferCount(REL_SUCCESS, 0);
@@ -286,7 +287,7 @@ public class TestGetCouchbaseKey {
         testRunner.enqueue(inFileData);
         try {
             testRunner.run();
-            fail("ProcessException should be throws.");
+            fail("ProcessException should be thrown.");
         } catch (AssertionError e){
             Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
         }
@@ -313,7 +314,7 @@ public class TestGetCouchbaseKey {
         testRunner.enqueue(inFileData);
         try {
             testRunner.run();
-            fail("ProcessException should be throws.");
+            fail("ProcessException should be thrown.");
         } catch (AssertionError e){
             Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
             Assert.assertTrue(e.getCause().getCause().getClass().equals(AuthenticationException.class));
@@ -424,9 +425,9 @@ public class TestGetCouchbaseKey {
 
         testRunner.assertTransferCount(REL_SUCCESS, 0);
         testRunner.assertTransferCount(REL_ORIGINAL, 0);
-        testRunner.assertTransferCount(REL_RETRY, 0);
-        testRunner.assertTransferCount(REL_FAILURE, 1);
-        MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0);
+        testRunner.assertTransferCount(REL_RETRY, 1);
+        testRunner.assertTransferCount(REL_FAILURE, 0);
+        MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_RETRY).get(0);
         orgFile.assertContentEquals(inputFileDataStr);
         orgFile.assertAttributeEquals(Exception.key(), exception.getClass().getName());
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/033a1553/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java
index 0388e35..f870593 100644
--- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java
@@ -37,6 +37,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageException;
 import org.apache.nifi.couchbase.CouchbaseAttributes;
 import org.apache.nifi.couchbase.CouchbaseClusterControllerService;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -167,7 +168,10 @@ public class TestPutCouchbaseKey {
         testRunner.enqueue(inFileDataBytes, properties);
         testRunner.run();
 
-        verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE));
+        ArgumentCaptor<RawJsonDocument> capture = ArgumentCaptor.forClass(RawJsonDocument.class);
+        verify(bucket, times(1)).upsert(capture.capture(), eq(PersistTo.NONE), eq(ReplicateTo.NONE));
+        assertEquals(somePropertyValue, capture.getValue().id());
+        assertEquals(inFileData, capture.getValue().content());
 
         testRunner.assertTransferCount(REL_SUCCESS, 1);
         testRunner.assertTransferCount(REL_RETRY, 0);
@@ -196,9 +200,9 @@ public class TestPutCouchbaseKey {
         testRunner.enqueue(inFileDataBytes, properties);
         try {
             testRunner.run();
-            fail("ProcessException should be throws.");
+            fail("Exception should be thrown.");
         } catch (AssertionError e){
-            Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
+            Assert.assertTrue(e.getCause().getClass().equals(AttributeExpressionLanguageException.class));
         }
 
         testRunner.assertTransferCount(REL_SUCCESS, 0);
@@ -226,6 +230,7 @@ public class TestPutCouchbaseKey {
         ArgumentCaptor<RawJsonDocument> capture = ArgumentCaptor.forClass(RawJsonDocument.class);
         verify(bucket, times(1)).upsert(capture.capture(), eq(PersistTo.NONE), eq(ReplicateTo.NONE));
         assertEquals(uuid, capture.getValue().id());
+        assertEquals(inFileData, capture.getValue().content());
 
         testRunner.assertTransferCount(REL_SUCCESS, 1);
         testRunner.assertTransferCount(REL_RETRY, 0);
@@ -253,7 +258,7 @@ public class TestPutCouchbaseKey {
         testRunner.setProperty(PutCouchbaseKey.REPLICATE_TO, ReplicateTo.ONE.toString());
         try {
             testRunner.run();
-            fail("ProcessException should be throws.");
+            fail("ProcessException should be thrown.");
         } catch (AssertionError e){
             Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
         }