Posted to issues@nifi.apache.org by "malthe (via GitHub)" <gi...@apache.org> on 2023/03/23 07:17:06 UTC

[GitHub] [nifi] malthe opened a new pull request, #7076: Add support for component state as checkpointing strategy

malthe opened a new pull request, #7076:
URL: https://github.com/apache/nifi/pull/7076

   # Summary
   
   [NIFI-11294](https://issues.apache.org/jira/browse/NIFI-11294) ConsumeAzureEventHub should default to processor state for checkpointing
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [x] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue created
   
   ### Pull Request Tracking
   
   - [x] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   - [ ] Pull Request commit message starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   
   ### Pull Request Formatting
   
   - [x] Pull Request based on current revision of the `main` branch
   - [x] Pull Request refers to a feature branch with one commit containing changes
   
   # Verification
   
   Please indicate the verification steps performed prior to pull request creation.
   
   ### Build
   
   - [ ] Build completed using `mvn clean install -P contrib-check`
     - [ ] JDK 11
     - [x] JDK 17
   
   ### Licensing
   
   - [ ] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
   - [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` files
   
   ### Documentation
   
   - [x] Documentation formatting appears as expected in rendered files
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] turcsanyip commented on pull request #7076: NIFI-11294 - Add support for component state as checkpointing strategy

Posted by "turcsanyip (via GitHub)" <gi...@apache.org>.
turcsanyip commented on PR #7076:
URL: https://github.com/apache/nifi/pull/7076#issuecomment-1482014568

   @malthe Thanks for working on this feature, it would be a valuable addition to `ConsumeAzureEventHub`.
   
   I tried out the processor and also had a look at the code.
   
   There is an essential issue with the new component state strategy: it does not handle the partition ownership.
   Please note that a `CheckpointStore` implementation must handle and store not only the checkpoints but also the partition ownership information. An example can be found in [BlobCheckpointStore](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java).
   If you run the processor on a NiFi cluster, all nodes will own and process all partitions, whereas the partitions should be load-balanced among the consumer group members (instances of the same `ConsumeAzureEventHub` processor on the cluster nodes).
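
As a rough illustration of what "handling ownership" involves (not code from the PR or from the Azure SDK), the sketch below models ETag-based claiming in memory. `OwnershipDemo`, `OwnershipTable`, `Ownership`, and `claim` are hypothetical names. The real `CheckpointStore` interface is reactive and also keys ownership by namespace, event hub, and consumer group, which this sketch leaves out.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model of partition ownership; not the Azure SDK API.
class OwnershipDemo {
    static final class Ownership {
        final String partitionId;
        final String ownerId;
        final String eTag;

        Ownership(String partitionId, String ownerId, String eTag) {
            this.partitionId = partitionId;
            this.ownerId = ownerId;
            this.eTag = eTag;
        }
    }

    static final class OwnershipTable {
        private final Map<String, Ownership> byPartition = new ConcurrentHashMap<>();

        // A claim succeeds only if the caller's expected ETag matches the stored one.
        // A null expected ETag means "I believe the partition is unowned".
        Optional<Ownership> claim(String partitionId, String ownerId, String expectedETag) {
            Ownership[] claimed = new Ownership[1];
            byPartition.compute(partitionId, (id, current) -> {
                String currentETag = current == null ? null : current.eTag;
                if (!Objects.equals(currentETag, expectedETag)) {
                    return current; // stale view: another node changed the record first
                }
                claimed[0] = new Ownership(id, ownerId, UUID.randomUUID().toString());
                return claimed[0];
            });
            return Optional.ofNullable(claimed[0]);
        }
    }

    public static void main(String[] args) {
        OwnershipTable table = new OwnershipTable();
        Optional<Ownership> nodeA = table.claim("0", "node-a", null);
        Optional<Ownership> nodeB = table.claim("0", "node-b", null);
        System.out.println("node-a claimed: " + nodeA.isPresent());
        System.out.println("node-b claimed: " + nodeB.isPresent());
    }
}
```

Without a store that rejects stale claims like this, every node believes it owns every partition, which is the behaviour described above.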
   
   Also, the processor cannot be stopped cleanly: it throws `NullPointerException`s in a loop after Stop.
   
   Please fix these first.




[GitHub] [nifi] turcsanyip commented on pull request #7076: NIFI-11294 - Add support for component state as checkpointing strategy

Posted by "turcsanyip (via GitHub)" <gi...@apache.org>.
turcsanyip commented on PR #7076:
URL: https://github.com/apache/nifi/pull/7076#issuecomment-1726259417

   @malthe Just a heads-up that [NIFI-11595](https://issues.apache.org/jira/browse/NIFI-11595) has been merged fixing the `replaceState()` logic I mentioned in [my previous comment](https://github.com/apache/nifi/pull/7076#discussion_r1206003566). It makes the state handling of the checkpoints simpler, though it was not a blocker.
   
   Considering that there was no activity on the PR in the past months, I would like to ask if you are still interested in this feature? Do you intend to finish the PR? I'm happy to jump in and continue the implementation based on your PR if you have other interests now.
   
   Thanks




[GitHub] [nifi] turcsanyip commented on pull request #7076: NIFI-11294 - Add support for component state as checkpointing strategy

Posted by "turcsanyip (via GitHub)" <gi...@apache.org>.
turcsanyip commented on PR #7076:
URL: https://github.com/apache/nifi/pull/7076#issuecomment-1481507814

   Will review...




[GitHub] [nifi] malthe commented on pull request #7076: NIFI-11294 - Add support for component state as checkpointing strategy

Posted by "malthe (via GitHub)" <gi...@apache.org>.
malthe commented on PR #7076:
URL: https://github.com/apache/nifi/pull/7076#issuecomment-1484599021

   @turcsanyip I have now implemented proper load-balancing in a9bd297b50515918092ec01a0730ee795e917932 – and to fix the issues with stopping the processor, I have removed most of the processor state fields, shifting them into an event batch processing closure instead.




[GitHub] [nifi] turcsanyip commented on pull request #7076: NIFI-11294 - Add support for component state as checkpointing strategy

Posted by "turcsanyip (via GitHub)" <gi...@apache.org>.
turcsanyip commented on PR #7076:
URL: https://github.com/apache/nifi/pull/7076#issuecomment-1499633040

   Thanks @malthe !
   Apologies, I did not have time to check it and I'm going to be AFK for the next week. Will review when I'm back.




[GitHub] [nifi] malthe commented on a diff in pull request #7076: NIFI-11294 - Add support for component state as checkpointing strategy

Posted by "malthe (via GitHub)" <gi...@apache.org>.
malthe commented on code in PR #7076:
URL: https://github.com/apache/nifi/pull/7076#discussion_r1206301347


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java:
##########
@@ -117,13 +131,22 @@ public class TestConsumeAzureEventHub {
     @Mock
     RecordReader reader;
 
+    @Mock
+    CheckpointStore checkpointStore;
+
+    @Mock
+    BlobContainerAsyncClient blobContainerAsyncClient;
+
+    @Captor
+    private ArgumentCaptor<Consumer<EventBatchContext>> eventBatchProcessorCapture;
+
     private MockConsumeAzureEventHub processor;
 
     private TestRunner testRunner;
 
     @BeforeEach
     public void setupProcessor() {
-        processor = new MockConsumeAzureEventHub();
+        processor = spy(new MockConsumeAzureEventHub());

Review Comment:
   Previously, the processor wasn't tested through the `onTrigger` but through some internal methods.
   
   I changed the tests so that all testing happens through the actually exposed interface. However, this requires some additional mocking in order to implement the checkpoint strategy for example.
   
   (The reason being, we don't have the luxury of testing against a real event hub here.)
   
   But instead of spying on the processor, I suppose I could implement some alternative methods on the mock class. I don't remember if I tried that and faced some issues perhaps.





[GitHub] [nifi] malthe commented on pull request #7076: NIFI-11294 - Add support for component state as checkpointing strategy

Posted by "malthe (via GitHub)" <gi...@apache.org>.
malthe commented on PR #7076:
URL: https://github.com/apache/nifi/pull/7076#issuecomment-1482015798

   @exceptionfactory thanks – these issues have been fixed now.
   
   The good news is that the actual code seems to function well and is reasonably well-crafted. However, the tests are bad/wrong and I fear I have only made them worse.




[GitHub] [nifi] malthe commented on pull request #7076: NIFI-11294 - Add support for component state as checkpointing strategy

Posted by "malthe (via GitHub)" <gi...@apache.org>.
malthe commented on PR #7076:
URL: https://github.com/apache/nifi/pull/7076#issuecomment-1726349415

   @turcsanyip please do – I am currently not actively using NiFi so just following along from the sideline.




Re: [PR] NIFI-11294 - Add support for component state as checkpointing strategy [nifi]

Posted by "exceptionfactory (via GitHub)" <gi...@apache.org>.
exceptionfactory closed pull request #7076: NIFI-11294 - Add support for component state as checkpointing strategy
URL: https://github.com/apache/nifi/pull/7076




[GitHub] [nifi] exceptionfactory commented on a diff in pull request #7076: NIFI-11294 - Add support for component state as checkpointing strategy

Posted by "exceptionfactory (via GitHub)" <gi...@apache.org>.
exceptionfactory commented on code in PR #7076:
URL: https://github.com/apache/nifi/pull/7076#discussion_r1203002539


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java:
##########
@@ -117,13 +131,22 @@ public class TestConsumeAzureEventHub {
     @Mock
     RecordReader reader;
 
+    @Mock
+    CheckpointStore checkpointStore;
+
+    @Mock
+    BlobContainerAsyncClient blobContainerAsyncClient;
+
+    @Captor
+    private ArgumentCaptor<Consumer<EventBatchContext>> eventBatchProcessorCapture;
+
     private MockConsumeAzureEventHub processor;
 
     private TestRunner testRunner;
 
     @BeforeEach
     public void setupProcessor() {
-        processor = new MockConsumeAzureEventHub();
+        processor = spy(new MockConsumeAzureEventHub());

Review Comment:
   The `MockConsumeAzureEventHub` Processor already isolates certain behavior for testing, so introducing a `spy()` creates an additional level of indirection. Is there a particular reason for this approach as opposed to other alternatives?



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/utils/TestComponentStateCheckpointStore.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.eventhub.utils;
+
+import com.azure.messaging.eventhubs.models.Checkpoint;
+import com.azure.messaging.eventhubs.models.PartitionOwnership;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.processors.azure.eventhub.utils.ComponentStateCheckpointStore.State;
+import org.apache.nifi.state.MockStateMap;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.junit.Assert.assertThrows;
+
+class TestComponentStateCheckpointStore {
+    private static final String EVENT_HUB_NAMESPACE = "NAMESPACE";
+    private static final String EVENT_HUB_NAME = "NAME";
+    private static final String CONSUMER_GROUP = "CONSUMER";
+    private static final String PARTITION_ID = "1";
+    private static final long OFFSET = 1234;
+
+    final static String IDENTIFIER = "id";
+    ComponentStateCheckpointStore checkpointStore;
+    Checkpoint checkpoint;
+    PartitionOwnership partitionOwnership;
+
+    boolean failToReplaceState = false;
+    boolean throwIOExceptionOnGetState = false;
+    boolean throwIOExceptionOnReplaceState = false;
+
+    @Test
+    public void testClaimOwnershipSuccess() {
+        var claimed = checkpointStore.claimOwnership(List.of(
+            partitionOwnership
+        )).blockFirst();
+        var listed = checkpointStore.listOwnership(
+                EVENT_HUB_NAMESPACE,
+                EVENT_HUB_NAME,
+                CONSUMER_GROUP
+        ).blockFirst();
+        assert listed.getETag().equals(claimed.getETag());

Review Comment:
   The `assert` keyword is not intended for testing. All instances should be replaced with `assertEquals()` or other applicable JUnit 5 assert methods.
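
To make the reasoning behind this comment concrete: a Java `assert` statement is only evaluated when the JVM runs with assertions enabled (`-ea`), so a test relying on it can pass without checking anything. The sketch below uses only the standard library. `requireEquals` is a hypothetical stand-in for an always-evaluated check such as JUnit 5's `Assertions.assertEquals(expected, actual)`.

```java
import java.util.Objects;

class AssertKeywordDemo {
    // Returns true only when the JVM was started with assertions enabled (-ea).
    static boolean assertionsEnabled() {
        boolean enabled = false;
        assert enabled = true; // the assignment runs only if assertions are on
        return enabled;
    }

    // Hypothetical stand-in for a JUnit-style check: always evaluated, regardless of JVM flags.
    static void requireEquals(Object expected, Object actual) {
        if (!Objects.equals(expected, actual)) {
            throw new AssertionError("expected <" + expected + "> but was <" + actual + ">");
        }
    }

    public static void main(String[] args) {
        System.out.println("assertions enabled: " + assertionsEnabled());
        requireEquals("etag-1", "etag-1"); // passes silently
        try {
            requireEquals("etag-1", "etag-2");
        } catch (AssertionError e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

In the test under review, the equivalent JUnit 5 form would be along the lines of `assertEquals(claimed.getETag(), listed.getETag())`.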



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java:
##########
@@ -403,36 +420,76 @@ public void stopClient() {
                 getLogger().warn("Event Processor Client stop failed", e);
             }
             eventProcessorClient = null;
-            processSessionFactory = null;
-            readerFactory = null;
-            writerFactory = null;
         }
     }
 
-    protected EventProcessorClient createClient(final ProcessContext context) {
-        namespaceName = context.getProperty(NAMESPACE).evaluateAttributeExpressions().getValue();
+    protected EventProcessorClient createClient(final ProcessContext context, final ProcessSessionFactory processSessionFactory) {
+        final String namespaceName = context.getProperty(NAMESPACE).evaluateAttributeExpressions().getValue();
         final String eventHubName = context.getProperty(EVENT_HUB_NAME).evaluateAttributeExpressions().getValue();
         final String consumerGroup = context.getProperty(CONSUMER_GROUP).evaluateAttributeExpressions().getValue();
+        final String identifier = UUID.randomUUID().toString();
 
-        final String containerName = defaultIfBlank(context.getProperty(STORAGE_CONTAINER_NAME).evaluateAttributeExpressions().getValue(), eventHubName);
-        final String storageConnectionString = createStorageConnectionString(context);
-        final BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
-                .connectionString(storageConnectionString)
-                .containerName(containerName)
-                .buildAsyncClient();
-        final BlobCheckpointStore checkpointStore = new BlobCheckpointStore(blobContainerAsyncClient);
+        CheckpointStore checkpointStore;
+        final Map<String, EventPosition> legacyPartitionEventPosition;
+
+        final CheckpointStrategy checkpointStrategy = CheckpointStrategy.valueOf(
+                context.getProperty(CHECKPOINT_STRATEGY).getValue()
+        );
+        if (checkpointStrategy == CheckpointStrategy.AZURE_BLOB_STORAGE) {
+            final String containerName = defaultIfBlank(context.getProperty(STORAGE_CONTAINER_NAME).evaluateAttributeExpressions().getValue(), eventHubName);
+            final String storageConnectionString = createStorageConnectionString(context);
+            final BlobContainerAsyncClient blobContainerAsyncClient = getBlobContainerAsyncClient(containerName, storageConnectionString);
+            checkpointStore = getCheckpointStoreFromBlobContainer(blobContainerAsyncClient);
+            legacyPartitionEventPosition = getLegacyPartitionEventPosition(blobContainerAsyncClient, consumerGroup);
+        } else {
+            checkpointStore = new ComponentStateCheckpointStore(
+                    identifier,
+                    new ComponentStateCheckpointStore.State() {

Review Comment:
   Having two different anonymous inner class implementations makes this hard to follow. Recommend breaking this out to a distinct class.



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java:
##########
@@ -486,11 +563,36 @@ protected String getTransitUri(final PartitionContext partitionContext) {
             if (readerFactory == null || writerFactory == null) {
                 writeFlowFiles(eventBatchContext, session, stopWatch);
             } else {
-                writeRecords(eventBatchContext, session, stopWatch);
+                writeRecords(eventBatchContext, session, readerFactory, writerFactory, stopWatch);
             }
 
-            // Commit ProcessSession and then update Azure Event Hubs checkpoint status
-            session.commitAsync(eventBatchContext::updateCheckpoint);
+            // As a special case, when the checkpoint strategy is component state,
+            // we reuse the current session.
+            if (checkpointStrategy == CheckpointStrategy.COMPONENT_STATE) {
+                eventBatchContext = new EventBatchContext(
+                        eventBatchContext.getPartitionContext(),
+                        eventBatchContext.getEvents(),
+                        new ComponentStateCheckpointStore(
+                                identifier,
+                                new ComponentStateCheckpointStore.State() {
+                                    public StateMap getState() throws IOException {
+                                        return session.getState(Scope.CLUSTER);
+                                    }
+
+                                    public boolean replaceState(StateMap oldValue, Map<String, String> newValue) {
+                                        return false;
+                                    }
+                                }
+                        ),

Review Comment:
   The inner anonymous class results in multiple levels of nesting that make this hard to follow. Recommend breaking it out to an explicit class.
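
One possible shape for such an explicit class is sketched below. It is not the PR's code: `State` is a simplified stand-in for `ComponentStateCheckpointStore.State` that uses plain maps instead of `StateMap`, and `StateSession`, `SessionBackedState`, and `InMemorySession` are hypothetical names introduced only for this illustration.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

class NamedStateAdapterDemo {
    // Simplified stand-in for the PR's ComponentStateCheckpointStore.State.
    interface State {
        Map<String, String> getState() throws IOException;
        boolean replaceState(Map<String, String> oldValue, Map<String, String> newValue) throws IOException;
    }

    // Hypothetical stand-in for the part of a process session the adapter needs.
    interface StateSession {
        Map<String, String> getState() throws IOException;
        boolean replaceState(Map<String, String> oldValue, Map<String, String> newValue) throws IOException;
    }

    // Named class replacing the anonymous inner classes: it delegates to whatever
    // session the supplier hands out (a fresh one, or the current one).
    static final class SessionBackedState implements State {
        private final Supplier<StateSession> sessions;

        SessionBackedState(Supplier<StateSession> sessions) {
            this.sessions = sessions;
        }

        @Override
        public Map<String, String> getState() throws IOException {
            return sessions.get().getState();
        }

        @Override
        public boolean replaceState(Map<String, String> oldValue, Map<String, String> newValue) throws IOException {
            return sessions.get().replaceState(oldValue, newValue);
        }
    }

    // In-memory session used only to exercise the adapter.
    static final class InMemorySession implements StateSession {
        private Map<String, String> state = new HashMap<>();

        @Override
        public synchronized Map<String, String> getState() {
            return new HashMap<>(state);
        }

        @Override
        public synchronized boolean replaceState(Map<String, String> oldValue, Map<String, String> newValue) {
            if (!state.equals(oldValue)) {
                return false; // caller's view is stale
            }
            state = new HashMap<>(newValue);
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        InMemorySession session = new InMemorySession();
        State state = new SessionBackedState(() -> session);
        Map<String, String> before = state.getState();
        Map<String, String> after = new HashMap<>(before);
        after.put("checkpoint/1", "1234");
        System.out.println("replaced: " + state.replaceState(before, after));
        System.out.println("state: " + state.getState());
    }
}
```

Passing a supplier lets the same named class cover both call sites in the diff, whether the session is created per call or reused from the current batch.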



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java:
##########
@@ -205,14 +228,13 @@ public void testProcessorConfigValidityWithStorageKeySet() {
         testRunner.assertValid();
     }
 
-    @Test
-    public void testReceiveOne() {
-        setProperties();
-        testRunner.run(1, false);
+    @EnumSource(CheckpointStrategy.class)

Review Comment:
   Changing all of these test methods to run for both checkpoint strategies is unnecessary and results in redundant processing. Recommend creating a couple of separate methods instead.





Re: [PR] NIFI-11294 - Add support for component state as checkpointing strategy [nifi]

Posted by "exceptionfactory (via GitHub)" <gi...@apache.org>.
exceptionfactory commented on PR #7076:
URL: https://github.com/apache/nifi/pull/7076#issuecomment-1799190471

   Thanks for your work on this @malthe, and thanks for planning to take this on @turcsanyip. I am closing the pull request for now given the state of several conflicts, but feel free to open a new one when it is ready for review.




[GitHub] [nifi] malthe commented on a diff in pull request #7076: NIFI-11294 - Add support for component state as checkpointing strategy

Posted by "malthe (via GitHub)" <gi...@apache.org>.
malthe commented on code in PR #7076:
URL: https://github.com/apache/nifi/pull/7076#discussion_r1206303613


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/utils/TestComponentStateCheckpointStore.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.eventhub.utils;
+
+import com.azure.messaging.eventhubs.models.Checkpoint;
+import com.azure.messaging.eventhubs.models.PartitionOwnership;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.processors.azure.eventhub.utils.ComponentStateCheckpointStore.State;
+import org.apache.nifi.state.MockStateMap;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.junit.Assert.assertThrows;
+
+class TestComponentStateCheckpointStore {
+    private static final String EVENT_HUB_NAMESPACE = "NAMESPACE";
+    private static final String EVENT_HUB_NAME = "NAME";
+    private static final String CONSUMER_GROUP = "CONSUMER";
+    private static final String PARTITION_ID = "1";
+    private static final long OFFSET = 1234;
+
+    final static String IDENTIFIER = "id";
+    ComponentStateCheckpointStore checkpointStore;
+    Checkpoint checkpoint;
+    PartitionOwnership partitionOwnership;
+
+    boolean failToReplaceState = false;
+    boolean throwIOExceptionOnGetState = false;
+    boolean throwIOExceptionOnReplaceState = false;
+
+    @Test
+    public void testClaimOwnershipSuccess() {
+        var claimed = checkpointStore.claimOwnership(List.of(
+            partitionOwnership
+        )).blockFirst();
+        var listed = checkpointStore.listOwnership(
+                EVENT_HUB_NAMESPACE,
+                EVENT_HUB_NAME,
+                CONSUMER_GROUP
+        ).blockFirst();
+        assert listed.getETag().equals(claimed.getETag());

Review Comment:
   Fixed in 4aa8240c0ed8dc52dedcd7e4463e1672d5abf34d.





[GitHub] [nifi] turcsanyip commented on pull request #7076: NIFI-11294 - Add support for component state as checkpointing strategy

Posted by "turcsanyip (via GitHub)" <gi...@apache.org>.
turcsanyip commented on PR #7076:
URL: https://github.com/apache/nifi/pull/7076#issuecomment-1727108619

   Thanks @malthe! I'll go ahead and try to finish the implementation.




[GitHub] [nifi] turcsanyip commented on a diff in pull request #7076: NIFI-11294 - Add support for component state as checkpointing strategy

Posted by "turcsanyip (via GitHub)" <gi...@apache.org>.
turcsanyip commented on code in PR #7076:
URL: https://github.com/apache/nifi/pull/7076#discussion_r1204808994


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/CheckpointStrategy.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.eventhub.utils;
+
+import org.apache.nifi.components.DescribedValue;
+
+public enum CheckpointStrategy implements DescribedValue {
+    AZURE_BLOB_STORAGE("azure-blob-storage", "Use Azure Blob Storage to store partition checkpoints and ownership"),
+    COMPONENT_STATE("component-state", "Use component state to store partition checkpoints");

Review Comment:
   Could you please use "user friendly" labels like `Component State` on the UI?
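
A minimal sketch of what this request could look like is given below. The nested `DescribedValue` interface is a local stand-in that mirrors the three accessors of NiFi's `DescribedValue` so that the example compiles on its own. Returning `name()` from `getValue()` is an assumption, chosen so that a lookup such as `CheckpointStrategy.valueOf(...)` on the stored property value keeps working.

```java
class CheckpointStrategyLabelDemo {
    // Local stand-in mirroring the accessors of NiFi's DescribedValue.
    interface DescribedValue {
        String getValue();
        String getDisplayName();
        String getDescription();
    }

    enum CheckpointStrategy implements DescribedValue {
        AZURE_BLOB_STORAGE("Azure Blob Storage", "Use Azure Blob Storage to store partition checkpoints and ownership"),
        COMPONENT_STATE("Component State", "Use component state to store partition checkpoints");

        private final String label;
        private final String description;

        CheckpointStrategy(String label, String description) {
            this.label = label;
            this.description = description;
        }

        @Override
        public String getValue() {
            return name(); // stable identifier (assumption: the enum name is what gets stored)
        }

        @Override
        public String getDisplayName() {
            return label; // user-friendly text shown in the UI
        }

        @Override
        public String getDescription() {
            return description;
        }
    }

    public static void main(String[] args) {
        for (CheckpointStrategy strategy : CheckpointStrategy.values()) {
            System.out.println(strategy.getValue() + " -> " + strategy.getDisplayName());
        }
    }
}
```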



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java:
##########
@@ -226,14 +235,27 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .required(true)
             .build();
+
+    static final PropertyDescriptor CHECKPOINT_STRATEGY = new PropertyDescriptor.Builder()
+            .name("checkpoint-strategy")
+            .displayName("Checkpoint Strategy")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .required(true)
+            .allowableValues(CheckpointStrategy.class)
+            .defaultValue(CheckpointStrategy.AZURE_BLOB_STORAGE.getValue())
+            .description("Specifies which strategy to use for storing and retrieving partition ownership information and checkpoint details for each partition.")
+            .build();
+
     static final PropertyDescriptor STORAGE_ACCOUNT_NAME = new PropertyDescriptor.Builder()
             .name("storage-account-name")
             .displayName("Storage Account Name")
             .description("Name of the Azure Storage account to store event hub consumer group state.")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .required(true)
+            .dependsOn(CHECKPOINT_STRATEGY, CheckpointStrategy.AZURE_BLOB_STORAGE)

Review Comment:
   The other `Storage *` properties below should also depend on `CheckpointStrategy.AZURE_BLOB_STORAGE`.
   The `Storage Account Key` property can also be made mandatory, as it will be shown only when it is required.



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java:
##########
@@ -403,36 +420,76 @@ public void stopClient() {
                 getLogger().warn("Event Processor Client stop failed", e);
             }
             eventProcessorClient = null;
-            processSessionFactory = null;
-            readerFactory = null;
-            writerFactory = null;
         }
     }
 
-    protected EventProcessorClient createClient(final ProcessContext context) {
-        namespaceName = context.getProperty(NAMESPACE).evaluateAttributeExpressions().getValue();
+    protected EventProcessorClient createClient(final ProcessContext context, final ProcessSessionFactory processSessionFactory) {
+        final String namespaceName = context.getProperty(NAMESPACE).evaluateAttributeExpressions().getValue();
         final String eventHubName = context.getProperty(EVENT_HUB_NAME).evaluateAttributeExpressions().getValue();
         final String consumerGroup = context.getProperty(CONSUMER_GROUP).evaluateAttributeExpressions().getValue();
+        final String identifier = UUID.randomUUID().toString();
 
-        final String containerName = defaultIfBlank(context.getProperty(STORAGE_CONTAINER_NAME).evaluateAttributeExpressions().getValue(), eventHubName);
-        final String storageConnectionString = createStorageConnectionString(context);
-        final BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
-                .connectionString(storageConnectionString)
-                .containerName(containerName)
-                .buildAsyncClient();
-        final BlobCheckpointStore checkpointStore = new BlobCheckpointStore(blobContainerAsyncClient);
+        CheckpointStore checkpointStore;
+        final Map<String, EventPosition> legacyPartitionEventPosition;
+
+        final CheckpointStrategy checkpointStrategy = CheckpointStrategy.valueOf(
+                context.getProperty(CHECKPOINT_STRATEGY).getValue()
+        );
+        if (checkpointStrategy == CheckpointStrategy.AZURE_BLOB_STORAGE) {
+            final String containerName = defaultIfBlank(context.getProperty(STORAGE_CONTAINER_NAME).evaluateAttributeExpressions().getValue(), eventHubName);
+            final String storageConnectionString = createStorageConnectionString(context);
+            final BlobContainerAsyncClient blobContainerAsyncClient = getBlobContainerAsyncClient(containerName, storageConnectionString);
+            checkpointStore = getCheckpointStoreFromBlobContainer(blobContainerAsyncClient);
+            legacyPartitionEventPosition = getLegacyPartitionEventPosition(blobContainerAsyncClient, consumerGroup);
+        } else {
+            checkpointStore = new ComponentStateCheckpointStore(
+                    identifier,
+                    new ComponentStateCheckpointStore.State() {
+                        public StateMap getState() throws IOException {
+                            final ProcessSession session = processSessionFactory.createSession();
+                            return session.getState(Scope.CLUSTER);
+                        }
+
+                        public boolean replaceState(StateMap oldValue, Map<String, String> newValue) throws IOException {
+                            final ProcessSession session = processSessionFactory.createSession();
+                            if (!session.replaceState(oldValue, newValue, Scope.CLUSTER)) {

Review Comment:
   `ProcessSession.replaceState()` does not provide proper optimistic locking, because another session can change the state between `replaceState()` and `commit()`. So even if `replaceState()` returns `true`, the state change may be discarded at commit time due to a concurrent update.
   
   I suggest using `ProcessContext.getStateManager().replace()` instead, which has the optimistic locking semantics required of a checkpoint store implementation.
   
   Please note that `ProcessContext.getStateManager().replace()` currently cannot initialize the state (see also [NIFI-11595](https://issues.apache.org/jira/browse/NIFI-11595)), so the state needs to be created with `setState()` before using `replace()`, e.g. in `@OnScheduled` like this:
   ```
       @OnScheduled
       public void onScheduled(ProcessContext context) throws IOException {
           if (getNodeTypeProvider().isPrimary()) {
               final StateManager stateManager = context.getStateManager();
               final StateMap state = stateManager.getState(Scope.CLUSTER);
   
               if (!state.getStateVersion().isPresent()) {
                   stateManager.setState(new HashMap<>(), Scope.CLUSTER);
               }
           }
       }
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java:
##########
@@ -92,6 +100,7 @@
         + "In clustered environment, ConsumeAzureEventHub processor instances form a consumer group and the messages are distributed among the cluster nodes "
         + "(each message is processed on one cluster node only).")
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@Stateful(scopes = {Scope.CLUSTER}, description = "Used to store partitions offsets when component state is configured as the checkpointing strategy.")

Review Comment:
   ```suggestion
   @Stateful(scopes = {Scope.CLUSTER}, description = "Used to store partition ownership and offsets when component state is configured as the checkpointing strategy.")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] malthe commented on pull request #7076: Add support for component state as checkpointing strategy

Posted by "malthe (via GitHub)" <gi...@apache.org>.
malthe commented on PR #7076:
URL: https://github.com/apache/nifi/pull/7076#issuecomment-1480758687

   Note that the existing tests (which I have adapted, somewhat unsuccessfully) are not very good – they do not exercise the processor's logic end-to-end, only the "meat". I think the test setup for this processor needs to be reworked entirely.




[GitHub] [nifi] exceptionfactory commented on a diff in pull request #7076: NIFI-11294 - Add support for component state as checkpointing strategy

Posted by "exceptionfactory (via GitHub)" <gi...@apache.org>.
exceptionfactory commented on code in PR #7076:
URL: https://github.com/apache/nifi/pull/7076#discussion_r1203185815


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/ComponentStateCheckpointStore.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.eventhub.utils;
+
+import com.azure.messaging.eventhubs.CheckpointStore;
+import com.azure.messaging.eventhubs.models.Checkpoint;
+import com.azure.messaging.eventhubs.models.PartitionContext;
+import com.azure.messaging.eventhubs.models.PartitionOwnership;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.processor.exception.ProcessException;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+public class ComponentStateCheckpointStore implements CheckpointStore {
+    public interface State {
+        StateMap getState() throws IOException;
+        boolean replaceState(StateMap oldValue, Map<String, String> newValue) throws IOException;
+    }
+
+    private final String identifier;
+    private final State state;
+
+    private static final String KEY_CHECKPOINT = "checkpoint";
+    private static final String KEY_OWNERSHIP = "ownership";
+
+    public ComponentStateCheckpointStore(String identifier, State state) {
+        this.identifier = identifier;
+        this.state = state;
+    }
+
+    public Flux<PartitionOwnership> listOwnership(String fullyQualifiedNamespace, String eventHubName, String consumerGroup) {
+        return Flux.defer(
+                () -> {
+                    Map<String, String> map;
+                    try {
+                        map = state.getState().toMap();
+                    } catch (IOException e) {
+                        return Flux.error(e);
+                    }
+                    List<PartitionOwnership> ownerships = getEntries(map, KEY_OWNERSHIP, this::convertOwnership);
+                    return Flux.fromIterable(ownerships).filter(
+                            ownership ->
+                                    ownership.getFullyQualifiedNamespace().equals(fullyQualifiedNamespace)
+                                    && ownership.getEventHubName().equals(eventHubName)
+                                    && ownership.getConsumerGroup().equals(consumerGroup)
+                    );
+                }
+        );
+    }
+
+    public Flux<PartitionOwnership> claimOwnership(List<PartitionOwnership> requestedPartitionOwnerships) {
+        return Flux.fromIterable(requestedPartitionOwnerships).flatMap(
+                partitionOwnership -> {
+                    final StateMap oldState;
+                    try {
+                        oldState = state.getState();
+                    } catch (IOException e) {
+                        return Mono.error(e);
+                    }
+
+                    String key = makeKey(
+                            KEY_OWNERSHIP,
+                            partitionOwnership.getFullyQualifiedNamespace(),
+                            partitionOwnership.getEventHubName(),
+                            partitionOwnership.getConsumerGroup(),
+                            partitionOwnership.getPartitionId()
+                    );
+
+                    final Map<String, String> newState = new HashMap<>(oldState.toMap());
+                    long timestamp = System.currentTimeMillis();
+                    String eTag = identifier + "/" + timestamp;
+                    newState.put(key, eTag);
+                    final boolean success;
+                    try {
+                        success = state.replaceState(oldState, newState);
+                    } catch (IOException e) {
+                        return Mono.error(e);
+                    }
+
+                    if (success) {
+                        return Mono.just(partitionOwnership.setETag(eTag));
+                    }
+
+                    return Mono.empty();
+                }
+        );
+    }
+
+    public Flux<Checkpoint> listCheckpoints(String fullyQualifiedNamespace, String eventHubName, String consumerGroup) {
+        return Flux.defer(() -> {
+            Map<String, String> stateMap;
+            try {
+                stateMap = state.getState().toMap();
+            } catch (IOException e) {
+                return Flux.error(e);
+            }
+
+            List<Checkpoint> checkpoints = getEntries(stateMap, KEY_CHECKPOINT, this::convertCheckpoint);
+            return Flux.fromIterable(checkpoints).filter(
+                    checkpoint ->
+                            checkpoint.getFullyQualifiedNamespace().equals(fullyQualifiedNamespace)
+                                    && checkpoint.getEventHubName().equals(eventHubName)
+                                    && checkpoint.getConsumerGroup().equals(consumerGroup)
+            );
+        });
+    }
+
+    public Mono<Void> updateCheckpoint(Checkpoint checkpoint) {
+        final StateMap oldState;
+        try {
+            oldState = state.getState();
+        } catch (IOException e) {
+            return Mono.error(e);
+        }
+
+        Map<String, String> newState = new HashMap<>(oldState.toMap());
+        Long offset = checkpoint.getOffset();
+        String key = makeKey(
+                KEY_CHECKPOINT,
+                checkpoint.getFullyQualifiedNamespace(),
+                checkpoint.getEventHubName(),
+                checkpoint.getConsumerGroup(),
+                checkpoint.getPartitionId()
+        );
+        if (offset == null) {
+            newState.remove(key);
+        } else {
+            newState.put(key, offset.toString());
+        }
+
+        try {
+            state.replaceState(oldState, newState);
+        } catch (IOException e) {
+            return Mono.error(e);
+        }
+
+        // Note that we do not commit here because there is an implicit
+        // agreement that the process session factory will provide a session
+        // that is automatically committed (this is done by the processor).
+        return Mono.empty();
+    }
+
+    private Checkpoint convertCheckpoint(PartitionContext context, String value) {
+        return new Checkpoint()
+                .setFullyQualifiedNamespace(context.getFullyQualifiedNamespace())
+                .setEventHubName(context.getEventHubName())
+                .setConsumerGroup(context.getConsumerGroup())
+                .setPartitionId(context.getPartitionId())
+                .setOffset(Long.parseLong(value));
+    }
+
+    private PartitionOwnership convertOwnership(PartitionContext context, String value) {
+        final String[] parts = value.split("/", 2);
+        if (parts.length != 2) {
+            throw new ProcessException(String.format("Invalid ownership value: %s", value));
+        }
+        return new PartitionOwnership()
+                .setFullyQualifiedNamespace(context.getFullyQualifiedNamespace())
+                .setEventHubName(context.getEventHubName())
+                .setConsumerGroup(context.getConsumerGroup())
+                .setPartitionId(context.getPartitionId())
+                .setETag(value)
+                .setOwnerId(parts[0])
+                .setLastModifiedTime(Long.parseLong(parts[1])
+                );
+    }
+
+    private <T> List<T> getEntries(final Map<String, String> map, String kind, BiFunction<PartitionContext, String, T> extract) throws ProcessException {
+        final List<T> result = new ArrayList<>();
+        for (Map.Entry<String, String> entry : map.entrySet()) {
+            final String key = entry.getKey();
+            final String[] parts = key.split("/", 5);
+            if (parts.length != 5) {
+                throw new ProcessException(
+                        String.format("Invalid %s key: %s", kind, entry.getKey())
+                );
+            }
+            if (!parts[0].equals(kind)) {
+                continue;
+            }
+            final String fullyQualifiedNamespace = parts[1];
+            final String eventHubName = parts[2];
+            final String consumerGroup = parts[3];
+            final String partitionId = parts[4];
+            PartitionContext partitionContext = new PartitionContext(
+                    fullyQualifiedNamespace,
+                    eventHubName,
+                    consumerGroup,
+                    partitionId
+            );

Review Comment:
   Recommend pulling this logic out to a separate method so the formatting approach is clear.
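
As a rough illustration of that suggestion, the sketch below keeps key formatting and parsing side by side in one small class. It is not the PR's code: `StateKey`, `format` and `parse` are hypothetical names, and the `kind/namespace/eventHub/consumerGroup/partitionId` layout is an assumption inferred from the five-part `split("/", 5)` in the diff (the body of `makeKey` is not shown here).

```java
// Hypothetical helper; the names and the key layout are assumptions for illustration.
final class StateKey {
    static final String SEPARATOR = "/";
    static final int PART_COUNT = 5;

    final String kind;
    final String fullyQualifiedNamespace;
    final String eventHubName;
    final String consumerGroup;
    final String partitionId;

    private StateKey(String[] parts) {
        this.kind = parts[0];
        this.fullyQualifiedNamespace = parts[1];
        this.eventHubName = parts[2];
        this.consumerGroup = parts[3];
        this.partitionId = parts[4];
    }

    // Builds "kind/namespace/eventHub/consumerGroup/partitionId".
    static String format(String kind, String namespace, String eventHub, String consumerGroup, String partitionId) {
        return String.join(SEPARATOR, kind, namespace, eventHub, consumerGroup, partitionId);
    }

    // Splits a key into its five parts and rejects keys with fewer parts.
    static StateKey parse(String key) {
        final String[] parts = key.split(SEPARATOR, PART_COUNT);
        if (parts.length != PART_COUNT) {
            throw new IllegalArgumentException("Invalid state key: " + key);
        }
        return new StateKey(parts);
    }
}
```

With something along these lines, `getEntries` would only need to call `StateKey.parse(entry.getKey())` and compare `kind`, which keeps the key layout documented in a single place.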





[GitHub] [nifi] exceptionfactory commented on a diff in pull request #7076: NIFI-11294 - Add support for component state as checkpointing strategy

Posted by "exceptionfactory (via GitHub)" <gi...@apache.org>.
exceptionfactory commented on code in PR #7076:
URL: https://github.com/apache/nifi/pull/7076#discussion_r1203185008


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/ComponentStateCheckpointStore.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.eventhub.utils;
+
+import com.azure.messaging.eventhubs.CheckpointStore;
+import com.azure.messaging.eventhubs.models.Checkpoint;
+import com.azure.messaging.eventhubs.models.PartitionContext;
+import com.azure.messaging.eventhubs.models.PartitionOwnership;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.processor.exception.ProcessException;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+public class ComponentStateCheckpointStore implements CheckpointStore {
+    public interface State {
+        StateMap getState() throws IOException;
+        boolean replaceState(StateMap oldValue, Map<String, String> newValue) throws IOException;
+    }
+
+    private final String identifier;
+    private final State state;
+
+    private static final String KEY_CHECKPOINT = "checkpoint";
+    private static final String KEY_OWNERSHIP = "ownership";
+
+    public ComponentStateCheckpointStore(String identifier, State state) {
+        this.identifier = identifier;
+        this.state = state;
+    }
+
+    public Flux<PartitionOwnership> listOwnership(String fullyQualifiedNamespace, String eventHubName, String consumerGroup) {
+        return Flux.defer(
+                () -> {
+                    Map<String, String> map;
+                    try {
+                        map = state.getState().toMap();
+                    } catch (IOException e) {
+                        return Flux.error(e);
+                    }
+                    List<PartitionOwnership> ownerships = getEntries(map, KEY_OWNERSHIP, this::convertOwnership);
+                    return Flux.fromIterable(ownerships).filter(
+                            ownership ->
+                                    ownership.getFullyQualifiedNamespace().equals(fullyQualifiedNamespace)
+                                    && ownership.getEventHubName().equals(eventHubName)
+                                    && ownership.getConsumerGroup().equals(consumerGroup)
+                    );
+                }
+        );
+    }
+
+    public Flux<PartitionOwnership> claimOwnership(List<PartitionOwnership> requestedPartitionOwnerships) {
+        return Flux.fromIterable(requestedPartitionOwnerships).flatMap(
+                partitionOwnership -> {
+                    final StateMap oldState;
+                    try {
+                        oldState = state.getState();
+                    } catch (IOException e) {
+                        return Mono.error(e);
+                    }
+
+                    String key = makeKey(
+                            KEY_OWNERSHIP,
+                            partitionOwnership.getFullyQualifiedNamespace(),
+                            partitionOwnership.getEventHubName(),
+                            partitionOwnership.getConsumerGroup(),
+                            partitionOwnership.getPartitionId()
+                    );
+
+                    final Map<String, String> newState = new HashMap<>(oldState.toMap());
+                    long timestamp = System.currentTimeMillis();
+                    String eTag = identifier + "/" + timestamp;

Review Comment:
   Should the `eTag` be created based on the state version as opposed to the current timestamp? That seems like it would align better with the purpose of the tag for tracking ownership and ensuring consistent version control across nodes.





[GitHub] [nifi] malthe commented on a diff in pull request #7076: NIFI-11294 - Add support for component state as checkpointing strategy

Posted by "malthe (via GitHub)" <gi...@apache.org>.
malthe commented on code in PR #7076:
URL: https://github.com/apache/nifi/pull/7076#discussion_r1206368808


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/ComponentStateCheckpointStore.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.eventhub.utils;
+
+import com.azure.messaging.eventhubs.CheckpointStore;
+import com.azure.messaging.eventhubs.models.Checkpoint;
+import com.azure.messaging.eventhubs.models.PartitionContext;
+import com.azure.messaging.eventhubs.models.PartitionOwnership;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.processor.exception.ProcessException;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+public class ComponentStateCheckpointStore implements CheckpointStore {
+    public interface State {
+        StateMap getState() throws IOException;
+        boolean replaceState(StateMap oldValue, Map<String, String> newValue) throws IOException;
+    }
+
+    private final String identifier;
+    private final State state;
+
+    private static final String KEY_CHECKPOINT = "checkpoint";
+    private static final String KEY_OWNERSHIP = "ownership";
+
+    public ComponentStateCheckpointStore(String identifier, State state) {
+        this.identifier = identifier;
+        this.state = state;
+    }
+
+    public Flux<PartitionOwnership> listOwnership(String fullyQualifiedNamespace, String eventHubName, String consumerGroup) {
+        return Flux.defer(
+                () -> {
+                    Map<String, String> map;
+                    try {
+                        map = state.getState().toMap();
+                    } catch (IOException e) {
+                        return Flux.error(e);
+                    }
+                    List<PartitionOwnership> ownerships = getEntries(map, KEY_OWNERSHIP, this::convertOwnership);
+                    return Flux.fromIterable(ownerships).filter(
+                            ownership ->
+                                    ownership.getFullyQualifiedNamespace().equals(fullyQualifiedNamespace)
+                                    && ownership.getEventHubName().equals(eventHubName)
+                                    && ownership.getConsumerGroup().equals(consumerGroup)
+                    );
+                }
+        );
+    }
+
+    public Flux<PartitionOwnership> claimOwnership(List<PartitionOwnership> requestedPartitionOwnerships) {
+        return Flux.fromIterable(requestedPartitionOwnerships).flatMap(
+                partitionOwnership -> {
+                    final StateMap oldState;
+                    try {
+                        oldState = state.getState();
+                    } catch (IOException e) {
+                        return Mono.error(e);
+                    }
+
+                    String key = makeKey(
+                            KEY_OWNERSHIP,
+                            partitionOwnership.getFullyQualifiedNamespace(),
+                            partitionOwnership.getEventHubName(),
+                            partitionOwnership.getConsumerGroup(),
+                            partitionOwnership.getPartitionId()
+                    );
+
+                    final Map<String, String> newState = new HashMap<>(oldState.toMap());
+                    long timestamp = System.currentTimeMillis();
+                    String eTag = identifier + "/" + timestamp;

Review Comment:
   I'm not sure – but that logic was copied from the blob-based checkpoint store.
   
   In fact, whenever I could, I have used the exact same logic between them.





[GitHub] [nifi] turcsanyip commented on a diff in pull request #7076: NIFI-11294 - Add support for component state as checkpointing strategy

Posted by "turcsanyip (via GitHub)" <gi...@apache.org>.
turcsanyip commented on code in PR #7076:
URL: https://github.com/apache/nifi/pull/7076#discussion_r1206003566


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java:
##########
@@ -403,36 +420,76 @@ public void stopClient() {
                 getLogger().warn("Event Processor Client stop failed", e);
             }
             eventProcessorClient = null;
-            processSessionFactory = null;
-            readerFactory = null;
-            writerFactory = null;
         }
     }
 
-    protected EventProcessorClient createClient(final ProcessContext context) {
-        namespaceName = context.getProperty(NAMESPACE).evaluateAttributeExpressions().getValue();
+    protected EventProcessorClient createClient(final ProcessContext context, final ProcessSessionFactory processSessionFactory) {
+        final String namespaceName = context.getProperty(NAMESPACE).evaluateAttributeExpressions().getValue();
         final String eventHubName = context.getProperty(EVENT_HUB_NAME).evaluateAttributeExpressions().getValue();
         final String consumerGroup = context.getProperty(CONSUMER_GROUP).evaluateAttributeExpressions().getValue();
+        final String identifier = UUID.randomUUID().toString();
 
-        final String containerName = defaultIfBlank(context.getProperty(STORAGE_CONTAINER_NAME).evaluateAttributeExpressions().getValue(), eventHubName);
-        final String storageConnectionString = createStorageConnectionString(context);
-        final BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
-                .connectionString(storageConnectionString)
-                .containerName(containerName)
-                .buildAsyncClient();
-        final BlobCheckpointStore checkpointStore = new BlobCheckpointStore(blobContainerAsyncClient);
+        CheckpointStore checkpointStore;
+        final Map<String, EventPosition> legacyPartitionEventPosition;
+
+        final CheckpointStrategy checkpointStrategy = CheckpointStrategy.valueOf(
+                context.getProperty(CHECKPOINT_STRATEGY).getValue()
+        );
+        if (checkpointStrategy == CheckpointStrategy.AZURE_BLOB_STORAGE) {
+            final String containerName = defaultIfBlank(context.getProperty(STORAGE_CONTAINER_NAME).evaluateAttributeExpressions().getValue(), eventHubName);
+            final String storageConnectionString = createStorageConnectionString(context);
+            final BlobContainerAsyncClient blobContainerAsyncClient = getBlobContainerAsyncClient(containerName, storageConnectionString);
+            checkpointStore = getCheckpointStoreFromBlobContainer(blobContainerAsyncClient);
+            legacyPartitionEventPosition = getLegacyPartitionEventPosition(blobContainerAsyncClient, consumerGroup);
+        } else {
+            checkpointStore = new ComponentStateCheckpointStore(
+                    identifier,
+                    new ComponentStateCheckpointStore.State() {
+                        public StateMap getState() throws IOException {
+                            final ProcessSession session = processSessionFactory.createSession();
+                            return session.getState(Scope.CLUSTER);
+                        }
+
+                        public boolean replaceState(StateMap oldValue, Map<String, String> newValue) throws IOException {
+                            final ProcessSession session = processSessionFactory.createSession();
+                            if (!session.replaceState(oldValue, newValue, Scope.CLUSTER)) {

Review Comment:
   `ProcessSession.replaceState()` does not provide proper optimistic locking, because another session can change the state between `replaceState()` and `commit()`. So even if `replaceState()` returns `true`, the state change may be discarded at commit time due to a concurrent update.
   
   I suggest using `ProcessContext.getStateManager().replace()` instead, which has the optimistic locking semantics required of a checkpoint store implementation.
   
   Please note that `ProcessContext.getStateManager().replace()` currently cannot initialize the state (see also [NIFI-11595](https://issues.apache.org/jira/browse/NIFI-11595)), so the state needs to be created with `setState()` before using `replace()`, e.g. in `@OnScheduled` like this:
   ```
       @OnScheduled
       public void onScheduled(ProcessContext context) throws IOException {
           if (getNodeTypeProvider().isPrimary()) {
               final StateManager stateManager = context.getStateManager();
               final StateMap state = stateManager.getState(Scope.CLUSTER);
   
               if (!state.getStateVersion().isPresent()) {
                   stateManager.setState(new HashMap<>(), Scope.CLUSTER);
               }
           }
       }
   ```
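
To make the race concrete, here is a minimal in-memory sketch using only the JDK. It is not NiFi code: `VersionedState` and `DeferredSession` are hypothetical stand-ins that model the behaviour described in this comment (a version check at `replaceState()` time, with the actual write deferred to `commit()`), compared with a compare-and-set that checks and writes in one atomic step.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a versioned cluster state store (not NiFi's StateManager).
class VersionedState {
    // Immutable snapshot: the values plus the version at which they were written.
    static final class Snapshot {
        final long version;
        final Map<String, String> values;

        Snapshot(long version, Map<String, String> values) {
            this.version = version;
            this.values = Collections.unmodifiableMap(new HashMap<>(values));
        }
    }

    private Snapshot current = new Snapshot(0, new HashMap<>());

    synchronized Snapshot get() {
        return current;
    }

    // Atomic compare-and-set: the check and the write happen under one lock.
    synchronized boolean replace(Snapshot expected, Map<String, String> newValues) {
        if (current.version != expected.version) {
            return false;
        }
        current = new Snapshot(expected.version + 1, newValues);
        return true;
    }
}

// Hypothetical model of a session that checks the version early and writes only on commit.
class DeferredSession {
    private final VersionedState store;
    private VersionedState.Snapshot expected;
    private Map<String, String> pending;

    DeferredSession(VersionedState store) {
        this.store = store;
    }

    // Returns true if the version matches at call time; nothing is written yet.
    boolean replaceState(VersionedState.Snapshot oldValue, Map<String, String> newValues) {
        if (store.get().version != oldValue.version) {
            return false;
        }
        expected = oldValue;
        pending = new HashMap<>(newValues);
        return true;
    }

    // The write happens here and can still lose to an update made in between.
    boolean commit() {
        return pending != null && store.replace(expected, pending);
    }
}

class OptimisticLockingDemo {
    public static void main(String[] args) {
        VersionedState store = new VersionedState();
        VersionedState.Snapshot seen = store.get();

        DeferredSession session = new DeferredSession(store);
        boolean accepted = session.replaceState(seen, Collections.singletonMap("ownership/p0", "node-a"));

        // Another node updates the state between replaceState() and commit().
        boolean concurrent = store.replace(seen, Collections.singletonMap("ownership/p0", "node-b"));

        boolean committed = session.commit();
        System.out.println(accepted + " " + concurrent + " " + committed); // prints "true true false"
    }
}
```

In this model the session reports success from `replaceState()` even though its write never lands, which is the situation a checkpoint store has to avoid when it hands out partition ownership. The atomic `replace()` either wins or reports failure immediately.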





[GitHub] [nifi] exceptionfactory commented on a diff in pull request #7076: NIFI-11294 - Add support for component state as checkpointing strategy

Posted by "exceptionfactory (via GitHub)" <gi...@apache.org>.
exceptionfactory commented on code in PR #7076:
URL: https://github.com/apache/nifi/pull/7076#discussion_r1147016376


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/CheckpointStrategy.java:
##########
@@ -0,0 +1,32 @@
+package org.apache.nifi.processors.azure.eventhub.utils;

Review Comment:
   This new class is missing a license header.


