You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2019/03/07 05:27:06 UTC
[nifi] branch master updated: NIFI-3685 Azure EventHub processor tests fail
This is an automated email from the ASF dual-hosted git repository.
ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new d53cefa NIFI-3685 Azure EventHub processor tests fail
d53cefa is described below
commit d53cefa2a75d47b5e75c6fb55e86bada759cce79
Author: Ferenc Szabó <fs...@cloudera.com>
AuthorDate: Mon Mar 4 14:30:56 2019 +0100
NIFI-3685 Azure EventHub processor tests fail
Moved test classes into the proper folder.
Fixed outdated mocking and added assertions that actually test the content.
This closes #3346.
Signed-off-by: Koji Kawamura <ij...@apache.org>
---
.../azure/eventhub/GetAzureEventHubTest.java | 75 +++++++++++++++++-----
.../azure/eventhub/PutAzureEventHubTest.java | 4 +-
2 files changed, 62 insertions(+), 17 deletions(-)
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
similarity index 69%
rename from nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
rename to nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
index 951384a..a3b80bf 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
@@ -17,21 +17,28 @@
package org.apache.nifi.processors.azure.eventhub;
import com.microsoft.azure.eventhubs.EventData;
+import com.microsoft.azure.eventhubs.EventData.SystemProperties;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import com.microsoft.azure.servicebus.ServiceBusException;
+import com.microsoft.azure.servicebus.amqp.AmqpConstants;
+
+import java.io.IOException;
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutionException;
+
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
-import org.powermock.reflect.Whitebox;
-
-import java.io.IOException;
-import java.time.Instant;
-import java.time.temporal.ChronoUnit;
-import java.util.LinkedList;
-import java.util.concurrent.ExecutionException;
+import org.mockito.internal.util.reflection.Whitebox;
public class GetAzureEventHubTest {
@@ -40,6 +47,10 @@ public class GetAzureEventHubTest {
private static final String eventHubName = "get-test";
private static final String sasKeyName = "bogus-policy";
private static final String sasKey = "9rHmHqxoOVWOb8wS09dvqXYxnNiLqxNMCbmt6qMaQyU!";
+ private static final Date ENQUEUED_TIME_VALUE = Date.from(Clock.fixed(Instant.now(), ZoneId.systemDefault()).instant());
+ public static final long SEQUENCE_NUMBER_VALUE = 13L;
+ public static final String OFFSET_VALUE = "100";
+ public static final String PARTITION_KEY_VALUE = "0";
private TestRunner testRunner;
private MockGetAzureEventHub processor;
@@ -69,13 +80,12 @@ public class GetAzureEventHubTest {
testRunner.setProperty(GetAzureEventHub.RECEIVER_FETCH_TIMEOUT,"10000");
testRunner.assertValid();
}
-
+
@Test
public void verifyRelationships(){
-
assert(1 == processor.getRelationships().size());
-
}
+
@Test
public void testNoPartitions(){
MockGetAzureEventHubNoPartitions mockProcessor = new MockGetAzureEventHubNoPartitions();
@@ -85,6 +95,7 @@ public class GetAzureEventHubTest {
testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS, 0);
testRunner.clearTransferState();
}
+
@Test
public void testNullRecieve(){
setUpStandardTestConfig();
@@ -93,6 +104,7 @@ public class GetAzureEventHubTest {
testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS, 0);
testRunner.clearTransferState();
}
+
@Test(expected = AssertionError.class)
public void testThrowGetReceiver(){
setUpStandardTestConfig();
@@ -101,12 +113,37 @@ public class GetAzureEventHubTest {
testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS, 0);
testRunner.clearTransferState();
}
+
@Test
public void testNormalFlow() throws Exception {
+ setUpStandardTestConfig();
+ testRunner.run(1, true);
+ testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS, 10);
+ MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(GetAzureEventHub.REL_SUCCESS).get(0);
+ flowFile.assertContentEquals("test event number: 0");
+ flowFile.assertAttributeEquals("eventhub.enqueued.timestamp", ENQUEUED_TIME_VALUE.toInstant().toString());
+ flowFile.assertAttributeEquals("eventhub.offset", OFFSET_VALUE);
+ flowFile.assertAttributeEquals("eventhub.sequence", String.valueOf(SEQUENCE_NUMBER_VALUE));
+ flowFile.assertAttributeEquals("eventhub.name", eventHubName);
+
+ testRunner.clearTransferState();
+ }
+
+ @Test
+ public void testNormalNotReceivedEventsFlow() throws Exception {
setUpStandardTestConfig();
+ processor.received = false;
testRunner.run(1, true);
testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS, 10);
+
+ MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(GetAzureEventHub.REL_SUCCESS).get(0);
+ flowFile.assertContentEquals("test event number: 0");
+ flowFile.assertAttributeNotExists("eventhub.enqueued.timestamp");
+ flowFile.assertAttributeNotExists("eventhub.offset");
+ flowFile.assertAttributeNotExists("eventhub.sequence");
+ flowFile.assertAttributeEquals("eventhub.name", eventHubName);
+
testRunner.clearTransferState();
}
@@ -117,6 +154,7 @@ public class GetAzureEventHubTest {
boolean nullReceive = false;
boolean getReceiverThrow = false;
+ boolean received = true;
@Override
protected void setupReceiver(final String connectionString) throws ProcessException{
@@ -140,12 +178,17 @@ public class GetAzureEventHubTest {
}
final LinkedList<EventData> receivedEvents = new LinkedList<>();
for(int i = 0; i < 10; i++){
- final EventData eventData = new EventData(String.format("test event number: %d",i).getBytes());
- Whitebox.setInternalState(eventData,"isReceivedEvent",true);
- Whitebox.setInternalState(eventData, "partitionKey","0");
- Whitebox.setInternalState(eventData, "offset", "100");
- Whitebox.setInternalState(eventData, "sequenceNumber",13L);
- Whitebox.setInternalState(eventData, "enqueuedTime",Instant.now().minus(100L, ChronoUnit.SECONDS));
+ EventData eventData = new EventData(String.format("test event number: %d", i).getBytes());
+ if (received) {
+ HashMap<String, Object> properties = new HashMap<>();
+ properties.put(AmqpConstants.PARTITION_KEY_ANNOTATION_NAME, PARTITION_KEY_VALUE);
+ properties.put(AmqpConstants.OFFSET_ANNOTATION_NAME, OFFSET_VALUE);
+ properties.put(AmqpConstants.SEQUENCE_NUMBER_ANNOTATION_NAME, SEQUENCE_NUMBER_VALUE);
+ properties.put(AmqpConstants.ENQUEUED_TIME_UTC_ANNOTATION_NAME, ENQUEUED_TIME_VALUE);
+
+ SystemProperties systemProperties = new SystemProperties(properties);
+ Whitebox.setInternalState(eventData, "systemProperties", systemProperties);
+ }
receivedEvents.add(eventData);
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
similarity index 98%
rename from nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
rename to nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
index 7568619..1129ac7 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
@@ -99,7 +99,9 @@ public class PutAzureEventHubTest {
private static class MockPutAzureEventHub extends PutAzureEventHub{
byte[] receivedBuffer = null;
- byte[] getReceivedBuffer(){return receivedBuffer;}
+ byte[] getReceivedBuffer(){
+ return receivedBuffer;
+ }
@Override
protected EventHubClient createEventHubClient(final String namespace, final String eventHubName, final String policyName, final String policyKey) throws ProcessException {