You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/09/20 15:55:27 UTC

[GitHub] [nifi] turcsanyip commented on a diff in pull request #6319: NIFI-10381 Refactor Azure Event Hubs components with current SDK

turcsanyip commented on code in PR #6319:
URL: https://github.com/apache/nifi/pull/6319#discussion_r975117881


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml:
##########
@@ -62,74 +62,40 @@
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-core</artifactId>
-            <version>${azure.core.version}</version>
         </dependency>
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-identity</artifactId>
-            <version>${azure.identity.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.azure</groupId>
-                    <artifactId>azure-core</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
-            <groupId>com.microsoft.azure</groupId>
-            <artifactId>azure-eventhubs</artifactId>
-            <version>${azure-eventhubs.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.microsoft.azure</groupId>
-            <artifactId>azure-keyvault</artifactId>
-            <version>${azure-keyvault.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.microsoft.azure</groupId>
-            <artifactId>azure-eventhubs-eph</artifactId>
-            <version>${azure-eventhubs-eph.version}</version>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-messaging-eventhubs</artifactId>
         </dependency>
         <dependency>
-            <groupId>com.microsoft.azure</groupId>
-            <artifactId>azure-storage</artifactId>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
         </dependency>
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-cosmos</artifactId>
-            <version>${azure-cosmos.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.azure</groupId>
-                    <artifactId>azure-core</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.google.guava</groupId>
-                    <artifactId>guava</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-storage-file-datalake</artifactId>
-            <version>${azure-storage-file-datalake.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.azure</groupId>
-                    <artifactId>azure-core</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-storage-blob</artifactId>
-            <version>${azure-storage-blob.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.azure</groupId>
-                    <artifactId>azure-core</artifactId>
-                </exclusion>
-            </exclusions>
+        </dependency>
+        <!-- Legacy Microsoft Azure Libraries -->
+        <dependency>
+            <groupId>com.microsoft.azure</groupId>
+            <artifactId>azure-keyvault</artifactId>
+            <version>${azure-keyvault.version}</version>
+        </dependency>

Review Comment:
   `azure-keyvault` still brings `jsr305` in but you mentioned that it is no longer a transitive dependency of current libraries.



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java:
##########
@@ -166,257 +152,135 @@ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return propertyDescriptors;
     }
 
-    private ScheduledExecutorService executor;
-
     @OnScheduled
-    public final void setupClient(final ProcessContext context) throws ProcessException{
+    public final void createClient(final ProcessContext context) {
+        eventHubProducerClient = createEventHubProducerClient(context);
     }
 
     @OnStopped
-    public void tearDown() {
-        EventHubClient sender;
-        while ((sender = senderQueue.poll()) != null) {
-            sender.close();
+    public void closeClient() {
+        if (eventHubProducerClient == null) {
+            getLogger().info("Azure Event Hub Producer Client not configured");
+        } else {
+            eventHubProducerClient.close();
         }
     }
 
     @Override
     protected Collection<ValidationResult> customValidate(ValidationContext context) {
-        List<ValidationResult> retVal = AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, context);
-        return retVal;
+        return AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, context);
     }
 
-
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        try {
-            populateSenderQueue(context);
-        } catch (ProcessException e) {
-            context.yield();
-            throw e;
-        }
-
         final StopWatch stopWatch = new StopWatch(true);
 
         final String partitioningKeyAttributeName = context.getProperty(PARTITIONING_KEY_ATTRIBUTE_NAME).getValue();
 
-        // Get N flow files
-        final int maxBatchSize = NumberUtils.toInt(context.getProperty(MAX_BATCH_SIZE).getValue(), 100);
-        final List<FlowFile> flowFileList = session.get(maxBatchSize);
+        final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger();
+        final List<FlowFile> flowFileBatch = session.get(maxBatchSize);
 
-        // Convert and send each flow file
-        final BlockingQueue<CompletableFuture<FlowFileResultCarrier<Relationship>>> futureQueue = new LinkedBlockingQueue<>();
-        for (FlowFile flowFile : flowFileList) {
-            if (flowFile == null) {
-                continue;
-            }
+        final List<FlowFileResultCarrier<Relationship>> flowFileResults = new ArrayList<>();
+        for (final FlowFile flowFile : flowFileBatch) {
+            final FlowFileResultCarrier<Relationship> flowFileResult = handleFlowFile(flowFile, partitioningKeyAttributeName, session);
+            flowFileResults.add(flowFileResult);
+        }
 
-            futureQueue.offer(handleFlowFile(flowFile, partitioningKeyAttributeName, session));
+        processFlowFileResults(context, session, stopWatch, flowFileResults);
+    }
+
+    protected EventHubProducerClient createEventHubProducerClient(final ProcessContext context) throws ProcessException {
+        final boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
+        final String policyName, policyKey;
+        if (useManagedIdentity) {
+            policyName = AzureEventHubUtils.MANAGED_IDENTITY_POLICY;
+            policyKey = null;
+        } else {
+            policyName = context.getProperty(ACCESS_POLICY).getValue();
+            policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
         }
+        final String namespace = context.getProperty(NAMESPACE).getValue();
+        final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
+        final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
 
-        waitForAllFutures(context, session, stopWatch, futureQueue);
+        try {
+            final EventHubClientBuilder eventHubClientBuilder = new EventHubClientBuilder();
+
+            final String fullyQualifiedNamespace = String.format("%s%s", namespace, serviceBusEndpoint);
+            if (AzureEventHubUtils.MANAGED_IDENTITY_POLICY.equals(policyName)) {
+                final ManagedIdentityCredentialBuilder managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder();
+                final ManagedIdentityCredential managedIdentityCredential = managedIdentityCredentialBuilder.build();
+                eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, managedIdentityCredential);
+            } else {
+                final AzureNamedKeyCredential azureNamedKeyCredential = new AzureNamedKeyCredential(policyName, policyKey);
+                eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, azureNamedKeyCredential);
+            }

Review Comment:
   I think the code for the Managed Identity case could be simplified:
   - no `MANAGED_IDENTITY_POLICY` constant needed at all
   - `if (useManagedIdentity)` block (line 195-201) can be deleted
   - and modify this `if`:
   ```suggestion
               if (useManagedIdentity) {
                   final ManagedIdentityCredentialBuilder managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder();
                   final ManagedIdentityCredential managedIdentityCredential = managedIdentityCredentialBuilder.build();
                   eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, managedIdentityCredential);
               } else {
                   final String policyName = context.getProperty(ACCESS_POLICY).getValue();
                   final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
                   final AzureNamedKeyCredential azureNamedKeyCredential = new AzureNamedKeyCredential(policyName, policyKey);
                   eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, azureNamedKeyCredential);
               }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org