Posted to issues@nifi.apache.org by "turcsanyip (via GitHub)" <gi...@apache.org> on 2023/06/15 20:19:14 UTC

[GitHub] [nifi] turcsanyip commented on a diff in pull request #7076: NIFI-11294 - Add support for component state as checkpointing strategy

turcsanyip commented on code in PR #7076:
URL: https://github.com/apache/nifi/pull/7076#discussion_r1206003566


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java:
##########
@@ -403,36 +420,76 @@ public void stopClient() {
                 getLogger().warn("Event Processor Client stop failed", e);
             }
             eventProcessorClient = null;
-            processSessionFactory = null;
-            readerFactory = null;
-            writerFactory = null;
         }
     }
 
-    protected EventProcessorClient createClient(final ProcessContext context) {
-        namespaceName = context.getProperty(NAMESPACE).evaluateAttributeExpressions().getValue();
+    protected EventProcessorClient createClient(final ProcessContext context, final ProcessSessionFactory processSessionFactory) {
+        final String namespaceName = context.getProperty(NAMESPACE).evaluateAttributeExpressions().getValue();
         final String eventHubName = context.getProperty(EVENT_HUB_NAME).evaluateAttributeExpressions().getValue();
         final String consumerGroup = context.getProperty(CONSUMER_GROUP).evaluateAttributeExpressions().getValue();
+        final String identifier = UUID.randomUUID().toString();
 
-        final String containerName = defaultIfBlank(context.getProperty(STORAGE_CONTAINER_NAME).evaluateAttributeExpressions().getValue(), eventHubName);
-        final String storageConnectionString = createStorageConnectionString(context);
-        final BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
-                .connectionString(storageConnectionString)
-                .containerName(containerName)
-                .buildAsyncClient();
-        final BlobCheckpointStore checkpointStore = new BlobCheckpointStore(blobContainerAsyncClient);
+        CheckpointStore checkpointStore;
+        final Map<String, EventPosition> legacyPartitionEventPosition;
+
+        final CheckpointStrategy checkpointStrategy = CheckpointStrategy.valueOf(
+                context.getProperty(CHECKPOINT_STRATEGY).getValue()
+        );
+        if (checkpointStrategy == CheckpointStrategy.AZURE_BLOB_STORAGE) {
+            final String containerName = defaultIfBlank(context.getProperty(STORAGE_CONTAINER_NAME).evaluateAttributeExpressions().getValue(), eventHubName);
+            final String storageConnectionString = createStorageConnectionString(context);
+            final BlobContainerAsyncClient blobContainerAsyncClient = getBlobContainerAsyncClient(containerName, storageConnectionString);
+            checkpointStore = getCheckpointStoreFromBlobContainer(blobContainerAsyncClient);
+            legacyPartitionEventPosition = getLegacyPartitionEventPosition(blobContainerAsyncClient, consumerGroup);
+        } else {
+            checkpointStore = new ComponentStateCheckpointStore(
+                    identifier,
+                    new ComponentStateCheckpointStore.State() {
+                        public StateMap getState() throws IOException {
+                            final ProcessSession session = processSessionFactory.createSession();
+                            return session.getState(Scope.CLUSTER);
+                        }
+
+                        public boolean replaceState(StateMap oldValue, Map<String, String> newValue) throws IOException {
+                            final ProcessSession session = processSessionFactory.createSession();
+                            if (!session.replaceState(oldValue, newValue, Scope.CLUSTER)) {

Review Comment:
   `ProcessSession.replaceState()` does not provide true optimistic locking, because another session can change the state between `replaceState()` and `commit()`. So even if `replaceState()` returns `true`, the state change may be dropped at commit time due to a concurrent update.
   
   I suggest using `ProcessContext.getStateManager().replace()` instead. It has the proper optimistic locking semantics that a checkpoint store implementation requires.
   
   Please note that `ProcessContext.getStateManager().replace()` currently cannot initialize the state (see also [NIFI-11595](https://issues.apache.org/jira/browse/NIFI-11595)), so the state needs to be created with `setState()` before `replace()` is used, e.g. in `@OnScheduled`:
   ```
       @OnScheduled
       public void onScheduled(ProcessContext context) throws IOException {
           if (getNodeTypeProvider().isPrimary()) {
               final StateManager stateManager = context.getStateManager();
               final StateMap state = stateManager.getState(Scope.CLUSTER);
   
               if (!state.getStateVersion().isPresent()) {
                   stateManager.setState(new HashMap<>(), Scope.CLUSTER);
               }
           }
       }
   ```
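
To illustrate the semantics I mean, here is a minimal, self-contained sketch of an atomic compare-and-set on versioned state. It is not NiFi code: `OptimisticStateSketch`, `Snapshot`, `Store`, `claim()` and the `ownership/<partition>` key format are hypothetical names made up for this example, and the in-memory `AtomicReference` only stands in for whatever the cluster state provider does. The point is that the version check and the write happen in one step, so a caller holding a stale snapshot is rejected instead of silently overwriting the winner.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical in-memory model of versioned state; NOT the NiFi StateManager API.
public class OptimisticStateSketch {

    /** Immutable snapshot: a version number plus the key/value state. */
    static final class Snapshot {
        final long version;
        final Map<String, String> values;

        Snapshot(long version, Map<String, String> values) {
            this.version = version;
            this.values = Collections.unmodifiableMap(new HashMap<>(values));
        }
    }

    /** Holds the current snapshot and only replaces it atomically. */
    static final class Store {
        private final AtomicReference<Snapshot> current =
                new AtomicReference<>(new Snapshot(0L, new HashMap<>()));

        Snapshot get() {
            return current.get();
        }

        /** Succeeds only if nobody replaced the state since 'expected' was read. */
        boolean replace(Snapshot expected, Map<String, String> newValues) {
            return current.compareAndSet(expected, new Snapshot(expected.version + 1, newValues));
        }
    }

    /** Tries to record 'owner' for 'partition'; returns false when 'seen' is stale. */
    static boolean claim(Store store, Snapshot seen, String partition, String owner) {
        final Map<String, String> updated = new HashMap<>(seen.values);
        updated.put("ownership/" + partition, owner); // assumed key format
        return store.replace(seen, updated);
    }

    public static void main(String[] args) {
        final Store store = new Store();

        // Two clients read the same version before either of them writes.
        final Snapshot seenByA = store.get();
        final Snapshot seenByB = store.get();

        System.out.println("A claims: " + claim(store, seenByA, "0", "client-A"));
        System.out.println("B claims: " + claim(store, seenByB, "0", "client-B"));
        System.out.println("owner: " + store.get().values.get("ownership/0"));
    }
}
```

Client B has to re-read the state and decide again after the failed call. With a check that is only applied later at commit time, B could be told `true` and still lose the update, which is the problem described above.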


