Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/08/22 15:54:44 UTC

[GitHub] [nifi] exceptionfactory opened a new pull request, #6319: NIFI-10381 Refactor Azure Event Hubs components with current SDK

exceptionfactory opened a new pull request, #6319:
URL: https://github.com/apache/nifi/pull/6319

   # Summary
   
   [NIFI-10381](https://issues.apache.org/jira/browse/NIFI-10381) Refactors the following Azure Event Hubs components from legacy version 3 to current version 5 of the Azure SDK for Java:
   
   - ConsumeAzureEventHub
   - GetAzureEventHub
   - PutAzureEventHub
   
   Refactoring also includes moving the `AzureEventHubRecordSink` from the separate `nifi-azure-record-sink-nar` to the shared `nifi-azure-nar`.
   
   With the exception of the `Consumer Host` property in `ConsumeAzureEventHub`, migrating to the current version of the Azure SDK maintains support for existing component properties. The `Consumer Host` property is no longer needed because the current Azure Event Hubs SDK handles unique Event Processor identification using an internal UUID.
   
   Upgrading to the current Azure SDK also provided the opportunity to streamline dependency management using the [Azure SDK Bill of Materials](https://search.maven.org/artifact/com.azure/azure-sdk-bom/1.2.4/pom). The `nifi-azure-processors` module retains legacy dependencies for Azure Storage and Azure Key Vault, but all current Azure SDK libraries can be aligned and managed using the Bill of Materials dependency.
   
   Internal changes include refactoring `PutAzureEventHub` to use the standard synchronous client instead of creating a separate executor service. The current Azure SDK maintains an internal elastic executor for handling socket communication.
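   
   For reference, a minimal sketch of the synchronous producer usage (names and credential values are illustrative assumptions, not the exact PR code):
   
   ```java
   import com.azure.core.credential.AzureNamedKeyCredential;
   import com.azure.messaging.eventhubs.EventData;
   import com.azure.messaging.eventhubs.EventDataBatch;
   import com.azure.messaging.eventhubs.EventHubClientBuilder;
   import com.azure.messaging.eventhubs.EventHubProducerClient;
   
   import java.nio.charset.StandardCharsets;
   
   public class ProducerSketch {
       public static void main(final String[] args) {
           // Illustrative namespace, Event Hub name, and shared access policy values
           final AzureNamedKeyCredential credential = new AzureNamedKeyCredential("policy-name", "policy-key");
           final EventHubProducerClient producer = new EventHubClientBuilder()
                   .credential("namespace.servicebus.windows.net", "event-hub-name", credential)
                   .buildProducerClient();
   
           // Synchronous send: the SDK's internal elastic executor handles socket I/O,
           // so the processor no longer needs its own ScheduledExecutorService
           final EventDataBatch batch = producer.createBatch();
           batch.tryAdd(new EventData("payload".getBytes(StandardCharsets.UTF_8)));
           producer.send(batch);
           producer.close();
       }
   }
   ```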
   
   Additional improvements include standardizing unit tests and removing duplication in some test methods. The `jsr305` module is no longer a transitive dependency of current libraries, so changes include removing this unnecessary exclusion.
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [X] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue created
   
   ### Pull Request Tracking
   
   - [X] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   - [X] Pull Request commit message starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   
   ### Pull Request Formatting
   
   - [X] Pull Request based on current revision of the `main` branch
   - [X] Pull Request refers to a feature branch with one commit containing changes
   
   # Verification
   
   Please indicate the verification steps performed prior to pull request creation.
   
   ### Build
   
   - [X] Build completed using `mvn clean install -P contrib-check`
     - [X] JDK 8
     - [X] JDK 11
     - [X] JDK 17
   
   ### Licensing
   
   - [ ] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
   - [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` files
   
   ### Documentation
   
   - [ ] Documentation formatting appears as expected in rendered files
   


[GitHub] [nifi] exceptionfactory commented on pull request #6319: NIFI-10381 Refactor Azure Event Hubs components with current SDK

exceptionfactory commented on PR #6319:
URL: https://github.com/apache/nifi/pull/6319#issuecomment-1253251075

   Thanks for the feedback and testing @turcsanyip!
   
   I pushed an update incorporating your suggestions for credential handling in `PutAzureEventHub` and also addressed issues with Event Position handling.
   
   For `GetAzureEventHub`, the initial enqueued time determination happens once as suggested, and subsequent executions use the `PartitionProperties` to determine the last enqueued time. This approach should be more robust for handling gaps between processor executions.
   
   For `ConsumeAzureEventHub`, the new `LegacyBlobStorageEventPositionProvider` is responsible for locating existing checkpoint information and returning it when found. The implementation follows the storage approach used in [AzureStorageCheckpointLeaseManager](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/eventhubs/microsoft-azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/AzureStorageCheckpointLeaseManager.java) from the legacy SDK. The implementation reads existing sequence numbers and uses them to derive a starting Event Position when found. The provider also deletes the blob, since the consumer will subsequently store checkpoint information using the new SDK strategy. This approach should handle the basic migration scenario when upgrading NiFi to a version that includes these changes. It is probably worth including a note in the NiFi migration guidance for awareness.
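   
   For illustration, the provider output could be wired into the Event Processor along these lines (a sketch assuming the surrounding `blobContainerAsyncClient` and `consumerGroup` values; builder credentials and handlers omitted):
   
   ```java
   import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
   import com.azure.messaging.eventhubs.models.EventPosition;
   import com.azure.storage.blob.BlobContainerAsyncClient;
   import org.apache.nifi.processors.azure.eventhub.position.LegacyBlobStorageEventPositionProvider;
   
   import java.util.Map;
   
   class LegacyPositionWiring {
       // Resolve legacy checkpoints once and apply them as initial partition positions
       static void applyLegacyPositions(final BlobContainerAsyncClient blobContainerAsyncClient,
                                        final String consumerGroup,
                                        final EventProcessorClientBuilder builder) {
           final Map<String, EventPosition> initialPositions =
                   new LegacyBlobStorageEventPositionProvider(blobContainerAsyncClient, consumerGroup)
                           .getInitialPartitionEventPosition();
           if (!initialPositions.isEmpty()) {
               builder.initialPartitionEventPosition(initialPositions);
           }
       }
   }
   ```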
   
   Please let me know if you notice any additional issues.


[GitHub] [nifi] turcsanyip commented on a diff in pull request #6319: NIFI-10381 Refactor Azure Event Hubs components with current SDK

turcsanyip commented on code in PR #6319:
URL: https://github.com/apache/nifi/pull/6319#discussion_r981666699


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/position/LegacyBlobStorageEventPositionProvider.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.eventhub.position;
+
+import com.azure.core.util.BinaryData;
+import com.azure.messaging.eventhubs.models.EventPosition;
+import com.azure.storage.blob.BlobAsyncClient;
+import com.azure.storage.blob.BlobContainerAsyncClient;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.BlobListDetails;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Event Position Provider using Azure Blob Storage implemented in Azure Event Hubs SDK Version 3
+ */
+public class LegacyBlobStorageEventPositionProvider implements EventPositionProvider {
+    private static final String LEASE_SEQUENCE_NUMBER_FIELD = "sequenceNumber";
+
+    private static final Logger logger = LoggerFactory.getLogger(LegacyBlobStorageEventPositionProvider.class);
+
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    private final BlobContainerAsyncClient blobContainerAsyncClient;
+
+    private final String consumerGroup;
+
+    public LegacyBlobStorageEventPositionProvider(
+            final BlobContainerAsyncClient blobContainerAsyncClient,
+            final String consumerGroup
+    ) {
+        this.blobContainerAsyncClient = Objects.requireNonNull(blobContainerAsyncClient, "Client required");
+        this.consumerGroup = Objects.requireNonNull(consumerGroup, "Consumer Group required");
+    }
+
+    /**
+     * Get Initial Partition Event Position using Azure Blob Storage as persisted in
+     * com.microsoft.azure.eventprocessorhost.AzureStorageCheckpointLeaseManager
+     *
+     * @return Map of Partition and Event Position or empty when no checkpoints found
+     */
+    @Override
+    public Map<String, EventPosition> getInitialPartitionEventPosition() {
+        final Map<String, EventPosition> partitionEventPosition;
+
+        if (containerExists()) {
+            final BlobListDetails blobListDetails = new BlobListDetails().setRetrieveMetadata(true);
+            final ListBlobsOptions listBlobsOptions = new ListBlobsOptions().setPrefix(consumerGroup).setDetails(blobListDetails);
+            final Iterable<BlobItem> blobItems = blobContainerAsyncClient.listBlobs(listBlobsOptions).toIterable();
+            partitionEventPosition = getPartitionEventPosition(blobItems);
+        } else {
+            partitionEventPosition = Collections.emptyMap();
+        }
+
+        return partitionEventPosition;
+    }
+
+    private Map<String, EventPosition> getPartitionEventPosition(final Iterable<BlobItem> blobItems) {
+        final Map<String, EventPosition> partitionEventPosition = new LinkedHashMap<>();
+
+        for (final BlobItem blobItem : blobItems) {
+            if (Boolean.TRUE.equals(blobItem.isPrefix())) {
+                continue;
+            }
+
+            final String partitionId = getPartitionId(blobItem);
+            final EventPosition eventPosition = getEventPosition(blobItem);
+            if (eventPosition == null) {
+                logger.info("Legacy Event Position not found for Partition [{}] Blob [{}]", partitionId, blobItem.getName());
+            } else {
+                partitionEventPosition.put(partitionId, eventPosition);
+            }
+        }
+
+        return partitionEventPosition;
+    }
+
+    private String getPartitionId(final BlobItem blobItem) {
+        final String blobItemName = blobItem.getName();
+        final Path blobItemPath = Paths.get(blobItemName);
+        final Path blobItemFileName = blobItemPath.getFileName();
+        return blobItemFileName.toString();
+    }
+
+    private EventPosition getEventPosition(final BlobItem blobItem) {
+        final EventPosition eventPosition;
+
+        final String blobName = blobItem.getName();
+        final BlobAsyncClient blobAsyncClient = blobContainerAsyncClient.getBlobAsyncClient(blobName);
+
+        if (itemExists(blobAsyncClient)) {
+            final BinaryData content = blobAsyncClient.downloadContent().block();
+            if (content == null) {
+                throw new IllegalStateException(String.format("Legacy Event Position content not found [%s]", blobName));
+            }
+
+            try {
+                // Read com.microsoft.azure.eventprocessorhost.AzureBlobLease from JSON
+                final JsonNode lease = objectMapper.readTree(content.toBytes());
+                if (lease.hasNonNull(LEASE_SEQUENCE_NUMBER_FIELD)) {
+                    final JsonNode sequenceNumberField = lease.get(LEASE_SEQUENCE_NUMBER_FIELD);
+                    final long sequenceNumber = sequenceNumberField.asLong();
+                    eventPosition = EventPosition.fromSequenceNumber(sequenceNumber);
+
+                    blobAsyncClient.delete().block();

Review Comment:
   I'm afraid we cannot delete the old storage, because in a clustered environment the "first" Consume processor instance (on the fastest node) reads all partition positions and then deletes all the files, while the other nodes would still need the information for their partitions.
   I got this error:
   ```
   com.azure.storage.blob.models.BlobStorageException: Status code 404, "<U+FEFF><?xml version="1.0" encoding="utf-8"?><Error><Code>BlobNotFound</Code><Message>The specified blob does not exist.
   ```
   And the "second" consumer could not migrate the offsets for its partitions.
   
   As far as I can see, the consumers need to have the initial positions before we know which partitions belong to them, so it is not possible to read and delete only specific files (each consumer handling its own files).
   
   



[GitHub] [nifi] exceptionfactory commented on pull request #6319: NIFI-10381 Refactor Azure Event Hubs components with current SDK

exceptionfactory commented on PR #6319:
URL: https://github.com/apache/nifi/pull/6319#issuecomment-1260354286

   > It may be related to this issue: [Azure/azure-sdk-for-java#29927](https://github.com/Azure/azure-sdk-for-java/issues/29927) However, we use `azure-messaging-eventhubs-checkpointstore-blob:1.15.1` where this issue should be fixed.
   
   Thanks for noting the HTTP 409 and 412 messages @turcsanyip; I also noticed them during testing. According to the linked issue, they appear to be expected as part of partition load balancing, although others have also found the behavior confusing. This is something to track in the Azure SDK for future reference, with an upgrade when a new version changes the behavior.


[GitHub] [nifi] turcsanyip commented on a diff in pull request #6319: NIFI-10381 Refactor Azure Event Hubs components with current SDK

turcsanyip commented on code in PR #6319:
URL: https://github.com/apache/nifi/pull/6319#discussion_r975117881


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml:
##########
@@ -62,74 +62,40 @@
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-core</artifactId>
-            <version>${azure.core.version}</version>
         </dependency>
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-identity</artifactId>
-            <version>${azure.identity.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.azure</groupId>
-                    <artifactId>azure-core</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
-            <groupId>com.microsoft.azure</groupId>
-            <artifactId>azure-eventhubs</artifactId>
-            <version>${azure-eventhubs.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.microsoft.azure</groupId>
-            <artifactId>azure-keyvault</artifactId>
-            <version>${azure-keyvault.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.microsoft.azure</groupId>
-            <artifactId>azure-eventhubs-eph</artifactId>
-            <version>${azure-eventhubs-eph.version}</version>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-messaging-eventhubs</artifactId>
         </dependency>
         <dependency>
-            <groupId>com.microsoft.azure</groupId>
-            <artifactId>azure-storage</artifactId>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
         </dependency>
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-cosmos</artifactId>
-            <version>${azure-cosmos.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.azure</groupId>
-                    <artifactId>azure-core</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.google.guava</groupId>
-                    <artifactId>guava</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-storage-file-datalake</artifactId>
-            <version>${azure-storage-file-datalake.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.azure</groupId>
-                    <artifactId>azure-core</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-storage-blob</artifactId>
-            <version>${azure-storage-blob.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.azure</groupId>
-                    <artifactId>azure-core</artifactId>
-                </exclusion>
-            </exclusions>
+        </dependency>
+        <!-- Legacy Microsoft Azure Libraries -->
+        <dependency>
+            <groupId>com.microsoft.azure</groupId>
+            <artifactId>azure-keyvault</artifactId>
+            <version>${azure-keyvault.version}</version>
+        </dependency>

Review Comment:
   `azure-keyvault` still brings in `jsr305`, but you mentioned that it is no longer a transitive dependency of current libraries.



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java:
##########
@@ -166,257 +152,135 @@ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return propertyDescriptors;
     }
 
-    private ScheduledExecutorService executor;
-
     @OnScheduled
-    public final void setupClient(final ProcessContext context) throws ProcessException{
+    public final void createClient(final ProcessContext context) {
+        eventHubProducerClient = createEventHubProducerClient(context);
     }
 
     @OnStopped
-    public void tearDown() {
-        EventHubClient sender;
-        while ((sender = senderQueue.poll()) != null) {
-            sender.close();
+    public void closeClient() {
+        if (eventHubProducerClient == null) {
+            getLogger().info("Azure Event Hub Producer Client not configured");
+        } else {
+            eventHubProducerClient.close();
         }
     }
 
     @Override
     protected Collection<ValidationResult> customValidate(ValidationContext context) {
-        List<ValidationResult> retVal = AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, context);
-        return retVal;
+        return AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, context);
     }
 
-
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        try {
-            populateSenderQueue(context);
-        } catch (ProcessException e) {
-            context.yield();
-            throw e;
-        }
-
         final StopWatch stopWatch = new StopWatch(true);
 
         final String partitioningKeyAttributeName = context.getProperty(PARTITIONING_KEY_ATTRIBUTE_NAME).getValue();
 
-        // Get N flow files
-        final int maxBatchSize = NumberUtils.toInt(context.getProperty(MAX_BATCH_SIZE).getValue(), 100);
-        final List<FlowFile> flowFileList = session.get(maxBatchSize);
+        final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger();
+        final List<FlowFile> flowFileBatch = session.get(maxBatchSize);
 
-        // Convert and send each flow file
-        final BlockingQueue<CompletableFuture<FlowFileResultCarrier<Relationship>>> futureQueue = new LinkedBlockingQueue<>();
-        for (FlowFile flowFile : flowFileList) {
-            if (flowFile == null) {
-                continue;
-            }
+        final List<FlowFileResultCarrier<Relationship>> flowFileResults = new ArrayList<>();
+        for (final FlowFile flowFile : flowFileBatch) {
+            final FlowFileResultCarrier<Relationship> flowFileResult = handleFlowFile(flowFile, partitioningKeyAttributeName, session);
+            flowFileResults.add(flowFileResult);
+        }
 
-            futureQueue.offer(handleFlowFile(flowFile, partitioningKeyAttributeName, session));
+        processFlowFileResults(context, session, stopWatch, flowFileResults);
+    }
+
+    protected EventHubProducerClient createEventHubProducerClient(final ProcessContext context) throws ProcessException {
+        final boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
+        final String policyName, policyKey;
+        if (useManagedIdentity) {
+            policyName = AzureEventHubUtils.MANAGED_IDENTITY_POLICY;
+            policyKey = null;
+        } else {
+            policyName = context.getProperty(ACCESS_POLICY).getValue();
+            policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
         }
+        final String namespace = context.getProperty(NAMESPACE).getValue();
+        final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
+        final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
 
-        waitForAllFutures(context, session, stopWatch, futureQueue);
+        try {
+            final EventHubClientBuilder eventHubClientBuilder = new EventHubClientBuilder();
+
+            final String fullyQualifiedNamespace = String.format("%s%s", namespace, serviceBusEndpoint);
+            if (AzureEventHubUtils.MANAGED_IDENTITY_POLICY.equals(policyName)) {
+                final ManagedIdentityCredentialBuilder managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder();
+                final ManagedIdentityCredential managedIdentityCredential = managedIdentityCredentialBuilder.build();
+                eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, managedIdentityCredential);
+            } else {
+                final AzureNamedKeyCredential azureNamedKeyCredential = new AzureNamedKeyCredential(policyName, policyKey);
+                eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, azureNamedKeyCredential);
+            }

Review Comment:
   I think the code for the Managed Identity case could be simplified:
   - no `MANAGED_IDENTITY_POLICY` constant needed at all
   - `if (useManagedIdentity)` block (line 195-201) can be deleted
   - and modify this `if`:
   ```suggestion
               if (useManagedIdentity) {
                   final ManagedIdentityCredentialBuilder managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder();
                   final ManagedIdentityCredential managedIdentityCredential = managedIdentityCredentialBuilder.build();
                   eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, managedIdentityCredential);
               } else {
                   final String policyName = context.getProperty(ACCESS_POLICY).getValue();
                   final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
                   final AzureNamedKeyCredential azureNamedKeyCredential = new AzureNamedKeyCredential(policyName, policyKey);
                   eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, azureNamedKeyCredential);
               }
   ```



[GitHub] [nifi] turcsanyip commented on pull request #6319: NIFI-10381 Refactor Azure Event Hubs components with current SDK

turcsanyip commented on PR #6319:
URL: https://github.com/apache/nifi/pull/6319#issuecomment-1250400132

   Reviewing...


[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6319: NIFI-10381 Refactor Azure Event Hubs components with current SDK

exceptionfactory commented on code in PR #6319:
URL: https://github.com/apache/nifi/pull/6319#discussion_r977179923


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java:
##########
@@ -307,95 +217,125 @@ public void onScheduled(final ProcessContext context) throws ProcessException, U
         final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
         final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
         final boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
-        final String connectionString;
+        final String fullyQualifiedNamespace = String.format("%s%s", namespace, serviceBusEndpoint);
 
-        if(useManagedIdentity){
-            connectionString = AzureEventHubUtils.getManagedIdentityConnectionString(namespace, serviceBusEndpoint, eventHubName);
+        final EventHubClientBuilder eventHubClientBuilder = new EventHubClientBuilder();
+
+        final String consumerGroup = context.getProperty(CONSUMER_GROUP).getValue();
+        eventHubClientBuilder.consumerGroup(consumerGroup);
+
+        if (useManagedIdentity) {
+            final ManagedIdentityCredentialBuilder managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder();
+            final ManagedIdentityCredential managedIdentityCredential = managedIdentityCredentialBuilder.build();
+            eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, managedIdentityCredential);
         } else {
             final String policyName = context.getProperty(ACCESS_POLICY).getValue();
             final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
-            connectionString = new ConnectionStringBuilder()
-                                    .setEndpoint(new URI(String.format(FORMAT_STRING_FOR_CONECTION_BUILDER, namespace, serviceBusEndpoint)))
-                                    .setEventHubName(eventHubName)
-                                    .setSasKeyName(policyName)
-                                    .setSasKey(policyKey).toString();
+            final AzureNamedKeyCredential azureNamedKeyCredential = new AzureNamedKeyCredential(policyName, policyKey);
+            eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, azureNamedKeyCredential);
         }
+        eventHubConsumerClient = eventHubClientBuilder.buildConsumerClient();
 
-        if(context.getProperty(ENQUEUE_TIME).isSet()) {
-            configuredEnqueueTime = Instant.parse(context.getProperty(ENQUEUE_TIME).toString());
+        final PropertyValue enqueuedTimeProperty = context.getProperty(ENQUEUE_TIME);
+        if (enqueuedTimeProperty.isSet()) {
+            initialEnqueuedTime = Instant.parse(enqueuedTimeProperty.getValue());
         } else {
-            configuredEnqueueTime = null;
+            initialEnqueuedTime = Instant.now();
         }
-        if(context.getProperty(RECEIVER_FETCH_SIZE).isSet()) {
+
+        if (context.getProperty(RECEIVER_FETCH_SIZE).isSet()) {
             receiverFetchSize = context.getProperty(RECEIVER_FETCH_SIZE).asInteger();
         } else {
             receiverFetchSize = 100;
         }
-        if(context.getProperty(RECEIVER_FETCH_TIMEOUT).isSet()) {
+        if (context.getProperty(RECEIVER_FETCH_TIMEOUT).isSet()) {
             receiverFetchTimeout = Duration.ofMillis(context.getProperty(RECEIVER_FETCH_TIMEOUT).asLong());
         } else {
-            receiverFetchTimeout = null;
+            receiverFetchTimeout = Duration.ofMillis(60000);
         }
-
-        executor = Executors.newScheduledThreadPool(4);
-        setupReceiver(connectionString, executor);
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        final BlockingQueue<String> partitionIds = this.partitionNames;
-        final String partitionId = partitionIds.poll();
+        final String partitionId = partitionNames.poll();
         if (partitionId == null) {
             getLogger().debug("No partitions available");
             return;
         }
 
         final StopWatch stopWatch = new StopWatch(true);
         try {
+            final Iterable<PartitionEvent> events = receiveEvents(partitionId);
+            for (final PartitionEvent partitionEvent : events) {
+                final Map<String, String> attributes = getAttributes(partitionEvent);
 
-            final Iterable<EventData> receivedEvents = receiveEvents(context, partitionId);
-            if (receivedEvents == null) {
-                return;
-            }
+                FlowFile flowFile = session.create();
+                flowFile = session.putAllAttributes(flowFile, attributes);
 
-            for (final EventData eventData : receivedEvents) {
-                if (null != eventData) {
+                final EventData eventData = partitionEvent.getData();
+                final byte[] body = eventData.getBody();
+                flowFile = session.write(flowFile, outputStream -> outputStream.write(body));
 
-                    final Map<String, String> attributes = new HashMap<>();
-                    FlowFile flowFile = session.create();
-                    final EventData.SystemProperties systemProperties = eventData.getSystemProperties();
+                session.transfer(flowFile, REL_SUCCESS);
 
-                    if (null != systemProperties) {
-                        attributes.put("eventhub.enqueued.timestamp", String.valueOf(systemProperties.getEnqueuedTime()));
-                        attributes.put("eventhub.offset", systemProperties.getOffset());
-                        attributes.put("eventhub.sequence", String.valueOf(systemProperties.getSequenceNumber()));
-                    }
+                final String transitUri = getTransitUri(partitionId);
+                session.getProvenanceReporter().receive(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            }
+        } finally {
+            partitionNames.offer(partitionId);
+        }
+    }
 
-                    final Map<String,String> applicationProperties = AzureEventHubUtils.getApplicationProperties(eventData);
-                    attributes.putAll(applicationProperties);
+    /**
+     * Receive Events from specified partition is synchronized to avoid concurrent requests for the same partition
+     *
+     * @param partitionId Partition Identifier
+     * @return Iterable of Partition Events or empty when none received
+     */
+    protected synchronized Iterable<PartitionEvent> receiveEvents(final String partitionId) {
+        final EventPosition eventPosition;
 
-                    attributes.put("eventhub.name", context.getProperty(EVENT_HUB_NAME).getValue());
-                    attributes.put("eventhub.partition", partitionId);
+        if (initialEventPosition == null) {
+            getLogger().debug("Receiving Events for Partition [{}] from Initial Enqueued Time [{}]", partitionId, initialEnqueuedTime);
+            initialEventPosition = EventPosition.fromEnqueuedTime(initialEnqueuedTime);
+            eventPosition = initialEventPosition;
+        } else {
+            final PartitionProperties partitionProperties = eventHubConsumerClient.getPartitionProperties(partitionId);
+            final Instant lastEnqueuedTime = partitionProperties.getLastEnqueuedTime();
+            getLogger().debug("Receiving Events for Partition [{}] from Last Enqueued Time [{}]", partitionId, lastEnqueuedTime);
+            eventPosition = EventPosition.fromEnqueuedTime(lastEnqueuedTime);

Review Comment:
   Thanks for clarifying the issue and pointing out the differences. I updated `GetAzureEventHub` to track the last Event Position for each Partition using a `ConcurrentHashMap` that starts with the initial enqueued time in `onScheduled` and subsequently updates the Event Position based on the last Sequence Number observed.
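   
   Roughly along these lines (a sketch inside the processor class; the field names are illustrative and assume the `eventHubConsumerClient`, `receiverFetchSize`, `receiverFetchTimeout`, and `initialEnqueuedTime` members shown in the diff):
   
   ```java
   private final ConcurrentMap<String, EventPosition> partitionEventPositions = new ConcurrentHashMap<>();
   
   protected Iterable<PartitionEvent> receiveEvents(final String partitionId) {
       // Start from the initial enqueued time on the first call for this partition
       final EventPosition startingPosition = partitionEventPositions.computeIfAbsent(
               partitionId, id -> EventPosition.fromEnqueuedTime(initialEnqueuedTime));
   
       final List<PartitionEvent> events = new ArrayList<>();
       eventHubConsumerClient
               .receiveFromPartition(partitionId, receiverFetchSize, startingPosition, receiverFetchTimeout)
               .forEach(events::add);
   
       if (!events.isEmpty()) {
           // fromSequenceNumber is exclusive, so the next receive continues after the last observed event
           final long lastSequenceNumber = events.get(events.size() - 1).getData().getSequenceNumber();
           partitionEventPositions.put(partitionId, EventPosition.fromSequenceNumber(lastSequenceNumber));
       }
       return events;
   }
   ```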



[GitHub] [nifi] turcsanyip commented on a diff in pull request #6319: NIFI-10381 Refactor Azure Event Hubs components with current SDK

turcsanyip commented on code in PR #6319:
URL: https://github.com/apache/nifi/pull/6319#discussion_r977030508


##########
nifi-nar-bundles/nifi-azure-bundle/pom.xml:
##########
@@ -26,12 +26,9 @@
     <packaging>pom</packaging>
 
     <properties>
-        <azure-storage.version>8.6.6</azure-storage.version>
-        <azure.core.version>1.26.0</azure.core.version>
-        <azure.identity.version>1.4.5</azure.identity.version>
-        <!-- azure-identity depends on msal4j transitively, keep these versions consistent -->
-        <msal4j.version>1.11.0</msal4j.version>
-        <azure-cosmos.version>4.26.0</azure-cosmos.version>
+        <azure.sdk.bom.version>1.2.4</azure.sdk.bom.version>

Review Comment:
   Could you please bump the BOM version to the latest `1.2.6`?
   I still have trouble restarting the Consume processor (no messages processed after restart), but it seems to work with that version.



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java:
##########
@@ -381,313 +366,285 @@ protected Collection<ValidationResult> customValidate(ValidationContext validati
     }
 
     @Override
-    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
         if (RECORD_READER.equals(descriptor)) {
-            isRecordReaderSet = !StringUtils.isEmpty(newValue);
+            isRecordReaderSet = StringUtils.isNotEmpty(newValue);
         } else if (RECORD_WRITER.equals(descriptor)) {
-            isRecordWriterSet = !StringUtils.isEmpty(newValue);
-        }
-    }
-
-    public class EventProcessorFactory implements IEventProcessorFactory<EventProcessor> {
-        @Override
-        public EventProcessor createEventProcessor(PartitionContext context) throws Exception {
-            final EventProcessor eventProcessor = new EventProcessor();
-            return eventProcessor;
+            isRecordWriterSet = StringUtils.isNotEmpty(newValue);
         }
     }
 
-    public class EventProcessor implements IEventProcessor {
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
+        if (eventProcessorClient == null) {
+            eventProcessorClient = createClient(context);
+            eventProcessorClient.start();
 
-        @Override
-        public void onOpen(PartitionContext context) throws Exception {
-            getLogger().info("Consumer group {} opened partition {} of {}",
-                    new Object[]{context.getConsumerGroupName(), context.getPartitionId(), context.getEventHubPath()});
-        }
+            processSessionFactory = sessionFactory;

Review Comment:
   I'd suggest setting `processSessionFactory` before starting the client because the client's event handler may try to use it before the variable gets initialized (almost zero chance though).
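   
   For example (a sketch of the suggested ordering, using the names from the diff):
   
   ```java
   if (eventProcessorClient == null) {
       // Publish the collaborators first: the event handlers may fire as soon as the client starts
       processSessionFactory = sessionFactory;
       readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
       writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
   
       eventProcessorClient = createClient(context);
       eventProcessorClient.start();
   }
   ```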



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java:
##########
@@ -307,95 +217,125 @@ public void onScheduled(final ProcessContext context) throws ProcessException, U
         final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
         final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
         final boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
-        final String connectionString;
+        final String fullyQualifiedNamespace = String.format("%s%s", namespace, serviceBusEndpoint);
 
-        if(useManagedIdentity){
-            connectionString = AzureEventHubUtils.getManagedIdentityConnectionString(namespace, serviceBusEndpoint, eventHubName);
+        final EventHubClientBuilder eventHubClientBuilder = new EventHubClientBuilder();
+
+        final String consumerGroup = context.getProperty(CONSUMER_GROUP).getValue();
+        eventHubClientBuilder.consumerGroup(consumerGroup);
+
+        if (useManagedIdentity) {
+            final ManagedIdentityCredentialBuilder managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder();
+            final ManagedIdentityCredential managedIdentityCredential = managedIdentityCredentialBuilder.build();
+            eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, managedIdentityCredential);
         } else {
             final String policyName = context.getProperty(ACCESS_POLICY).getValue();
             final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
-            connectionString = new ConnectionStringBuilder()
-                                    .setEndpoint(new URI(String.format(FORMAT_STRING_FOR_CONECTION_BUILDER, namespace, serviceBusEndpoint)))
-                                    .setEventHubName(eventHubName)
-                                    .setSasKeyName(policyName)
-                                    .setSasKey(policyKey).toString();
+            final AzureNamedKeyCredential azureNamedKeyCredential = new AzureNamedKeyCredential(policyName, policyKey);
+            eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, azureNamedKeyCredential);
         }
+        eventHubConsumerClient = eventHubClientBuilder.buildConsumerClient();
 
-        if(context.getProperty(ENQUEUE_TIME).isSet()) {
-            configuredEnqueueTime = Instant.parse(context.getProperty(ENQUEUE_TIME).toString());
+        final PropertyValue enqueuedTimeProperty = context.getProperty(ENQUEUE_TIME);
+        if (enqueuedTimeProperty.isSet()) {
+            initialEnqueuedTime = Instant.parse(enqueuedTimeProperty.getValue());
         } else {
-            configuredEnqueueTime = null;
+            initialEnqueuedTime = Instant.now();
         }
-        if(context.getProperty(RECEIVER_FETCH_SIZE).isSet()) {
+
+        if (context.getProperty(RECEIVER_FETCH_SIZE).isSet()) {
             receiverFetchSize = context.getProperty(RECEIVER_FETCH_SIZE).asInteger();
         } else {
             receiverFetchSize = 100;
         }
-        if(context.getProperty(RECEIVER_FETCH_TIMEOUT).isSet()) {
+        if (context.getProperty(RECEIVER_FETCH_TIMEOUT).isSet()) {
             receiverFetchTimeout = Duration.ofMillis(context.getProperty(RECEIVER_FETCH_TIMEOUT).asLong());
         } else {
-            receiverFetchTimeout = null;
+            receiverFetchTimeout = Duration.ofMillis(60000);
         }
-
-        executor = Executors.newScheduledThreadPool(4);
-        setupReceiver(connectionString, executor);
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        final BlockingQueue<String> partitionIds = this.partitionNames;
-        final String partitionId = partitionIds.poll();
+        final String partitionId = partitionNames.poll();
         if (partitionId == null) {
             getLogger().debug("No partitions available");
             return;
         }
 
         final StopWatch stopWatch = new StopWatch(true);
         try {
+            final Iterable<PartitionEvent> events = receiveEvents(partitionId);
+            for (final PartitionEvent partitionEvent : events) {
+                final Map<String, String> attributes = getAttributes(partitionEvent);
 
-            final Iterable<EventData> receivedEvents = receiveEvents(context, partitionId);
-            if (receivedEvents == null) {
-                return;
-            }
+                FlowFile flowFile = session.create();
+                flowFile = session.putAllAttributes(flowFile, attributes);
 
-            for (final EventData eventData : receivedEvents) {
-                if (null != eventData) {
+                final EventData eventData = partitionEvent.getData();
+                final byte[] body = eventData.getBody();
+                flowFile = session.write(flowFile, outputStream -> outputStream.write(body));
 
-                    final Map<String, String> attributes = new HashMap<>();
-                    FlowFile flowFile = session.create();
-                    final EventData.SystemProperties systemProperties = eventData.getSystemProperties();
+                session.transfer(flowFile, REL_SUCCESS);
 
-                    if (null != systemProperties) {
-                        attributes.put("eventhub.enqueued.timestamp", String.valueOf(systemProperties.getEnqueuedTime()));
-                        attributes.put("eventhub.offset", systemProperties.getOffset());
-                        attributes.put("eventhub.sequence", String.valueOf(systemProperties.getSequenceNumber()));
-                    }
+                final String transitUri = getTransitUri(partitionId);
+                session.getProvenanceReporter().receive(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            }
+        } finally {
+            partitionNames.offer(partitionId);
+        }
+    }
 
-                    final Map<String,String> applicationProperties = AzureEventHubUtils.getApplicationProperties(eventData);
-                    attributes.putAll(applicationProperties);
+    /**
+     * Receive Events from specified partition is synchronized to avoid concurrent requests for the same partition
+     *
+     * @param partitionId Partition Identifier
+     * @return Iterable of Partition Events or empty when none received
+     */
+    protected synchronized Iterable<PartitionEvent> receiveEvents(final String partitionId) {
+        final EventPosition eventPosition;
 
-                    attributes.put("eventhub.name", context.getProperty(EVENT_HUB_NAME).getValue());
-                    attributes.put("eventhub.partition", partitionId);
+        if (initialEventPosition == null) {
+            getLogger().debug("Receiving Events for Partition [{}] from Initial Enqueued Time [{}]", partitionId, initialEnqueuedTime);
+            initialEventPosition = EventPosition.fromEnqueuedTime(initialEnqueuedTime);
+            eventPosition = initialEventPosition;
+        } else {
+            final PartitionProperties partitionProperties = eventHubConsumerClient.getPartitionProperties(partitionId);
+            final Instant lastEnqueuedTime = partitionProperties.getLastEnqueuedTime();
+            getLogger().debug("Receiving Events for Partition [{}] from Last Enqueued Time [{}]", partitionId, lastEnqueuedTime);
+            eventPosition = EventPosition.fromEnqueuedTime(lastEnqueuedTime);

Review Comment:
   `eventPosition` is determined from the current latest event time here, but we should continue where the previous receive call left off. The current implementation skips the events published between two `onTrigger` calls.
   
   I see the root cause is that the previous version of the library provided a stateful `PartitionReceiver` object we stored in a map per partition (`ConcurrentMap<String, PartitionReceiver> partitionToReceiverMap`). This receiver object stored the last fetched event offset (or similar data), and the next `receive()` call started with the event after that.
   
   Now in the new API version, we have a stateless `receiveFromPartition()` method, so I think we need to store the last offset positions for the partitions ourselves. The position can be retrieved from `EventData` (not sure whether `getSequenceNumber()` or `getOffset()` should be used). I checked the sequence number: the `Iterable` returned by `receiveFromPartition()` seems to emit the events ordered by sequence number, so the last one should be the latest we need to store.
   



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java:
##########
@@ -381,313 +366,285 @@ protected Collection<ValidationResult> customValidate(ValidationContext validati
     }
 
     @Override
-    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
         if (RECORD_READER.equals(descriptor)) {
-            isRecordReaderSet = !StringUtils.isEmpty(newValue);
+            isRecordReaderSet = StringUtils.isNotEmpty(newValue);
         } else if (RECORD_WRITER.equals(descriptor)) {
-            isRecordWriterSet = !StringUtils.isEmpty(newValue);
-        }
-    }
-
-    public class EventProcessorFactory implements IEventProcessorFactory<EventProcessor> {
-        @Override
-        public EventProcessor createEventProcessor(PartitionContext context) throws Exception {
-            final EventProcessor eventProcessor = new EventProcessor();
-            return eventProcessor;
+            isRecordWriterSet = StringUtils.isNotEmpty(newValue);
         }
     }
 
-    public class EventProcessor implements IEventProcessor {
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
+        if (eventProcessorClient == null) {
+            eventProcessorClient = createClient(context);
+            eventProcessorClient.start();
 
-        @Override
-        public void onOpen(PartitionContext context) throws Exception {
-            getLogger().info("Consumer group {} opened partition {} of {}",
-                    new Object[]{context.getConsumerGroupName(), context.getPartitionId(), context.getEventHubPath()});
-        }
+            processSessionFactory = sessionFactory;
 
-        @Override
-        public void onClose(PartitionContext context, CloseReason reason) throws Exception {
-            getLogger().info("Consumer group {} closed partition {} of {}. reason={}",
-                    new Object[]{context.getConsumerGroupName(), context.getPartitionId(), context.getEventHubPath(), reason});
+            readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+            writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
         }
 
-        @Override
-        public void onEvents(PartitionContext context, Iterable<EventData> messages) throws Exception {
-            final ProcessSession session = processSessionFactory.createSession();
-
-            try {
-
-                final StopWatch stopWatch = new StopWatch(true);
-
-                if (readerFactory != null && writerFactory != null) {
-                    writeRecords(context, messages, session, stopWatch);
-                } else {
-                    writeFlowFiles(context, messages, session, stopWatch);
-                }
+        // After a EventProcessor is registered successfully, nothing has to be done at onTrigger
+        // because new sessions are created when new messages are arrived by the EventProcessor.
+        context.yield();
+    }
 
-                // Commit NiFi first.
-                // If creating an Event Hub checkpoint failed, then the same message can be retrieved again.
-                session.commitAsync(context::checkpoint);
-            } catch (Exception e) {
-                getLogger().error("Unable to fully process received message due to " + e, e);
-                // FlowFiles those are already committed will not get rollback.
-                session.rollback();
-            }
+    @OnStopped
+    public void stopClient() {
+        if (eventProcessorClient != null) {
+            eventProcessorClient.stop();
+            eventProcessorClient = null;
+            processSessionFactory = null;
+            readerFactory = null;
+            writerFactory = null;

Review Comment:
   I recommend calling the `stop()` method in a try block and nulling the fields even if `stop()` fails. Otherwise `onTrigger()` can never reinitialize the client because it is not null.
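   
   Something along these lines (a sketch, not the final code):
   
   ```java
   @OnStopped
   public void stopClient() {
       if (eventProcessorClient != null) {
           try {
               eventProcessorClient.stop();
           } catch (final Exception e) {
               getLogger().warn("Event Processor Client stop failed", e);
           } finally {
               // Null the fields even on failure so onTrigger() can reinitialize the client
               eventProcessorClient = null;
               processSessionFactory = null;
               readerFactory = null;
               writerFactory = null;
           }
       }
   }
   ```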



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java:
##########
@@ -205,98 +193,20 @@ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
 
     @Override
     protected Collection<ValidationResult> customValidate(ValidationContext context) {
-        List<ValidationResult> retVal = AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, context);
-        return retVal;
-    }
-
-    protected void setupReceiver(final String connectionString, final ScheduledExecutorService executor) throws ProcessException {
-        try {
-            EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/3.1.1";
-            eventHubClient = EventHubClient.createFromConnectionStringSync(connectionString, executor);
-        } catch (IOException | EventHubException e) {
-            throw new ProcessException(e);
-        }
-    }
-
-    PartitionReceiver getReceiver(final ProcessContext context, final String partitionId) throws IOException, EventHubException, ExecutionException, InterruptedException {
-        PartitionReceiver existingReceiver = partitionToReceiverMap.get(partitionId);
-        if (existingReceiver != null) {
-            return existingReceiver;
-        }
-
-        // we want to avoid allowing multiple threads to create Receivers simultaneously because that could result in
-        // having multiple Receivers for the same partition. So if the map does not contain a receiver for this partition,
-        // we will enter a synchronized block and check again (because once we enter the synchronized block, we know that no
-        // other thread is creating a client). If within the synchronized block, we still do not have an entry in the map,
-        // it is up to use to create the receiver, initialize it, and then put it into the map.
-        // We do not use the putIfAbsent method in order to do a CAS operation here because we want to also initialize the
-        // receiver if and only if it is not present in the map. As a result, we need to initialize the receiver and add it
-        // to the map atomically. Hence, the synchronized block.
-        synchronized (this) {
-            existingReceiver = partitionToReceiverMap.get(partitionId);
-            if (existingReceiver != null) {
-                return existingReceiver;
-            }
-
-            final String consumerGroupName = context.getProperty(CONSUMER_GROUP).getValue();
-
-            final PartitionReceiver receiver = eventHubClient.createReceiver(
-                    consumerGroupName,
-                    partitionId,
-                    EventPosition.fromEnqueuedTime(
-                            configuredEnqueueTime == null ? Instant.now() : configuredEnqueueTime)).get();
-
-            receiver.setReceiveTimeout(receiverFetchTimeout == null ? Duration.ofMillis(60000) : receiverFetchTimeout);
-            partitionToReceiverMap.put(partitionId, receiver);
-            return receiver;
-
-        }
-    }
-
-    /**
-     * This method is here to try and isolate the Azure related code as the PartitionReceiver cannot be mocked
-     * with PowerMock due to it being final. Unfortunately it extends a base class and does not implement an interface
-     * so even if we create a MockPartitionReciver, it will not work as the two classes are orthogonal.
-     *
-     * @param context     - The processcontext for this processor
-     * @param partitionId - The partition ID to retrieve a receiver by.
-     * @return - Returns the events received from the EventBus.
-     * @throws ProcessException -- If any exception is encountered, receiving events it is wrapped in a ProcessException
-     *                          and then that exception is thrown.
-     */
-    protected Iterable<EventData> receiveEvents(final ProcessContext context, final String partitionId) throws ProcessException {
-        final PartitionReceiver receiver;
-        try {
-            receiver = getReceiver(context, partitionId);
-            return receiver.receive(receiverFetchSize).get();
-        } catch (final EventHubException | IOException | ExecutionException | InterruptedException e) {
-            throw new ProcessException(e);
-        }
+        return AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, context);
     }
 
     @OnStopped
-    public void tearDown() throws ProcessException {
-        for (final PartitionReceiver receiver : partitionToReceiverMap.values()) {
-            if (null != receiver) {
-                receiver.close();
-            }
-        }
-
-        partitionToReceiverMap.clear();
-        try {
-            if (null != eventHubClient) {
-                eventHubClient.closeSync();
-            }
-            executor.shutdown();
-        } catch (final EventHubException e) {
-            throw new ProcessException(e);
+    public void closeClient() {
+        if (eventHubConsumerClient == null) {
+            getLogger().info("Azure Event Hub Consumer Client not configured");
+        } else {
+            eventHubConsumerClient.close();
         }
     }
 
-    private ScheduledExecutorService executor;
-
     @OnScheduled
-    public void onScheduled(final ProcessContext context) throws ProcessException, URISyntaxException {
+    public void onScheduled(final ProcessContext context) {
         final BlockingQueue<String> partitionNames = new LinkedBlockingQueue<>();
         for (int i = 0; i < context.getProperty(NUM_PARTITIONS).asInteger(); i++) {
             partitionNames.add(String.valueOf(i));

Review Comment:
   The new API provides the `EventHubConsumerClient.getPartitionIds()` method, which could be used instead of the `Number of Event Hub Partitions` property, where the user has to specify the right number (quite a poor user experience, in my opinion).
   We can also create a follow-up ticket for it if you prefer that.
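   
   For example (a sketch assuming the consumer client is built before the partition queue is populated in `onScheduled`):
   
   ```java
   final BlockingQueue<String> partitionNames = new LinkedBlockingQueue<>();
   // Discover partition identifiers from the service instead of a user-supplied count
   eventHubConsumerClient.getPartitionIds().forEach(partitionNames::add);
   this.partitionNames = partitionNames;
   ```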



[GitHub] [nifi] exceptionfactory commented on pull request #6319: NIFI-10381 Refactor Azure Event Hubs components with current SDK

exceptionfactory commented on PR #6319:
URL: https://github.com/apache/nifi/pull/6319#issuecomment-1254506995

   Thanks for the additional feedback and recommendations @turcsanyip!
   
   I upgraded the Azure SDK BOM to 1.2.6 and also made changes based on your recommendations. `GetAzureEventHub` now tracks the last Event Position for each Partition using a `ConcurrentHashMap`. `ConsumeAzureEventHub` also appears to function as designed following multiple restarts when using the new version.




[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6319: NIFI-10381 Refactor Azure Event Hubs components with current SDK

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on code in PR #6319:
URL: https://github.com/apache/nifi/pull/6319#discussion_r977179227


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java:
##########
@@ -205,98 +193,20 @@ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
 
     @Override
     protected Collection<ValidationResult> customValidate(ValidationContext context) {
-        List<ValidationResult> retVal = AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, context);
-        return retVal;
-    }
-
-    protected void setupReceiver(final String connectionString, final ScheduledExecutorService executor) throws ProcessException {
-        try {
-            EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/3.1.1";
-            eventHubClient = EventHubClient.createFromConnectionStringSync(connectionString, executor);
-        } catch (IOException | EventHubException e) {
-            throw new ProcessException(e);
-        }
-    }
-
-    PartitionReceiver getReceiver(final ProcessContext context, final String partitionId) throws IOException, EventHubException, ExecutionException, InterruptedException {
-        PartitionReceiver existingReceiver = partitionToReceiverMap.get(partitionId);
-        if (existingReceiver != null) {
-            return existingReceiver;
-        }
-
-        // we want to avoid allowing multiple threads to create Receivers simultaneously because that could result in
-        // having multiple Receivers for the same partition. So if the map does not contain a receiver for this partition,
-        // we will enter a synchronized block and check again (because once we enter the synchronized block, we know that no
-        // other thread is creating a client). If within the synchronized block, we still do not have an entry in the map,
-        // it is up to us to create the receiver, initialize it, and then put it into the map.
-        // We do not use the putIfAbsent method in order to do a CAS operation here because we want to also initialize the
-        // receiver if and only if it is not present in the map. As a result, we need to initialize the receiver and add it
-        // to the map atomically. Hence, the synchronized block.
-        synchronized (this) {
-            existingReceiver = partitionToReceiverMap.get(partitionId);
-            if (existingReceiver != null) {
-                return existingReceiver;
-            }
-
-            final String consumerGroupName = context.getProperty(CONSUMER_GROUP).getValue();
-
-            final PartitionReceiver receiver = eventHubClient.createReceiver(
-                    consumerGroupName,
-                    partitionId,
-                    EventPosition.fromEnqueuedTime(
-                            configuredEnqueueTime == null ? Instant.now() : configuredEnqueueTime)).get();
-
-            receiver.setReceiveTimeout(receiverFetchTimeout == null ? Duration.ofMillis(60000) : receiverFetchTimeout);
-            partitionToReceiverMap.put(partitionId, receiver);
-            return receiver;
-
-        }
-    }
-
-    /**
-     * This method is here to try and isolate the Azure related code as the PartitionReceiver cannot be mocked
-     * with PowerMock due to it being final. Unfortunately it extends a base class and does not implement an interface
-     * so even if we create a MockPartitionReceiver, it will not work as the two classes are orthogonal.
-     *
-     * @param context     - The ProcessContext for this processor
-     * @param partitionId - The partition ID to retrieve a receiver by.
-     * @return - Returns the events received from the Event Hub.
-     * @throws ProcessException -- If any exception is encountered while receiving events, it is wrapped in a ProcessException
-     *                          and then that exception is thrown.
-     */
-    protected Iterable<EventData> receiveEvents(final ProcessContext context, final String partitionId) throws ProcessException {
-        final PartitionReceiver receiver;
-        try {
-            receiver = getReceiver(context, partitionId);
-            return receiver.receive(receiverFetchSize).get();
-        } catch (final EventHubException | IOException | ExecutionException | InterruptedException e) {
-            throw new ProcessException(e);
-        }
+        return AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, context);
     }
 
     @OnStopped
-    public void tearDown() throws ProcessException {
-        for (final PartitionReceiver receiver : partitionToReceiverMap.values()) {
-            if (null != receiver) {
-                receiver.close();
-            }
-        }
-
-        partitionToReceiverMap.clear();
-        try {
-            if (null != eventHubClient) {
-                eventHubClient.closeSync();
-            }
-            executor.shutdown();
-        } catch (final EventHubException e) {
-            throw new ProcessException(e);
+    public void closeClient() {
+        if (eventHubConsumerClient == null) {
+            getLogger().info("Azure Event Hub Consumer Client not configured");
+        } else {
+            eventHubConsumerClient.close();
         }
     }
 
-    private ScheduledExecutorService executor;
-
     @OnScheduled
-    public void onScheduled(final ProcessContext context) throws ProcessException, URISyntaxException {
+    public void onScheduled(final ProcessContext context) {
         final BlockingQueue<String> partitionNames = new LinkedBlockingQueue<>();
         for (int i = 0; i < context.getProperty(NUM_PARTITIONS).asInteger(); i++) {
             partitionNames.add(String.valueOf(i));

Review Comment:
   Thanks for the suggestion; this is a much better user experience. I switched the implementation to use `getPartitionIds()` and set the existing property to not required, with an updated description indicating that it is deprecated.
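   
   For reference, a deprecated, optional property along those lines might look roughly like the following (the property name and description text are assumptions, not the exact wording in the PR):
   
      import org.apache.nifi.components.PropertyDescriptor;
      import org.apache.nifi.processor.util.StandardValidators;
   
      static final PropertyDescriptor NUM_PARTITIONS = new PropertyDescriptor.Builder()
              .name("Number of Event Hub Partitions")
              .description("This property is deprecated and no longer used; partition identifiers"
                      + " are discovered at runtime using EventHubConsumerClient.getPartitionIds()")
              .required(false)
              .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
              .build();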





[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6319: NIFI-10381 Refactor Azure Event Hubs components with current SDK

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on code in PR #6319:
URL: https://github.com/apache/nifi/pull/6319#discussion_r981922359


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/position/LegacyBlobStorageEventPositionProvider.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.eventhub.position;
+
+import com.azure.core.util.BinaryData;
+import com.azure.messaging.eventhubs.models.EventPosition;
+import com.azure.storage.blob.BlobAsyncClient;
+import com.azure.storage.blob.BlobContainerAsyncClient;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.BlobListDetails;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Event Position Provider using Azure Blob Storage implemented in Azure Event Hubs SDK Version 3
+ */
+public class LegacyBlobStorageEventPositionProvider implements EventPositionProvider {
+    private static final String LEASE_SEQUENCE_NUMBER_FIELD = "sequenceNumber";
+
+    private static final Logger logger = LoggerFactory.getLogger(LegacyBlobStorageEventPositionProvider.class);
+
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    private final BlobContainerAsyncClient blobContainerAsyncClient;
+
+    private final String consumerGroup;
+
+    public LegacyBlobStorageEventPositionProvider(
+            final BlobContainerAsyncClient blobContainerAsyncClient,
+            final String consumerGroup
+    ) {
+        this.blobContainerAsyncClient = Objects.requireNonNull(blobContainerAsyncClient, "Client required");
+        this.consumerGroup = Objects.requireNonNull(consumerGroup, "Consumer Group required");
+    }
+
+    /**
+     * Get Initial Partition Event Position using Azure Blob Storage as persisted in
+     * com.microsoft.azure.eventprocessorhost.AzureStorageCheckpointLeaseManager
+     *
+     * @return Map of Partition and Event Position or empty when no checkpoints found
+     */
+    @Override
+    public Map<String, EventPosition> getInitialPartitionEventPosition() {
+        final Map<String, EventPosition> partitionEventPosition;
+
+        if (containerExists()) {
+            final BlobListDetails blobListDetails = new BlobListDetails().setRetrieveMetadata(true);
+            final ListBlobsOptions listBlobsOptions = new ListBlobsOptions().setPrefix(consumerGroup).setDetails(blobListDetails);
+            final Iterable<BlobItem> blobItems = blobContainerAsyncClient.listBlobs(listBlobsOptions).toIterable();
+            partitionEventPosition = getPartitionEventPosition(blobItems);
+        } else {
+            partitionEventPosition = Collections.emptyMap();
+        }
+
+        return partitionEventPosition;
+    }
+
+    private Map<String, EventPosition> getPartitionEventPosition(final Iterable<BlobItem> blobItems) {
+        final Map<String, EventPosition> partitionEventPosition = new LinkedHashMap<>();
+
+        for (final BlobItem blobItem : blobItems) {
+            if (Boolean.TRUE.equals(blobItem.isPrefix())) {
+                continue;
+            }
+
+            final String partitionId = getPartitionId(blobItem);
+            final EventPosition eventPosition = getEventPosition(blobItem);
+            if (eventPosition == null) {
+                logger.info("Legacy Event Position not found for Partition [{}] Blob [{}]", partitionId, blobItem.getName());
+            } else {
+                partitionEventPosition.put(partitionId, eventPosition);
+            }
+        }
+
+        return partitionEventPosition;
+    }
+
+    private String getPartitionId(final BlobItem blobItem) {
+        final String blobItemName = blobItem.getName();
+        final Path blobItemPath = Paths.get(blobItemName);
+        final Path blobItemFileName = blobItemPath.getFileName();
+        return blobItemFileName.toString();
+    }
+
+    private EventPosition getEventPosition(final BlobItem blobItem) {
+        final EventPosition eventPosition;
+
+        final String blobName = blobItem.getName();
+        final BlobAsyncClient blobAsyncClient = blobContainerAsyncClient.getBlobAsyncClient(blobName);
+
+        if (itemExists(blobAsyncClient)) {
+            final BinaryData content = blobAsyncClient.downloadContent().block();
+            if (content == null) {
+                throw new IllegalStateException(String.format("Legacy Event Position content not found [%s]", blobName));
+            }
+
+            try {
+                // Read com.microsoft.azure.eventprocessorhost.AzureBlobLease from JSON
+                final JsonNode lease = objectMapper.readTree(content.toBytes());
+                if (lease.hasNonNull(LEASE_SEQUENCE_NUMBER_FIELD)) {
+                    final JsonNode sequenceNumberField = lease.get(LEASE_SEQUENCE_NUMBER_FIELD);
+                    final long sequenceNumber = sequenceNumberField.asLong();
+                    eventPosition = EventPosition.fromSequenceNumber(sequenceNumber);
+
+                    blobAsyncClient.delete().block();

Review Comment:
   Thanks for highlighting this issue, @turcsanyip.
   
   Part of the challenge with reading the legacy offset information is that the consumer identifiers change between SDK versions. The identifiers are based on a UUID, and the new SDK generates a new UUID.
   
   The other issue is that the Processor needs a way to avoid using the legacy offset information after initial retrieval, as all subsequent handling should be based on the new checkpoint store strategy. The most straightforward way is to delete the legacy offset information. In this scenario, the first instance of the Processor should read the legacy offset information and proceed. Other instances should begin reading from the latest offset and start tracking using the new checkpoint store strategy. With this approach, at least one consumer will retrieve events based on legacy offset information. It should not be necessary for all consumers to start from previous offsets as long as at least one consumer reads the events from the oldest offset. That might require a small adjustment to this legacy provider.
   
   This is probably something worth noting in migration guidance, but I am open to additional ideas.
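   
   As an aside on the quoted provider: the `containerExists()` and `itemExists()` helpers fall outside the quoted hunk, but with the async clients they could be as simple as the following sketch (an assumption about the implementation, not a quote from the PR):
   
      private boolean containerExists() {
          // exists() returns Mono<Boolean>; resolve it synchronously and treat null as absent
          return Boolean.TRUE.equals(blobContainerAsyncClient.exists().block());
      }
   
      private boolean itemExists(final BlobAsyncClient blobAsyncClient) {
          return Boolean.TRUE.equals(blobAsyncClient.exists().block());
      }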





[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6319: NIFI-10381 Refactor Azure Event Hubs components with current SDK

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on code in PR #6319:
URL: https://github.com/apache/nifi/pull/6319#discussion_r975860472


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml:
##########
@@ -62,74 +62,40 @@
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-core</artifactId>
-            <version>${azure.core.version}</version>
         </dependency>
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-identity</artifactId>
-            <version>${azure.identity.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.azure</groupId>
-                    <artifactId>azure-core</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
-            <groupId>com.microsoft.azure</groupId>
-            <artifactId>azure-eventhubs</artifactId>
-            <version>${azure-eventhubs.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.microsoft.azure</groupId>
-            <artifactId>azure-keyvault</artifactId>
-            <version>${azure-keyvault.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.microsoft.azure</groupId>
-            <artifactId>azure-eventhubs-eph</artifactId>
-            <version>${azure-eventhubs-eph.version}</version>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-messaging-eventhubs</artifactId>
         </dependency>
         <dependency>
-            <groupId>com.microsoft.azure</groupId>
-            <artifactId>azure-storage</artifactId>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
         </dependency>
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-cosmos</artifactId>
-            <version>${azure-cosmos.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.azure</groupId>
-                    <artifactId>azure-core</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.google.guava</groupId>
-                    <artifactId>guava</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-storage-file-datalake</artifactId>
-            <version>${azure-storage-file-datalake.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.azure</groupId>
-                    <artifactId>azure-core</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-storage-blob</artifactId>
-            <version>${azure-storage-blob.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.azure</groupId>
-                    <artifactId>azure-core</artifactId>
-                </exclusion>
-            </exclusions>
+        </dependency>
+        <!-- Legacy Microsoft Azure Libraries -->
+        <dependency>
+            <groupId>com.microsoft.azure</groupId>
+            <artifactId>azure-keyvault</artifactId>
+            <version>${azure-keyvault.version}</version>
+        </dependency>

Review Comment:
   Thanks for highlighting this detail, @turcsanyip; you are correct that `jsr305` is still a transitive dependency of `com.microsoft.azure:azure-keyvault`.
   
   On further review, version 3.0.1 and later of `jsr305` are acceptable according to Apache Software Licensing guidelines, as indicated in comments on dependencies in the root [Maven configuration](https://github.com/apache/nifi/blob/main/pom.xml#L910). The transitive version of `jsr305` is now 3.0.2, so the new declaration without the exclusion meets licensing requirements.





[GitHub] [nifi] asfgit closed pull request #6319: NIFI-10381 Refactor Azure Event Hubs components with current SDK

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #6319: NIFI-10381 Refactor Azure Event Hubs components with current SDK
URL: https://github.com/apache/nifi/pull/6319




[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6319: NIFI-10381 Refactor Azure Event Hubs components with current SDK

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on code in PR #6319:
URL: https://github.com/apache/nifi/pull/6319#discussion_r982431762


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/position/LegacyBlobStorageEventPositionProvider.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.eventhub.position;
+
+import com.azure.core.util.BinaryData;
+import com.azure.messaging.eventhubs.models.EventPosition;
+import com.azure.storage.blob.BlobAsyncClient;
+import com.azure.storage.blob.BlobContainerAsyncClient;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.BlobListDetails;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Event Position Provider using Azure Blob Storage implemented in Azure Event Hubs SDK Version 3
+ */
+public class LegacyBlobStorageEventPositionProvider implements EventPositionProvider {
+    private static final String LEASE_SEQUENCE_NUMBER_FIELD = "sequenceNumber";
+
+    private static final Logger logger = LoggerFactory.getLogger(LegacyBlobStorageEventPositionProvider.class);
+
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    private final BlobContainerAsyncClient blobContainerAsyncClient;
+
+    private final String consumerGroup;
+
+    public LegacyBlobStorageEventPositionProvider(
+            final BlobContainerAsyncClient blobContainerAsyncClient,
+            final String consumerGroup
+    ) {
+        this.blobContainerAsyncClient = Objects.requireNonNull(blobContainerAsyncClient, "Client required");
+        this.consumerGroup = Objects.requireNonNull(consumerGroup, "Consumer Group required");
+    }
+
+    /**
+     * Get Initial Partition Event Position using Azure Blob Storage as persisted in
+     * com.microsoft.azure.eventprocessorhost.AzureStorageCheckpointLeaseManager
+     *
+     * @return Map of Partition and Event Position or empty when no checkpoints found
+     */
+    @Override
+    public Map<String, EventPosition> getInitialPartitionEventPosition() {
+        final Map<String, EventPosition> partitionEventPosition;
+
+        if (containerExists()) {
+            final BlobListDetails blobListDetails = new BlobListDetails().setRetrieveMetadata(true);
+            final ListBlobsOptions listBlobsOptions = new ListBlobsOptions().setPrefix(consumerGroup).setDetails(blobListDetails);
+            final Iterable<BlobItem> blobItems = blobContainerAsyncClient.listBlobs(listBlobsOptions).toIterable();
+            partitionEventPosition = getPartitionEventPosition(blobItems);
+        } else {
+            partitionEventPosition = Collections.emptyMap();
+        }
+
+        return partitionEventPosition;
+    }
+
+    private Map<String, EventPosition> getPartitionEventPosition(final Iterable<BlobItem> blobItems) {
+        final Map<String, EventPosition> partitionEventPosition = new LinkedHashMap<>();
+
+        for (final BlobItem blobItem : blobItems) {
+            if (Boolean.TRUE.equals(blobItem.isPrefix())) {
+                continue;
+            }
+
+            final String partitionId = getPartitionId(blobItem);
+            final EventPosition eventPosition = getEventPosition(blobItem);
+            if (eventPosition == null) {
+                logger.info("Legacy Event Position not found for Partition [{}] Blob [{}]", partitionId, blobItem.getName());
+            } else {
+                partitionEventPosition.put(partitionId, eventPosition);
+            }
+        }
+
+        return partitionEventPosition;
+    }
+
+    private String getPartitionId(final BlobItem blobItem) {
+        final String blobItemName = blobItem.getName();
+        final Path blobItemPath = Paths.get(blobItemName);
+        final Path blobItemFileName = blobItemPath.getFileName();
+        return blobItemFileName.toString();
+    }
+
+    private EventPosition getEventPosition(final BlobItem blobItem) {
+        final EventPosition eventPosition;
+
+        final String blobName = blobItem.getName();
+        final BlobAsyncClient blobAsyncClient = blobContainerAsyncClient.getBlobAsyncClient(blobName);
+
+        if (itemExists(blobAsyncClient)) {
+            final BinaryData content = blobAsyncClient.downloadContent().block();
+            if (content == null) {
+                throw new IllegalStateException(String.format("Legacy Event Position content not found [%s]", blobName));
+            }
+
+            try {
+                // Read com.microsoft.azure.eventprocessorhost.AzureBlobLease from JSON
+                final JsonNode lease = objectMapper.readTree(content.toBytes());
+                if (lease.hasNonNull(LEASE_SEQUENCE_NUMBER_FIELD)) {
+                    final JsonNode sequenceNumberField = lease.get(LEASE_SEQUENCE_NUMBER_FIELD);
+                    final long sequenceNumber = sequenceNumberField.asLong();
+                    eventPosition = EventPosition.fromSequenceNumber(sequenceNumber);
+
+                    blobAsyncClient.delete().block();

Review Comment:
   Following further review and additional direct discussion, I removed the Blob deletion step from the Legacy Position Provider. Although this involves an extra step when starting the Processor, it allows multiple consumers to retrieve their current Partition offset location. After the initial load, each consumer uses the new Blob Checkpoint Store, so this seems like a reasonable migration path.
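   
   For context, wiring the new Blob Checkpoint Store looks roughly like this (a sketch assuming connection-string configuration; all variable names are illustrative):
   
      import com.azure.messaging.eventhubs.EventProcessorClient;
      import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
      import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
      import com.azure.storage.blob.BlobContainerAsyncClient;
      import com.azure.storage.blob.BlobContainerClientBuilder;
   
      final BlobContainerAsyncClient containerClient = new BlobContainerClientBuilder()
              .connectionString(storageConnectionString)
              .containerName(containerName)
              .buildAsyncClient();
   
      final EventProcessorClient processorClient = new EventProcessorClientBuilder()
              .connectionString(eventHubConnectionString, eventHubName)
              .consumerGroup(consumerGroup)
              .checkpointStore(new BlobCheckpointStore(containerClient))
              .processEvent(eventContext -> { /* handle received event */ })
              .processError(errorContext -> { /* log and recover */ })
              .buildEventProcessorClient();
   
      processorClient.start();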



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] turcsanyip commented on pull request #6319: NIFI-10381 Refactor Azure Event Hubs components with current SDK

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on PR #6319:
URL: https://github.com/apache/nifi/pull/6319#issuecomment-1254588054

   @exceptionfactory Thanks for the review changes! `Get` now works like the previous version, and `Consume` can continue processing after a restart.
   As a next step, I'm going to test some non-basic use cases, such as multiple partitions and multiple consumers in a cluster, and also do a thorough review of the code.

