Posted to issues@nifi.apache.org by "turcsanyip (via GitHub)" <gi...@apache.org> on 2023/05/25 21:27:41 UTC

[GitHub] [nifi] turcsanyip commented on a diff in pull request #7076: NIFI-11294 - Add support for component state as checkpointing strategy

turcsanyip commented on code in PR #7076:
URL: https://github.com/apache/nifi/pull/7076#discussion_r1204808994


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/CheckpointStrategy.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.eventhub.utils;
+
+import org.apache.nifi.components.DescribedValue;
+
+public enum CheckpointStrategy implements DescribedValue {
+    AZURE_BLOB_STORAGE("azure-blob-storage", "Use Azure Blob Storage to store partition checkpoints and ownership"),
+    COMPONENT_STATE("component-state", "Use component state to store partition checkpoints");

Review Comment:
   Could you please use user-friendly display names such as `Component State` for these values in the UI?
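
One way to act on this suggestion is sketched below. It is a minimal, standalone illustration, not the PR's actual code: the local `DescribedValue` interface is a stand-in for `org.apache.nifi.components.DescribedValue` so the snippet compiles without NiFi, and the constructor shape, the field names, and the choice to return `name()` from `getValue()` are all assumptions.

```java
class CheckpointStrategyLabels {

    // Local stand-in for org.apache.nifi.components.DescribedValue, declared here
    // only so that the sketch compiles without NiFi on the classpath.
    interface DescribedValue {
        String getValue();
        String getDisplayName();
        String getDescription();
    }

    enum CheckpointStrategy implements DescribedValue {
        AZURE_BLOB_STORAGE("Azure Blob Storage",
                "Use Azure Blob Storage to store partition checkpoints and ownership"),
        COMPONENT_STATE("Component State",
                "Use component state to store partition checkpoints");

        private final String displayName;
        private final String description;

        CheckpointStrategy(final String displayName, final String description) {
            this.displayName = displayName;
            this.description = description;
        }

        // Assumption: the stored property value is the enum constant name,
        // so CheckpointStrategy.valueOf(...) can parse it back.
        @Override
        public String getValue() {
            return name();
        }

        // Human-readable label intended for the UI drop-down.
        @Override
        public String getDisplayName() {
            return displayName;
        }

        @Override
        public String getDescription() {
            return description;
        }
    }

    public static void main(final String[] args) {
        for (final CheckpointStrategy strategy : CheckpointStrategy.values()) {
            System.out.println(strategy.getValue() + " -> " + strategy.getDisplayName());
        }
    }
}
```

Returning the constant name from `getValue()` keeps a `CheckpointStrategy.valueOf(...)` lookup on the configured value working, while the label shown to users can change freely.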



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java:
##########
@@ -226,14 +235,27 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .required(true)
             .build();
+
+    static final PropertyDescriptor CHECKPOINT_STRATEGY = new PropertyDescriptor.Builder()
+            .name("checkpoint-strategy")
+            .displayName("Checkpoint Strategy")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .required(true)
+            .allowableValues(CheckpointStrategy.class)
+            .defaultValue(CheckpointStrategy.AZURE_BLOB_STORAGE.getValue())
+            .description("Specifies which strategy to use for storing and retrieving partition ownership information and checkpoint details for each partition.")
+            .build();
+
     static final PropertyDescriptor STORAGE_ACCOUNT_NAME = new PropertyDescriptor.Builder()
             .name("storage-account-name")
             .displayName("Storage Account Name")
             .description("Name of the Azure Storage account to store event hub consumer group state.")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .required(true)
+            .dependsOn(CHECKPOINT_STRATEGY, CheckpointStrategy.AZURE_BLOB_STORAGE)

Review Comment:
   The other `Storage *` properties below should also depend on `CheckpointStrategy.AZURE_BLOB_STORAGE`.
   The `Storage Account Key` property can also be made mandatory, since it will be shown only when it is required.



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java:
##########
@@ -403,36 +420,76 @@ public void stopClient() {
                 getLogger().warn("Event Processor Client stop failed", e);
             }
             eventProcessorClient = null;
-            processSessionFactory = null;
-            readerFactory = null;
-            writerFactory = null;
         }
     }
 
-    protected EventProcessorClient createClient(final ProcessContext context) {
-        namespaceName = context.getProperty(NAMESPACE).evaluateAttributeExpressions().getValue();
+    protected EventProcessorClient createClient(final ProcessContext context, final ProcessSessionFactory processSessionFactory) {
+        final String namespaceName = context.getProperty(NAMESPACE).evaluateAttributeExpressions().getValue();
         final String eventHubName = context.getProperty(EVENT_HUB_NAME).evaluateAttributeExpressions().getValue();
         final String consumerGroup = context.getProperty(CONSUMER_GROUP).evaluateAttributeExpressions().getValue();
+        final String identifier = UUID.randomUUID().toString();
 
-        final String containerName = defaultIfBlank(context.getProperty(STORAGE_CONTAINER_NAME).evaluateAttributeExpressions().getValue(), eventHubName);
-        final String storageConnectionString = createStorageConnectionString(context);
-        final BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
-                .connectionString(storageConnectionString)
-                .containerName(containerName)
-                .buildAsyncClient();
-        final BlobCheckpointStore checkpointStore = new BlobCheckpointStore(blobContainerAsyncClient);
+        CheckpointStore checkpointStore;
+        final Map<String, EventPosition> legacyPartitionEventPosition;
+
+        final CheckpointStrategy checkpointStrategy = CheckpointStrategy.valueOf(
+                context.getProperty(CHECKPOINT_STRATEGY).getValue()
+        );
+        if (checkpointStrategy == CheckpointStrategy.AZURE_BLOB_STORAGE) {
+            final String containerName = defaultIfBlank(context.getProperty(STORAGE_CONTAINER_NAME).evaluateAttributeExpressions().getValue(), eventHubName);
+            final String storageConnectionString = createStorageConnectionString(context);
+            final BlobContainerAsyncClient blobContainerAsyncClient = getBlobContainerAsyncClient(containerName, storageConnectionString);
+            checkpointStore = getCheckpointStoreFromBlobContainer(blobContainerAsyncClient);
+            legacyPartitionEventPosition = getLegacyPartitionEventPosition(blobContainerAsyncClient, consumerGroup);
+        } else {
+            checkpointStore = new ComponentStateCheckpointStore(
+                    identifier,
+                    new ComponentStateCheckpointStore.State() {
+                        public StateMap getState() throws IOException {
+                            final ProcessSession session = processSessionFactory.createSession();
+                            return session.getState(Scope.CLUSTER);
+                        }
+
+                        public boolean replaceState(StateMap oldValue, Map<String, String> newValue) throws IOException {
+                            final ProcessSession session = processSessionFactory.createSession();
+                            if (!session.replaceState(oldValue, newValue, Scope.CLUSTER)) {

Review Comment:
   `ProcessSession.replaceState()` does not provide true optimistic locking, because another session can change the state between `replaceState()` and `commit()`. So even if `replaceState()` returns `true`, the state change may be lost at commit time due to a concurrent update.
   
   I suggest using `StateManager.replace()` (obtained via `ProcessContext.getStateManager()`) instead, which has the optimistic locking semantics that a checkpoint store implementation requires.
   
   Please note that `StateManager.replace()` currently cannot initialize the state (see also [NIFI-11595](https://issues.apache.org/jira/browse/NIFI-11595)), so the state needs to be created with `setState()` before `replace()` is used, e.g. in `@OnScheduled` like this:
   ```
       @OnScheduled
       public void onScheduled(ProcessContext context) throws IOException {
           if (getNodeTypeProvider().isPrimary()) {
               final StateManager stateManager = context.getStateManager();
               final StateMap state = stateManager.getState(Scope.CLUSTER);
   
               if (!state.getStateVersion().isPresent()) {
                   stateManager.setState(new HashMap<>(), Scope.CLUSTER);
               }
           }
       }
   ```
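
The optimistic locking semantics discussed in this comment can be illustrated with a small standalone sketch. It deliberately does not use the NiFi API: `InMemoryStateStore`, `VersionedState`, `updateCheckpoint`, and the `checkpoint/<partition>` key format are hypothetical names invented for illustration. The point is only that the compare and the write happen in one atomic step, and that a caller whose snapshot has gone stale reloads and retries.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

class OptimisticCheckpointDemo {

    // Immutable snapshot: a version number plus the key/value pairs stored under it.
    static final class VersionedState {
        final long version;
        final Map<String, String> values;

        VersionedState(final long version, final Map<String, String> values) {
            this.version = version;
            this.values = Map.copyOf(values);
        }
    }

    // Hypothetical in-memory stand-in for a cluster-scoped state store.
    static final class InMemoryStateStore {
        private final AtomicReference<VersionedState> current =
                new AtomicReference<>(new VersionedState(0, Map.of()));

        VersionedState get() {
            return current.get();
        }

        // Atomically replaces the state only if 'expected' is still the current snapshot.
        boolean replace(final VersionedState expected, final Map<String, String> newValues) {
            final VersionedState next = new VersionedState(expected.version + 1, newValues);
            return current.compareAndSet(expected, next);
        }
    }

    // Read-modify-write loop: retries until no concurrent writer interfered.
    static void updateCheckpoint(final InMemoryStateStore store, final String partitionId, final String offset) {
        while (true) {
            final VersionedState snapshot = store.get();
            final Map<String, String> updated = new HashMap<>(snapshot.values);
            updated.put("checkpoint/" + partitionId, offset);
            if (store.replace(snapshot, updated)) {
                return;
            }
            // Another writer replaced the state first; reload and try again.
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        final InMemoryStateStore store = new InMemoryStateStore();
        final Thread first = new Thread(() -> updateCheckpoint(store, "0", "100"));
        final Thread second = new Thread(() -> updateCheckpoint(store, "1", "250"));
        first.start();
        second.start();
        first.join();
        second.join();
        System.out.println("version=" + store.get().version + " values=" + store.get().values);
    }
}
```

With a replace that only succeeds against the snapshot it was computed from, two writers updating different partitions cannot silently overwrite each other's entries, which is the property a checkpoint store needs.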



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java:
##########
@@ -92,6 +100,7 @@
         + "In clustered environment, ConsumeAzureEventHub processor instances form a consumer group and the messages are distributed among the cluster nodes "
         + "(each message is processed on one cluster node only).")
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@Stateful(scopes = {Scope.CLUSTER}, description = "Used to store partitions offsets when component state is configured as the checkpointing strategy.")

Review Comment:
   ```suggestion
   @Stateful(scopes = {Scope.CLUSTER}, description = "Used to store partition ownership and offsets when component state is configured as the checkpointing strategy.")
   ```


