You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2019/03/07 05:27:06 UTC

[nifi] branch master updated: NIFI-3685 Azure EventHub processor tests fail

This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new d53cefa  NIFI-3685 Azure EventHub processor tests fail
d53cefa is described below

commit d53cefa2a75d47b5e75c6fb55e86bada759cce79
Author: Ferenc Szabó <fs...@cloudera.com>
AuthorDate: Mon Mar 4 14:30:56 2019 +0100

    NIFI-3685 Azure EventHub processor tests fail
    
    Moved test classes into the proper folder.
    Fixed outdated mocking and added assertions that actually test the content.
    
    This closes #3346.
    
    Signed-off-by: Koji Kawamura <ij...@apache.org>
---
 .../azure/eventhub/GetAzureEventHubTest.java       | 75 +++++++++++++++++-----
 .../azure/eventhub/PutAzureEventHubTest.java       |  4 +-
 2 files changed, 62 insertions(+), 17 deletions(-)

diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
similarity index 69%
rename from nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
rename to nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
index 951384a..a3b80bf 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
@@ -17,21 +17,28 @@
 package org.apache.nifi.processors.azure.eventhub;
 
 import com.microsoft.azure.eventhubs.EventData;
+import com.microsoft.azure.eventhubs.EventData.SystemProperties;
 import com.microsoft.azure.eventhubs.PartitionReceiver;
 import com.microsoft.azure.servicebus.ServiceBusException;
+import com.microsoft.azure.servicebus.amqp.AmqpConstants;
+
+import java.io.IOException;
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutionException;
+
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Before;
 import org.junit.Test;
-import org.powermock.reflect.Whitebox;
-
-import java.io.IOException;
-import java.time.Instant;
-import java.time.temporal.ChronoUnit;
-import java.util.LinkedList;
-import java.util.concurrent.ExecutionException;
+import org.mockito.internal.util.reflection.Whitebox;
 
 
 public class GetAzureEventHubTest {
@@ -40,6 +47,10 @@ public class GetAzureEventHubTest {
     private static final String eventHubName = "get-test";
     private static final String sasKeyName = "bogus-policy";
     private static final String sasKey = "9rHmHqxoOVWOb8wS09dvqXYxnNiLqxNMCbmt6qMaQyU!";
+    private static final Date ENQUEUED_TIME_VALUE = Date.from(Clock.fixed(Instant.now(), ZoneId.systemDefault()).instant());
+    public static final long SEQUENCE_NUMBER_VALUE = 13L;
+    public static final String OFFSET_VALUE = "100";
+    public static final String PARTITION_KEY_VALUE = "0";
 
     private TestRunner testRunner;
     private MockGetAzureEventHub processor;
@@ -69,13 +80,12 @@ public class GetAzureEventHubTest {
         testRunner.setProperty(GetAzureEventHub.RECEIVER_FETCH_TIMEOUT,"10000");
         testRunner.assertValid();
     }
-    
+
     @Test
     public void verifyRelationships(){
-
         assert(1 == processor.getRelationships().size());
-
     }
+
     @Test
     public void testNoPartitions(){
         MockGetAzureEventHubNoPartitions mockProcessor = new MockGetAzureEventHubNoPartitions();
@@ -85,6 +95,7 @@ public class GetAzureEventHubTest {
         testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS, 0);
         testRunner.clearTransferState();
     }
+
     @Test
     public void testNullRecieve(){
         setUpStandardTestConfig();
@@ -93,6 +104,7 @@ public class GetAzureEventHubTest {
         testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS, 0);
         testRunner.clearTransferState();
     }
+
     @Test(expected = AssertionError.class)
     public void testThrowGetReceiver(){
         setUpStandardTestConfig();
@@ -101,12 +113,37 @@ public class GetAzureEventHubTest {
         testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS, 0);
         testRunner.clearTransferState();
     }
+
     @Test
     public void testNormalFlow() throws Exception {
+        setUpStandardTestConfig();
+        testRunner.run(1, true);
+        testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS, 10);
 
+        MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(GetAzureEventHub.REL_SUCCESS).get(0);
+        flowFile.assertContentEquals("test event number: 0");
+        flowFile.assertAttributeEquals("eventhub.enqueued.timestamp", ENQUEUED_TIME_VALUE.toInstant().toString());
+        flowFile.assertAttributeEquals("eventhub.offset", OFFSET_VALUE);
+        flowFile.assertAttributeEquals("eventhub.sequence", String.valueOf(SEQUENCE_NUMBER_VALUE));
+        flowFile.assertAttributeEquals("eventhub.name", eventHubName);
+
+        testRunner.clearTransferState();
+    }
+
+    @Test
+    public void testNormalNotReceivedEventsFlow() throws Exception {
         setUpStandardTestConfig();
+        processor.received = false;
         testRunner.run(1, true);
         testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS, 10);
+
+        MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(GetAzureEventHub.REL_SUCCESS).get(0);
+        flowFile.assertContentEquals("test event number: 0");
+        flowFile.assertAttributeNotExists("eventhub.enqueued.timestamp");
+        flowFile.assertAttributeNotExists("eventhub.offset");
+        flowFile.assertAttributeNotExists("eventhub.sequence");
+        flowFile.assertAttributeEquals("eventhub.name", eventHubName);
+
         testRunner.clearTransferState();
     }
 
@@ -117,6 +154,7 @@ public class GetAzureEventHubTest {
 
         boolean nullReceive = false;
         boolean getReceiverThrow = false;
+        boolean received = true;
 
         @Override
         protected void setupReceiver(final String connectionString) throws ProcessException{
@@ -140,12 +178,17 @@ public class GetAzureEventHubTest {
             }
             final LinkedList<EventData> receivedEvents = new LinkedList<>();
             for(int i = 0; i < 10; i++){
-                final EventData eventData = new EventData(String.format("test event number: %d",i).getBytes());
-                Whitebox.setInternalState(eventData,"isReceivedEvent",true);
-                Whitebox.setInternalState(eventData, "partitionKey","0");
-                Whitebox.setInternalState(eventData, "offset", "100");
-                Whitebox.setInternalState(eventData, "sequenceNumber",13L);
-                Whitebox.setInternalState(eventData, "enqueuedTime",Instant.now().minus(100L, ChronoUnit.SECONDS));
+                EventData eventData = new EventData(String.format("test event number: %d", i).getBytes());
+                if (received) {
+                    HashMap<String, Object> properties = new HashMap<>();
+                    properties.put(AmqpConstants.PARTITION_KEY_ANNOTATION_NAME, PARTITION_KEY_VALUE);
+                    properties.put(AmqpConstants.OFFSET_ANNOTATION_NAME, OFFSET_VALUE);
+                    properties.put(AmqpConstants.SEQUENCE_NUMBER_ANNOTATION_NAME, SEQUENCE_NUMBER_VALUE);
+                    properties.put(AmqpConstants.ENQUEUED_TIME_UTC_ANNOTATION_NAME, ENQUEUED_TIME_VALUE);
+
+                    SystemProperties systemProperties = new SystemProperties(properties);
+                    Whitebox.setInternalState(eventData, "systemProperties", systemProperties);
+                }
                 receivedEvents.add(eventData);
             }
 
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
similarity index 98%
rename from nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
rename to nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
index 7568619..1129ac7 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
@@ -99,7 +99,9 @@ public class PutAzureEventHubTest {
     private static class MockPutAzureEventHub extends PutAzureEventHub{
 
         byte[] receivedBuffer = null;
-        byte[] getReceivedBuffer(){return receivedBuffer;}
+        byte[] getReceivedBuffer(){
+            return receivedBuffer;
+        }
 
         @Override
         protected EventHubClient createEventHubClient(final String namespace, final String eventHubName, final String policyName, final String policyKey) throws ProcessException {