Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/12/13 22:00:38 UTC

[GitHub] [nifi] exceptionfactory opened a new pull request, #6779: NIFI-10975 Add Kubernetes Leader Election and State Provider

exceptionfactory opened a new pull request, #6779:
URL: https://github.com/apache/nifi/pull/6779

   # Summary
   
   [NIFI-10975](https://issues.apache.org/jira/browse/NIFI-10975) Adds an initial implementation of Kubernetes cluster leader election and state management, supporting deployments without the need for ZooKeeper.
   
   ## State Management Implementation
   
   The state management implementation uses Kubernetes [ConfigMaps](https://kubernetes.io/docs/concepts/configuration/configmap/) to persist cluster information for stateful components. The `KubernetesConfigMapStateProvider` uses the standard `data` property, but encodes property names using Base64 URL encoding without padding to meet [ConfigMap object](https://kubernetes.io/docs/concepts/configuration/configmap/) property naming requirements.
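
   As a rough illustration of the key encoding described above, the following sketch shows a round trip through URL-safe Base64 without padding. The class name `ConfigMapKeyCodec` and its methods are hypothetical and only approximate what the provider does internally.

   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.Base64;

   // Hypothetical helper for illustration; not a class from this pull request
   final class ConfigMapKeyCodec {
       // URL-safe alphabet uses '-' and '_' and omits '=' padding,
       // which keeps keys within the characters permitted for ConfigMap data keys
       private static final Base64.Encoder ENCODER = Base64.getUrlEncoder().withoutPadding();

       // The JDK URL decoder accepts input with or without padding
       private static final Base64.Decoder DECODER = Base64.getUrlDecoder();

       private ConfigMapKeyCodec() {
       }

       // Encode a state property name into a ConfigMap-compatible key
       static String encode(final String key) {
           return ENCODER.encodeToString(key.getBytes(StandardCharsets.UTF_8));
       }

       // Decode a ConfigMap key back to the original state property name
       static String decode(final String encodedKey) {
           return new String(DECODER.decode(encodedKey), StandardCharsets.UTF_8);
       }
   }
   ```

   With this sketch, a property name such as `last modified/file name` becomes a key made only of letters, digits, `-`, and `_`, and decoding returns the original name.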
   
   The new State Provider can be configured in `state-management.xml` using the following element definition, which is included in a commented section of the configuration:
   
   ```
   <cluster-provider>
       <id>kubernetes-provider</id>
       <class>org.apache.nifi.kubernetes.state.provider.KubernetesConfigMapStateProvider</class>
   </cluster-provider>
   ```
   
   ## Leader Election Implementation
   
   The leader election implementation uses Kubernetes [Leases](https://kubernetes.io/docs/concepts/architecture/leases/) for distributed tracking of the current cluster leader. Kubernetes Lease names must adhere to [RFC 1123 subdomain naming requirements](https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#dns-subdomain-names), requiring a mapping from NiFi application names to Lease names.
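
   To make the naming constraint concrete, here is a minimal sketch that validates a name against the DNS subdomain rules (lowercase alphanumerics, `-`, and `.`, starting and ending with an alphanumeric, at most 253 characters) and applies one possible normalization. The `LeaseNames` class and its `toLeaseName` mapping are assumptions for illustration; the mapping in this pull request may work differently.

   ```java
   import java.util.Locale;
   import java.util.regex.Pattern;

   // Hypothetical helper for illustration; not a class from this pull request
   final class LeaseNames {
       private static final int MAX_LENGTH = 253;

       // DNS subdomain pattern: dot-separated labels of lowercase alphanumerics and hyphens
       private static final Pattern SUBDOMAIN = Pattern.compile(
               "[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*");

       private LeaseNames() {
       }

       static boolean isValid(final String name) {
           return name != null && name.length() <= MAX_LENGTH && SUBDOMAIN.matcher(name).matches();
       }

       // Assumed mapping: lowercase the name, replace other character runs with a hyphen, trim hyphens
       static String toLeaseName(final String roleName) {
           final String lowered = roleName.toLowerCase(Locale.ROOT);
           final String hyphenated = lowered.replaceAll("[^a-z0-9]+", "-");
           final String trimmed = hyphenated.replaceAll("^-+|-+$", "");
           if (!isValid(trimmed)) {
               throw new IllegalArgumentException("Role name cannot be mapped to a Lease name: " + roleName);
           }
           return trimmed;
       }
   }
   ```

   Under this assumed mapping, a role name containing spaces and capital letters, such as `Cluster Coordinator`, is rejected as a Lease name as written and maps to `cluster-coordinator`.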
   
   The new Leader Election implementation can be configured using a new property in `nifi.properties` as follows:
   
   ```
   nifi.cluster.leader.election.implementation=KubernetesLeaderElectionManager
   ```
   
   ## Framework Changes
   
   The leader election implementation required promoting the `LeaderElectionManager` interface to `nifi-framework-api` to support NAR bundle extension loading. The `LeaderElectionManager` had two methods without runtime references, which were removed. Refactoring also involved creating a new `nifi-framework-leader-election-shared` module for abstracting tracking operations.
   
   The `nifi.properties` configuration includes a new property with a default value of `CuratorLeaderElectionManager`, which provides current cluster coordination using ZooKeeper.
   
   ## Kubernetes Client
   
   The implementation includes a new `nifi-kubernetes-client` library which provides utility components for Kubernetes Client access and service namespace determination.
   
   The `nifi-kubernetes-client` library depends on the [Fabric8 Kubernetes Client](https://github.com/fabric8io/kubernetes-client) which supports current versions of Kubernetes and provides separation of API and implementation classes.
   
   Both the State Provider and Leader Election Manager implementations attempt to resolve the Kubernetes namespace based on the standard [Service Account](https://kubernetes.io/docs/reference/access-authn-authz/service-accounts-admin/) namespace secret. In the absence of a readable namespace secret, the provider returns `default` as the namespace for storing Leases and ConfigMaps.
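
   The fallback behavior can be sketched roughly as follows. The `NamespaceResolver` class is hypothetical and is not the `ServiceAccountNamespaceProvider` from this pull request; the file path is the standard location where Kubernetes mounts the Service Account namespace inside a Pod.

   ```java
   import java.io.IOException;
   import java.nio.charset.StandardCharsets;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.Paths;

   // Hypothetical helper for illustration; not a class from this pull request
   final class NamespaceResolver {
       // Standard mount location of the Service Account namespace inside a Pod
       static final Path SERVICE_ACCOUNT_NAMESPACE =
               Paths.get("/var/run/secrets/kubernetes.io/serviceaccount/namespace");

       static final String DEFAULT_NAMESPACE = "default";

       private NamespaceResolver() {
       }

       // Read the namespace from the given path or fall back to the default namespace
       static String resolve(final Path namespacePath) {
           if (!Files.isReadable(namespacePath)) {
               return DEFAULT_NAMESPACE;
           }
           try {
               final byte[] bytes = Files.readAllBytes(namespacePath);
               final String namespace = new String(bytes, StandardCharsets.UTF_8).trim();
               return namespace.isEmpty() ? DEFAULT_NAMESPACE : namespace;
           } catch (final IOException e) {
               return DEFAULT_NAMESPACE;
           }
       }
   }
   ```

   Calling `NamespaceResolver.resolve(NamespaceResolver.SERVICE_ACCOUNT_NAMESPACE)` outside of a Pod, where the file does not exist, returns `default` in this sketch.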
   
   ## Additional Changes
   
   Additional changes include removing several integration test classes specific to ZooKeeper. These integration tests are less useful now that system integration tests run on a scheduled basis.
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [X] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue created
   
   ### Pull Request Tracking
   
   - [X] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   - [X] Pull Request commit message starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   
   ### Pull Request Formatting
   
   - [X] Pull Request based on current revision of the `main` branch
   - [X] Pull Request refers to a feature branch with one commit containing changes
   
   # Verification
   
   Please indicate the verification steps performed prior to pull request creation.
   
   ### Build
   
   - [X] Build completed using `mvn clean install -P contrib-check`
     - [X] JDK 8
     - [X] JDK 11
     - [X] JDK 17
   
   ### Licensing
   
   - [ ] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
   - [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` files
   
   ### Documentation
   
   - [ ] Documentation formatting appears as expected in rendered files
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6779: NIFI-10975 Add Kubernetes Leader Election and State Provider

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on code in PR #6779:
URL: https://github.com/apache/nifi/pull/6779#discussion_r1048569550


##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java:
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kubernetes.state.provider;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.StatusDetails;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.dsl.Resource;
+import org.apache.nifi.components.AbstractConfigurableComponent;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.components.state.StateProvider;
+import org.apache.nifi.components.state.StateProviderInitializationContext;
+import org.apache.nifi.kubernetes.client.ServiceAccountNamespaceProvider;
+import org.apache.nifi.kubernetes.client.StandardKubernetesClientProvider;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * State Provider implementation based on Kubernetes ConfigMaps with Base64 encoded keys to meet Kubernetes constraints
+ */
+public class KubernetesConfigMapStateProvider extends AbstractConfigurableComponent implements StateProvider {
+    private static final Scope[] SUPPORTED_SCOPES = { Scope.CLUSTER };
+
+    private static final long UNKNOWN_VERSION = 0;
+
+    private static final Charset KEY_CHARACTER_SET = StandardCharsets.UTF_8;
+
+    private static final String CONFIG_MAP_NAME_FORMAT = "nifi-component-%s";
+
+    /** Encode ConfigMap keys using URL Encoder without padding characters for compliance with Kubernetes naming */
+    private static final Base64.Encoder encoder = Base64.getUrlEncoder().withoutPadding();
+
+    private static final Base64.Decoder decoder = Base64.getUrlDecoder();
+
+    private final AtomicBoolean enabled = new AtomicBoolean();
+
+    private KubernetesClient kubernetesClient;
+
+    private String namespace;
+
+    private String identifier;
+
+    private ComponentLog logger;
+
+    /**
+     * Get configured component identifier
+     *
+     * @return Component Identifier
+     */
+    @Override
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    /**
+     * Initialize Provider using configured properties
+     *
+     * @param context Initialization Context
+     */
+    @Override
+    public void initialize(final StateProviderInitializationContext context) {
+        this.identifier = context.getIdentifier();
+        this.logger = context.getLogger();
+        this.kubernetesClient = getKubernetesClient();
+        this.namespace = new ServiceAccountNamespaceProvider().getNamespace();
+    }
+
+    /**
+     * Shutdown Provider
+     */
+    @Override
+    public void shutdown() {
+        kubernetesClient.close();
+        logger.info("Provider shutdown");
+    }
+
+    /**
+     * Set State as ConfigMap based on Component Identifier
+     *
+     * @param state State Map
+     * @param componentId Component Identifier
+     * @throws IOException Thrown on failure to set State Map
+     */
+    @Override
+    public void setState(final Map<String, String> state, final String componentId) throws IOException {
+        try {
+            final ConfigMap configMap = createConfigMapBuilder(state, componentId).build();
+            final ConfigMap configMapCreated = kubernetesClient.configMaps().resource(configMap).createOrReplace();
+            final long version = getVersion(configMapCreated);
+            logger.debug("Set State Component ID [{}] Version [{}]", componentId, version);
+        } catch (final KubernetesClientException e) {
+            if (isNotFound(e.getCode())) {
+                logger.debug("State not found for Component ID [{}]", componentId, e);
+            } else {
+                throw new IOException(String.format("Set failed for Component ID [%s]", componentId), e);
+            }
+        } catch (final RuntimeException e) {
+            throw new IOException(String.format("Set failed for Component ID [%s]", componentId), e);
+        }
+    }
+
+    /**
+     * Get State Map for Component Identifier
+     *
+     * @param componentId Component Identifier of State to be retrieved
+     * @return State Map
+     * @throws IOException Thrown on failure to get State Map
+     */
+    @Override
+    public StateMap getState(final String componentId) throws IOException {
+        try {
+            final ConfigMap configMap = configMapResource(componentId).get();
+            final Map<String, String> data = configMap == null ? Collections.emptyMap() : getDecodedMap(configMap.getData());
+            final long version = configMap == null ? UNKNOWN_VERSION : getVersion(configMap);
+            return new StandardStateMap(data, version);
+        } catch (final RuntimeException e) {
+            throw new IOException(String.format("Get failed for Component ID [%s]", componentId), e);
+        }
+    }
+
+    /**
+     * Replace State ConfigMap with new State based on current resource version
+     *
+     * @param currentState Current State Map with version
+     * @param state New State Map
+     * @param componentId Component Identifier
+     * @return Replace operation status
+     */
+    @Override
+    public boolean replace(final StateMap currentState, final Map<String, String> state, final String componentId) throws IOException {
+        final String resourceVersion = Long.toString(currentState.getVersion());
+        final ConfigMap configMap = createConfigMapBuilder(state, componentId)
+                .editOrNewMetadata()
+                .withResourceVersion(resourceVersion)
+                .endMetadata()
+                .build();
+
+        try {
+            final ConfigMap configMapReplaced = kubernetesClient.configMaps().resource(configMap).replace();
+            final long version = getVersion(configMapReplaced);
+            logger.debug("Replaced State Component ID [{}] Version [{}]", componentId, version);
+            return true;
+        } catch (final KubernetesClientException e) {
+            if (isNotFoundOrConflict(e.getCode())) {
+                logger.debug("Replace State Failed Component ID [{}] Version [{}]", componentId, resourceVersion, e);
+                return false;
+            } else {
+                throw new IOException(String.format("Replace failed for Component ID [%s]", componentId), e);
+            }
+        } catch (final RuntimeException e) {
+            throw new IOException(String.format("Replace failed for Component ID [%s]", componentId), e);
+        }
+    }
+
+    /**
+     * Clear state information for specified Component Identifier
+     *
+     * @param componentId the id of the component for which state is being cleared
+     * @throws IOException Thrown on failure to clear state for Component Identifier
+     */
+    @Override
+    public void clear(final String componentId) throws IOException {
+        try {
+            setState(Collections.emptyMap(), componentId);
+        } catch (final RuntimeException e) {
+            throw new IOException(String.format("Clear failed for Component ID [%s]", componentId), e);
+        }
+    }
+
+    /**
+     * Remove state information for specified Component Identifier
+     *
+     * @param componentId Identifier of component removed from the configuration
+     * @throws IOException Thrown on failure to remove state for Component Identifier
+     */
+    @Override
+    public void onComponentRemoved(final String componentId) throws IOException {
+        try {
+            final List<StatusDetails> deleteStatus = configMapResource(componentId).delete();
+            logger.debug("Config Map [{}] deleted {}", componentId, deleteStatus);
+        } catch (final RuntimeException e) {
+            throw new IOException(String.format("Remove failed for Component ID [%s]", componentId), e);
+        }
+    }
+
+    /**
+     * Enable Provider
+     */
+    @Override
+    public void enable() {
+        enabled.getAndSet(true);
+    }
+
+    /**
+     * Disable Provider
+     */
+    @Override
+    public void disable() {
+        enabled.getAndSet(false);
+    }
+
+    /**
+     * Get Enabled status
+     *
+     * @return Enabled status
+     */
+    @Override
+    public boolean isEnabled() {
+        return enabled.get();
+    }
+
+    /**
+     * Get Supported Scopes returns CLUSTER
+     *
+     * @return Supported Scopes including CLUSTER
+     */
+    @Override
+    public Scope[] getSupportedScopes() {
+        return SUPPORTED_SCOPES;
+    }
+
+    /**
+     * Get Kubernetes Client using standard configuration
+     *
+     * @return Kubernetes Client
+     */
+    protected KubernetesClient getKubernetesClient() {
+        return new StandardKubernetesClientProvider().getKubernetesClient();
+    }
+
+    private Resource<ConfigMap> configMapResource(final String componentId) {
+        final String name = getConfigMapName(componentId);
+        return kubernetesClient.configMaps().inNamespace(namespace).withName(name);
+    }
+
+    private ConfigMapBuilder createConfigMapBuilder(final Map<String, String> state, final String componentId) {
+        final Map<String, String> encodedData = getEncodedMap(state);
+        final String name = getConfigMapName(componentId);
+        return new ConfigMapBuilder()
+                .withNewMetadata()
+                .withNamespace(namespace)
+                .withName(name)
+                .endMetadata()
+                .withData(encodedData);
+    }
+
+    private String getConfigMapName(final String componentId) {
+        return String.format(CONFIG_MAP_NAME_FORMAT, componentId);
+    }
+
+    private long getVersion(final ConfigMap configMap) {
+        final ObjectMeta metadata = configMap.getMetadata();
+        final String resourceVersion = metadata.getResourceVersion();
+        try {
+            return resourceVersion == null ? UNKNOWN_VERSION : Long.parseLong(resourceVersion);

Review Comment:
   I was wondering about this approach; it probably depends on the Kubernetes implementation. One option that comes to mind is deriving the version from a hash of the Resource Version. Another option could be using a custom metadata field.
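
   A minimal sketch of the hashing option, assuming a CRC32 checksum is an acceptable way to turn an opaque Resource Version string into a `long`. The `ResourceVersionHash` class is hypothetical and the choice of CRC32 is only one possibility.

   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.zip.CRC32;

   // Hypothetical helper for illustration; not a class from this pull request
   final class ResourceVersionHash {
       private static final long UNKNOWN_VERSION = 0;

       private ResourceVersionHash() {
       }

       // Derive a numeric version without assuming the Resource Version is a parsable number
       static long toVersion(final String resourceVersion) {
           if (resourceVersion == null) {
               return UNKNOWN_VERSION;
           }
           final CRC32 checksum = new CRC32();
           checksum.update(resourceVersion.getBytes(StandardCharsets.UTF_8));
           return checksum.getValue();
       }
   }
   ```

   A hash is one-way, so a `replace` operation could no longer rebuild the Resource Version from the `long` with `Long.toString`; it would need to obtain the original string some other way, which is where a custom metadata field might fit.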





[GitHub] [nifi] markap14 commented on a diff in pull request #6779: NIFI-10975 Add Kubernetes Leader Election and State Provider

Posted by GitBox <gi...@apache.org>.
markap14 commented on code in PR #6779:
URL: https://github.com/apache/nifi/pull/6779#discussion_r1049912399


##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-leader-election/src/main/java/org/apache/nifi/kubernetes/leader/election/KubernetesLeaderElectionManager.java:
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kubernetes.leader.election;
+
+import org.apache.nifi.controller.leader.election.LeaderElectionRole;
+import org.apache.nifi.controller.leader.election.LeaderElectionStateChangeListener;
+import org.apache.nifi.controller.leader.election.TrackedLeaderElectionManager;
+import org.apache.nifi.kubernetes.client.KubernetesClientProvider;
+import org.apache.nifi.kubernetes.client.NamespaceProvider;
+import org.apache.nifi.kubernetes.client.ServiceAccountNamespaceProvider;
+import org.apache.nifi.kubernetes.client.StandardKubernetesClientProvider;
+import org.apache.nifi.kubernetes.leader.election.command.LeaderElectionCommandProvider;
+import org.apache.nifi.kubernetes.leader.election.command.StandardLeaderElectionCommandProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Kubernetes Leader Election Manager implementation using Kubernetes Lease Resources
+ */
+public class KubernetesLeaderElectionManager extends TrackedLeaderElectionManager {
+    private static final boolean INTERRUPT_ENABLED = true;
+
+    private static final int SERVICE_THREADS = 4;
+
+    private static final Logger logger = LoggerFactory.getLogger(KubernetesLeaderElectionManager.class);
+
+    private static final Map<String, String> ROLE_NAMES;
+
+    static {
+        final Map<String, String> roleNames = new LinkedHashMap<>();
+        for (final LeaderElectionRole leaderElectionRole : LeaderElectionRole.values()) {
+            roleNames.put(leaderElectionRole.getRoleName(), leaderElectionRole.getRoleId());
+        }
+        ROLE_NAMES = Collections.unmodifiableMap(roleNames);
+    }
+
+    private final ExecutorService executorService;
+
+    private final AtomicBoolean started = new AtomicBoolean();
+
+    private final Map<String, Future<?>> roleCommands = new ConcurrentHashMap<>();
+
+    private final Map<String, ParticipantRegistration> roleRegistrations = new ConcurrentHashMap<>();
+
+    private final Map<String, String> roleLeaders = new ConcurrentHashMap<>();
+
+    private final LeaderElectionCommandProvider leaderElectionCommandProvider;
+
+    /**
+     * Kubernetes Leader Election Manager default constructor
+     */
+    public KubernetesLeaderElectionManager() {
+        executorService = createExecutorService();
+        leaderElectionCommandProvider = createLeaderElectionCommandProvider();
+    }
+
+    /**
+     * Start Manager and register current roles
+     */
+    @Override
+    public void start() {
+        if (started.get()) {
+            logger.debug("Start requested when running");
+        } else {
+            started.getAndSet(true);
+            logger.debug("Started");
+
+            for (final ParticipantRegistration roleRegistration : roleRegistrations.values()) {
+                register(roleRegistration.roleName, roleRegistration.listener, roleRegistration.participantId);
+            }
+        }
+    }
+
+    /**
+     * Stop Manager and shutdown running commands
+     */
+    @Override
+    public void stop() {
+        try {
+            leaderElectionCommandProvider.close();
+        } catch (final IOException e) {
+            logger.warn("Leader Election Command Factory close failed", e);
+        }
+        executorService.shutdown();
+        started.getAndSet(false);
+        logger.debug("Stopped");
+    }
+
+    /**
+     * Register for Election or Observation based on presence of Participant ID and register for Leader when started
+     *
+     * @param roleName Role Name for registration
+     * @param listener State Change Listener for Leader Events
+     * @param participantId Participant ID or null when registering for Observation
+     */
+    @Override
+    public synchronized void register(final String roleName, final LeaderElectionStateChangeListener listener, final String participantId) {
+        requireRoleName(roleName);
+        Objects.requireNonNull(listener, "Change Listener required");
+
+        final ParticipantRegistration roleRegistration = new ParticipantRegistration(roleName, participantId, listener);
+        roleRegistrations.put(roleName, roleRegistration);
+
+        final boolean participating = isParticipating(participantId);
+        if (participating) {
+            logger.debug("Registered Participation for Election Role [{}] ID [{}]", roleName, participantId);
+            if (started.get()) {
+                registerLeaderElectionCommand(roleName, listener, participantId);
+            }
+        } else {
+            logger.info("Registered Observation for Election Role [{}]", roleName);
+        }
+    }
+
+    /**
+     * Unregister for Leader Election of specified Role and cancel running command
+     *
+     * @param roleName Role Name to be removed from registration
+     */
+    @Override
+    public synchronized void unregister(final String roleName) {
+        requireRoleName(roleName);
+
+        final ParticipantRegistration roleRegistration = roleRegistrations.get(roleName);
+        if (roleRegistration == null) {
+            logger.info("Not registered for Election Role [{}]", roleName);
+        } else {
+            final Future<?> roleCommand = roleCommands.get(roleName);
+            if (roleCommand == null) {
+                logger.warn("Leader Election Command not found Role [{}] ID [{}]", roleName, roleRegistration.participantId);
+            } else {
+                roleCommand.cancel(INTERRUPT_ENABLED);
+            }
+
+            roleRegistrations.remove(roleName);
+            logger.info("Unregistered for Election Role [{}] ID [{}]", roleName, roleRegistration.participantId);
+        }
+    }
+
+    /**
+     * Determine whether current node is participating in Leader Election for specified Role
+     *
+     * @param roleName Role Name to be evaluated
+     * @return Participation status in Leader Election
+     */
+    @Override
+    public boolean isActiveParticipant(final String roleName) {
+        requireRoleName(roleName);
+        final String participantId = getParticipantId(roleName);
+        return isParticipating(participantId);
+    }
+
+    /**
+     * Get Leader Identifier for Role
+     *
+     * @param roleName Role Name for requested Leader Identifier
+     * @return Leader Identifier or null when not found
+     */
+    @Override
+    public String getLeader(final String roleName) {
+        requireRoleName(roleName);
+
+        final long pollStarted = System.nanoTime();
+        try {
+            final String leader;
+            if (started.get()) {
+                final String roleLeader = roleLeaders.get(roleName);
+                if (roleLeader == null) {
+                    logger.debug("Leader not registered: finding Leader for Role [{}]", roleName);
+                    leader = leaderElectionCommandProvider.findLeader(roleName);
+                } else {
+                    leader = roleLeader;
+                }
+            } else {
+                logger.debug("Manager not running: finding Leader for Role [{}]", roleName);
+                leader = leaderElectionCommandProvider.findLeader(roleName);
+            }
+            return leader;
+        } finally {
+            final long elapsed = System.nanoTime() - pollStarted;
+            registerPollTime(elapsed);
+        }
+    }
+
+    /**
+     * Determine whether current node is the Leader for the specified Role
+     *
+     * @param roleName Role Name to be evaluated
+     * @return Leader Status
+     */
+    @Override
+    public boolean isLeader(final String roleName) {
+        requireRoleName(roleName);
+        final boolean leader;
+
+        final String participantId = getParticipantId(roleName);
+        if (participantId == null) {
+            logger.debug("Role [{}] not participating in Leader election", roleName);
+            leader = false;
+        } else {
+            final String leaderId = getLeader(roleName);
+            leader = participantId.equals(leaderId);
+            if (leader) {
+                logger.debug("Role [{}] Participant ID [{}] is Leader", roleName, participantId);
+            } else {
+                logger.debug("Role [{}] Participant ID [{}] not Leader", roleName, leaderId);
+            }
+        }
+        return leader;
+    }
+
+    protected ExecutorService createExecutorService() {
+        return Executors.newFixedThreadPool(SERVICE_THREADS, new NamedThreadFactory());
+    }
+
+    protected LeaderElectionCommandProvider createLeaderElectionCommandProvider() {
+        final NamespaceProvider namespaceProvider = new ServiceAccountNamespaceProvider();
+        final String namespace = namespaceProvider.getNamespace();
+        final KubernetesClientProvider kubernetesClientProvider = new StandardKubernetesClientProvider();
+        return new StandardLeaderElectionCommandProvider(kubernetesClientProvider, namespace);
+    }
+
+    private void registerLeaderElectionCommand(final String roleName, final LeaderElectionStateChangeListener listener, final String participantId) {

Review Comment:
   Ah ok, good call. So technically it's correct/safe. But I would generally mark this as `synchronized` as well: it has virtually zero cost, since the synchronization lock has already been obtained, and it makes it clearer that the actions can only be taken while synchronized. An alternative is documenting via JavaDoc that it should only be called while synchronized, but I'd prefer just synchronizing the method itself.
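
   For reference, a small sketch of the pattern being suggested. Java intrinsic locks are reentrant, so a `synchronized` method calling another `synchronized` method on the same object re-enters the monitor it already holds. The `ReentrantRegistration` class is hypothetical and only mirrors the shape of `register` calling `registerLeaderElectionCommand`.

   ```java
   // Hypothetical class for illustration; not a class from this pull request
   final class ReentrantRegistration {
       private int registrations;

       // Public entry point acquires the monitor on this instance
       synchronized void register() {
           registerCommand();
       }

       // Marked synchronized to make the locking requirement explicit;
       // when called from register() the monitor is already held and is simply re-entered
       private synchronized void registerCommand() {
           registrations++;
       }

       synchronized int getRegistrations() {
           return registrations;
       }
   }
   ```

   The call chain completes without deadlock, and the modifier on the private method documents that its state changes are only safe under the lock.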





[GitHub] [nifi] exceptionfactory commented on pull request #6779: NIFI-10975 Add Kubernetes Leader Election and State Provider

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on PR #6779:
URL: https://github.com/apache/nifi/pull/6779#issuecomment-1353469731

   Thanks for the testing and feedback @markap14!
   
   As it turns out, the Fabric8 Client version 6.3.0 includes a fix for relinquishing leadership described in [Issue 4547](https://github.com/fabric8io/kubernetes-client/issues/4547). I will include an update to client version 6.3.0 as part of additional updates for State Map versioning.




[GitHub] [nifi] michael81877 commented on pull request #6779: NIFI-10975 Add Kubernetes Leader Election and State Provider

Posted by GitBox <gi...@apache.org>.
michael81877 commented on PR #6779:
URL: https://github.com/apache/nifi/pull/6779#issuecomment-1375209455

   It looks like a `ConfigMap` is created for each component that requires state management. Do you have any concern about the size of state that may be written to any one `ConfigMap`?
   
   The [maximum size of a ConfigMap is 1MiB](https://kubernetes.io/docs/concepts/configuration/configmap/#:~:text=The%20data%20stored%20in%20a,separate%20database%20or%20file%20service.). Do you anticipate any compatibility issues where a use case would work with ZooKeeper but would not work with ConfigMaps?
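
   A rough way to reason about the limit is to estimate the stored size of a component's state. The sketch below assumes keys are stored as unpadded URL-safe Base64 and values as plain UTF-8, and it ignores metadata overhead, so it is only a lower bound. The `ConfigMapSizeEstimate` class is hypothetical.

   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.Base64;
   import java.util.Map;

   // Hypothetical helper for illustration; not a class from this pull request
   final class ConfigMapSizeEstimate {
       // 1 MiB limit on ConfigMap data
       static final long MAX_BYTES = 1024L * 1024L;

       private static final Base64.Encoder ENCODER = Base64.getUrlEncoder().withoutPadding();

       private ConfigMapSizeEstimate() {
       }

       // Sum of encoded key lengths and UTF-8 value lengths, excluding object metadata
       static long estimate(final Map<String, String> state) {
           long total = 0;
           for (final Map.Entry<String, String> entry : state.entrySet()) {
               final byte[] keyBytes = entry.getKey().getBytes(StandardCharsets.UTF_8);
               total += ENCODER.encodeToString(keyBytes).length();
               total += entry.getValue().getBytes(StandardCharsets.UTF_8).length;
           }
           return total;
       }

       static boolean fits(final Map<String, String> state) {
           return estimate(state) <= MAX_BYTES;
       }
   }
   ```

   Base64 expands keys by roughly one third, so components with many long property names approach the limit sooner than the raw state size suggests.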




[GitHub] [nifi] markap14 commented on pull request #6779: NIFI-10975 Add Kubernetes Leader Election and State Provider

Posted by GitBox <gi...@apache.org>.
markap14 commented on PR #6779:
URL: https://github.com/apache/nifi/pull/6779#issuecomment-1355514506

   Also, just to ensure that I was using the latest, I did a `kubectl exec -it nifi-0 bash` and went into the `work` directory to check the lib. It's using `kubernetes-client-6.3.1.jar` as expected.




[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6779: NIFI-10975 Add Kubernetes Leader Election and State Provider

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on code in PR #6779:
URL: https://github.com/apache/nifi/pull/6779#discussion_r1063741104


##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java:
##########
@@ -6261,8 +6261,8 @@ private Set<String> getRoles(final NodeIdentifier nodeId) {
         final String nodeAddress = nodeId.getSocketAddress() + ":" + nodeId.getSocketPort();
 
         for (final String roleName : ClusterRoles.getAllRoles()) {
-            final String leader = leaderElectionManager.getLeader(roleName);
-            if (leader == null) {
+            final Optional<String> leader = leaderElectionManager.getLeader(roleName);

Review Comment:
   Thanks for catching that. I missed carrying the change through to that comparison and will push a correction.
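
   For context, here is a minimal sketch of how a null check usually has to change once a lookup returns `Optional<String>` instead of `String`. The `LeaderRoleCheck` class, its map-backed `findLeader` helper, and the node addresses are hypothetical stand-ins for illustration; this is not the `StandardNiFiServiceFacade` code or the real `LeaderElectionManager`.

   ```java
   import java.util.LinkedHashSet;
   import java.util.Map;
   import java.util.Optional;
   import java.util.Set;

   final class LeaderRoleCheck {
       // Hypothetical lookup standing in for a manager that returns Optional instead of null
       static Optional<String> findLeader(final Map<String, String> leaders, final String roleName) {
           return Optional.ofNullable(leaders.get(roleName));
       }

       // Collect the roles for which the given node address is the current leader
       static Set<String> getRoles(final Map<String, String> leaders, final Set<String> roleNames, final String nodeAddress) {
           final Set<String> roles = new LinkedHashSet<>();
           for (final String roleName : roleNames) {
               final Optional<String> leader = findLeader(leaders, roleName);
               // A leftover "leader == null" test would never be true for an Optional,
               // so presence is checked explicitly before comparing the address
               if (leader.isPresent() && leader.get().equals(nodeAddress)) {
                   roles.add(roleName);
               }
           }
           return roles;
       }

       public static void main(final String[] args) {
           final Map<String, String> leaders = Map.of("Cluster Coordinator", "nifi-0:11443");
           final Set<String> roleNames = Set.of("Cluster Coordinator", "Primary Node");
           System.out.println(getRoles(leaders, roleNames, "nifi-0:11443"));
       }
   }
   ```

   The point of the sketch is only that an empty `Optional` replaces `null` as the "no leader" signal, so every comparison downstream of the changed call has to be updated along with the declaration.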





[GitHub] [nifi] markap14 commented on pull request #6779: NIFI-10975 Add Kubernetes Leader Election and State Provider

Posted by GitBox <gi...@apache.org>.
markap14 commented on PR #6779:
URL: https://github.com/apache/nifi/pull/6779#issuecomment-1353716799

   Great, thanks @exceptionfactory!
   I would also recommend a couple of minor changes to make this work more easily with Docker, as I had to make these changes to test:
   In the `nifi-docker/dockerhub/sh/start.sh` file I added two lines:
   ```
   # Set leader election and state management properties
   prop_replace 'nifi.cluster.leader.election.implementation'      "${NIFI_LEADER_ELECTION_IMPLEMENTATION:-CuratorLeaderElectionManager}"
   prop_replace 'nifi.state.management.provider.cluster'           "${NIFI_STATE_MANAGEMENT_CLUSTER_PROVIDER:-zk-provider}"
   ```
   
   Also, in the `nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml` file, there's no need to add the `<cluster-provider>` element commented out. It's just configuration and doesn't get used for anything unless it's selected in `nifi.properties`, so we should include the config uncommented in `state-management.xml` so that it can be easily referenced from `nifi.properties`.




[GitHub] [nifi] exceptionfactory commented on pull request #6779: NIFI-10975 Add Kubernetes Leader Election and State Provider

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on PR #6779:
URL: https://github.com/apache/nifi/pull/6779#issuecomment-1356057372

   Thanks for the feedback @markap14.
   
   On further evaluation of the disconnect and reconnect behavior, I realized the `unregister` method was not removing the local leader identifier from the `roleLeaders` Map within `KubernetesLeaderElectionManager`. The corresponding command was not being removed from the `roleCommands` Map, which was preventing proper registration on cluster reconnection. I corrected this behavior and also corrected the Role ID resolution prior to calling `findLeader()`.
   
   In addition to those changes, I removed the `withReleaseOnCancel()` setting from the Leader Elector Builder. This was a more recent addition to the Kubernetes Client library implementation. The purpose of the setting is to update the Lease with a null holder identity, prompting nodes to attempt lease renewal. For the purpose of NiFi clustering, this behavior does not seem necessary, as NiFi nodes will proceed with attempting to update and obtain a lease lock. Removing the release on cancel setting avoids the error shown above while allowing standard lease lock update attempts to proceed.
   
   These changes resulted in consistent behavior with various disconnect and reconnect attempts.
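
   The cleanup described above can be sketched roughly as follows. `RoleTrackingSketch` is a hypothetical, simplified class and not the `KubernetesLeaderElectionManager` source. In particular, refusing a second command while one is still tracked is an assumption about how a stale `roleCommands` entry could block registration after reconnecting.

   ```java
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.Future;

   final class RoleTrackingSketch {
       private final Map<String, Future<?>> roleCommands = new ConcurrentHashMap<>();
       private final Map<String, String> roleLeaders = new ConcurrentHashMap<>();

       // Track a running election command; assumed to refuse a second command for the same role
       synchronized boolean register(final String roleName, final Future<?> command) {
           return roleCommands.putIfAbsent(roleName, command) == null;
       }

       void setLeader(final String roleName, final String leaderId) {
           roleLeaders.put(roleName, leaderId);
       }

       String getLeader(final String roleName) {
           return roleLeaders.get(roleName);
       }

       // Cancel the command and clear both maps so that a later register() starts clean
       synchronized void unregister(final String roleName) {
           final Future<?> command = roleCommands.remove(roleName);
           if (command != null) {
               command.cancel(true);
           }
           roleLeaders.remove(roleName);
       }

       public static void main(final String[] args) {
           final RoleTrackingSketch tracking = new RoleTrackingSketch();
           tracking.register("Primary Node", new CompletableFuture<Void>());
           tracking.setLeader("Primary Node", "nifi-0");
           tracking.unregister("Primary Node");
           // Reconnection: registration succeeds because the stale command was removed
           System.out.println(tracking.register("Primary Node", new CompletableFuture<Void>()));
           System.out.println(tracking.getLeader("Primary Node"));
       }
   }
   ```

   If `unregister` only cancelled the command without removing the map entries, the second `register` call in this sketch would be rejected and the old leader identifier would still be reported.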




[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6779: NIFI-10975 Add Kubernetes Leader Election and State Provider

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on code in PR #6779:
URL: https://github.com/apache/nifi/pull/6779#discussion_r1048565486


##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-leader-election/src/main/java/org/apache/nifi/kubernetes/leader/election/KubernetesLeaderElectionManager.java:
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kubernetes.leader.election;
+
+import org.apache.nifi.controller.leader.election.LeaderElectionRole;
+import org.apache.nifi.controller.leader.election.LeaderElectionStateChangeListener;
+import org.apache.nifi.controller.leader.election.TrackedLeaderElectionManager;
+import org.apache.nifi.kubernetes.client.KubernetesClientProvider;
+import org.apache.nifi.kubernetes.client.NamespaceProvider;
+import org.apache.nifi.kubernetes.client.ServiceAccountNamespaceProvider;
+import org.apache.nifi.kubernetes.client.StandardKubernetesClientProvider;
+import org.apache.nifi.kubernetes.leader.election.command.LeaderElectionCommandProvider;
+import org.apache.nifi.kubernetes.leader.election.command.StandardLeaderElectionCommandProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Kubernetes Leader Election Manager implementation using Kubernetes Lease Resources
+ */
+public class KubernetesLeaderElectionManager extends TrackedLeaderElectionManager {
+    private static final boolean INTERRUPT_ENABLED = true;
+
+    private static final int SERVICE_THREADS = 4;
+
+    private static final Logger logger = LoggerFactory.getLogger(KubernetesLeaderElectionManager.class);
+
+    private static final Map<String, String> ROLE_NAMES;
+
+    static {
+        final Map<String, String> roleNames = new LinkedHashMap<>();
+        for (final LeaderElectionRole leaderElectionRole : LeaderElectionRole.values()) {
+            roleNames.put(leaderElectionRole.getRoleName(), leaderElectionRole.getRoleId());
+        }
+        ROLE_NAMES = Collections.unmodifiableMap(roleNames);
+    }
+
+    private final ExecutorService executorService;
+
+    private final AtomicBoolean started = new AtomicBoolean();
+
+    private final Map<String, Future<?>> roleCommands = new ConcurrentHashMap<>();
+
+    private final Map<String, ParticipantRegistration> roleRegistrations = new ConcurrentHashMap<>();
+
+    private final Map<String, String> roleLeaders = new ConcurrentHashMap<>();
+
+    private final LeaderElectionCommandProvider leaderElectionCommandProvider;
+
+    /**
+     * Kubernetes Leader Election Manager default constructor
+     */
+    public KubernetesLeaderElectionManager() {
+        executorService = createExecutorService();
+        leaderElectionCommandProvider = createLeaderElectionCommandProvider();
+    }
+
+    /**
+     * Start Manager and register current roles
+     */
+    @Override
+    public void start() {
+        if (started.get()) {
+            logger.debug("Start requested when running");
+        } else {
+            started.getAndSet(true);
+            logger.debug("Started");
+
+            for (final ParticipantRegistration roleRegistration : roleRegistrations.values()) {
+                register(roleRegistration.roleName, roleRegistration.listener, roleRegistration.participantId);
+            }
+        }
+    }
+
+    /**
+     * Stop Manager and shutdown running commands
+     */
+    @Override
+    public void stop() {
+        try {
+            leaderElectionCommandProvider.close();
+        } catch (final IOException e) {
+            logger.warn("Leader Election Command Factory close failed", e);
+        }
+        executorService.shutdown();
+        started.getAndSet(false);
+        logger.debug("Stopped");
+    }
+
+    /**
+     * Register for Election or Observation based on presence of Participant ID and register for Leader when started
+     *
+     * @param roleName Role Name for registration
+     * @param listener State Change Listener for Leader Events
+     * @param participantId Participant ID or null when registering for Observation
+     */
+    @Override
+    public synchronized void register(final String roleName, final LeaderElectionStateChangeListener listener, final String participantId) {
+        requireRoleName(roleName);
+        Objects.requireNonNull(listener, "Change Listener required");
+
+        final ParticipantRegistration roleRegistration = new ParticipantRegistration(roleName, participantId, listener);
+        roleRegistrations.put(roleName, roleRegistration);
+
+        final boolean participating = isParticipating(participantId);
+        if (participating) {
+            logger.debug("Registered Participation for Election Role [{}] ID [{}]", roleName, participantId);
+            if (started.get()) {
+                registerLeaderElectionCommand(roleName, listener, participantId);
+            }
+        } else {
+            logger.info("Registered Observation for Election Role [{}]", roleName);
+        }
+    }
+
+    /**
+     * Unregister for Leader Election of specified Role and cancel running command
+     *
+     * @param roleName Role Name to be removed from registration
+     */
+    @Override
+    public synchronized void unregister(final String roleName) {
+        requireRoleName(roleName);
+
+        final ParticipantRegistration roleRegistration = roleRegistrations.get(roleName);
+        if (roleRegistration == null) {
+            logger.info("Not registered for Election Role [{}]", roleName);
+        } else {
+            final Future<?> roleCommand = roleCommands.get(roleName);
+            if (roleCommand == null) {
+                logger.warn("Leader Election Command not found Role [{}] ID [{}]", roleName, roleRegistration.participantId);
+            } else {
+                roleCommand.cancel(INTERRUPT_ENABLED);
+            }
+
+            roleRegistrations.remove(roleName);
+            logger.info("Unregistered for Election Role [{}] ID [{}]", roleName, roleRegistration.participantId);
+        }
+    }
+
+    /**
+     * Determine whether current node is participating in Leader Election for specified Role
+     *
+     * @param roleName Role Name to be evaluated
+     * @return Participation status in Leader Election
+     */
+    @Override
+    public boolean isActiveParticipant(final String roleName) {
+        requireRoleName(roleName);
+        final String participantId = getParticipantId(roleName);
+        return isParticipating(participantId);
+    }
+
+    /**
+     * Get Leader Identifier for Role
+     *
+     * @param roleName Role Name for requested Leader Identifier
+     * @return Leader Identifier or null when not found
+     */
+    @Override
+    public String getLeader(final String roleName) {
+        requireRoleName(roleName);
+
+        final long pollStarted = System.nanoTime();
+        try {
+            final String leader;
+            if (started.get()) {
+                final String roleLeader = roleLeaders.get(roleName);
+                if (roleLeader == null) {
+                    logger.debug("Leader not registered: finding Leader for Role [{}]", roleName);
+                    leader = leaderElectionCommandProvider.findLeader(roleName);
+                } else {
+                    leader = roleLeader;
+                }
+            } else {
+                logger.debug("Manager not running: finding Leader for Role [{}]", roleName);
+                leader = leaderElectionCommandProvider.findLeader(roleName);
+            }
+            return leader;
+        } finally {
+            final long elapsed = System.nanoTime() - pollStarted;
+            registerPollTime(elapsed);
+        }
+    }
+
+    /**
+     * Determine whether current node is the Leader for the specified Role
+     *
+     * @param roleName Role Name to be evaluated
+     * @return Leader Status
+     */
+    @Override
+    public boolean isLeader(final String roleName) {
+        requireRoleName(roleName);
+        final boolean leader;
+
+        final String participantId = getParticipantId(roleName);
+        if (participantId == null) {
+            logger.debug("Role [{}] not participating in Leader election", roleName);
+            leader = false;
+        } else {
+            final String leaderId = getLeader(roleName);
+            leader = participantId.equals(leaderId);
+            if (leader) {
+                logger.debug("Role [{}] Participant ID [{}] is Leader", roleName, participantId);
+            } else {
+                logger.debug("Role [{}] Participant ID [{}] not Leader", roleName, leaderId);
+            }
+        }
+        return leader;
+    }
+
+    protected ExecutorService createExecutorService() {
+        return Executors.newFixedThreadPool(SERVICE_THREADS, new NamedThreadFactory());
+    }
+
+    protected LeaderElectionCommandProvider createLeaderElectionCommandProvider() {
+        final NamespaceProvider namespaceProvider = new ServiceAccountNamespaceProvider();
+        final String namespace = namespaceProvider.getNamespace();
+        final KubernetesClientProvider kubernetesClientProvider = new StandardKubernetesClientProvider();
+        return new StandardLeaderElectionCommandProvider(kubernetesClientProvider, namespace);
+    }
+
+    private void registerLeaderElectionCommand(final String roleName, final LeaderElectionStateChangeListener listener, final String participantId) {

Review Comment:
   This method is only called from `register`, which is marked as `synchronized`. Do you think any additional synchronization is necessary?





[GitHub] [nifi] exceptionfactory commented on pull request #6779: NIFI-10975 Add Kubernetes Leader Election and State Provider

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on PR #6779:
URL: https://github.com/apache/nifi/pull/6779#issuecomment-1375670809

   > It looks like a `ConfigMap` is created for each component that requires state management. Do you have any concern about the size of state (in bytes) that may be written to any one `ConfigMap`?
   > 
   > The [maximum size of a ConfigMap is 1 MiB](https://kubernetes.io/docs/concepts/configuration/configmap/#:~:text=The%20data%20stored%20in%20a,separate%20database%20or%20file%20service.). Do you anticipate any compatibility issues where a use case would work with ZooKeeper but would not work with ConfigMaps?
   
   Thanks for the feedback @michael81877. The standard ZooKeeper node storage is also [limited to 1 MB](https://zookeeper.apache.org/doc/r3.8.0/apidocs/zookeeper-server/org/apache/zookeeper/ZooKeeper.html#create-java.lang.String-byte:A-java.util.List-org.apache.zookeeper.CreateMode-), so the standard size limits of a Kubernetes ConfigMap align well with current behavior.
   
   With that background and similar size limitations, I do not expect any compatibility issues based on size limitations. It is possible that other behavioral nuances could surface as a result of these alternative State Management and Leader Election implementations.
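
   For anyone who wants a rough feel for how close a component's state is to the limit, the following sketch estimates the payload size. It assumes property names are encoded with Base64 URL encoding without padding, as described in the PR summary, and that values are stored as UTF-8 strings. `ConfigMapSizeSketch` and its methods are hypothetical, and the estimate ignores object metadata and whatever overhead Kubernetes itself counts.

   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.Base64;
   import java.util.Map;

   final class ConfigMapSizeSketch {
       // 1 MiB limit documented for data stored in a ConfigMap
       static final long MAX_BYTES = 1024L * 1024L;

       // Base64 URL encoding without padding keeps keys within the allowed character set
       static String encodeKey(final String key) {
           return Base64.getUrlEncoder().withoutPadding().encodeToString(key.getBytes(StandardCharsets.UTF_8));
       }

       // Rough estimate: encoded key characters plus UTF-8 value bytes
       static long estimateBytes(final Map<String, String> state) {
           long total = 0;
           for (final Map.Entry<String, String> entry : state.entrySet()) {
               total += encodeKey(entry.getKey()).length();
               total += entry.getValue().getBytes(StandardCharsets.UTF_8).length;
           }
           return total;
       }

       static boolean fits(final Map<String, String> state) {
           return estimateBytes(state) <= MAX_BYTES;
       }

       public static void main(final String[] args) {
           final Map<String, String> state = Map.of("last.listing.timestamp", "1670968838000");
           System.out.println(encodeKey("last.listing.timestamp"));
           System.out.println(estimateBytes(state) + " bytes, fits: " + fits(state));
       }
   }
   ```

   Base64 expands keys by roughly one third, so state with very many long property names reaches the limit somewhat sooner than the raw key lengths would suggest.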




[GitHub] [nifi] exceptionfactory commented on pull request #6779: NIFI-10975 Add Kubernetes Leader Election and State Provider

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on PR #6779:
URL: https://github.com/apache/nifi/pull/6779#issuecomment-1373154084

   Good find on the expired Lease handling @markap14, thanks for the additional testing.
   
   The implementation of `StandardLeaderElectionCommandProvider.findLeader()` was always returning the last Lease Holder Identity, regardless of whether the Lease had expired. This broke the contract of `LeaderElectionManager.getLeader()`, which callers expect to return `null` when there is no leader for the requested role. I updated `findLeader()` to return `Optional<String>` and to check the expiration of the Lease, so that `findLeader()` returns `Optional.empty()` when the Lease has expired. The expiration is determined based on `renewTime` plus `leaseDurationSeconds`, following the definition of those fields in the [Kubernetes LeaseSpec](https://kubernetes.io/docs/reference/kubernetes-api/cluster-resources/lease-v1/#LeaseSpec).
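
   The expiration rule can be sketched with plain values rather than the Kubernetes client model. `LeaseExpirationSketch` is hypothetical and is not the `StandardLeaderElectionCommandProvider` code. Treating missing fields as "no leader" and treating the exact expiration instant as still valid are assumptions made for the illustration.

   ```java
   import java.time.Duration;
   import java.time.Instant;
   import java.util.Optional;

   final class LeaseExpirationSketch {
       // Return the holder only while renewTime + leaseDurationSeconds has not passed
       static Optional<String> findLeader(final String holderIdentity, final Instant renewTime,
                                          final Integer leaseDurationSeconds, final Instant now) {
           if (holderIdentity == null || holderIdentity.isEmpty() || renewTime == null || leaseDurationSeconds == null) {
               // Assumption: an incomplete Lease is treated as having no leader
               return Optional.empty();
           }
           final Instant expiration = renewTime.plus(Duration.ofSeconds(leaseDurationSeconds));
           return now.isAfter(expiration) ? Optional.empty() : Optional.of(holderIdentity);
       }

       public static void main(final String[] args) {
           final Instant renewTime = Instant.parse("2023-01-05T12:00:00Z");
           // Ten seconds after renewal with a fifteen second duration: holder is still leader
           System.out.println(findLeader("nifi-0", renewTime, 15, renewTime.plusSeconds(10)));
           // Twenty seconds after renewal: the Lease has expired and no leader is reported
           System.out.println(findLeader("nifi-0", renewTime, 15, renewTime.plusSeconds(20)));
       }
   }
   ```

   Passing the current time as a parameter keeps the check deterministic in tests; a real implementation would also have to consider clock differences between nodes.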
   
   To make the implementation contract clearer, I also changed the return type of `LeaderElectionManager.getLeader()` from `String` to `Optional<String>`. Although the method documentation noted that `null` indicates no leader is elected, changing the return type to `Optional` clarifies that detail without requiring too many adjustments. This seemed like a good time to make the change as part of promoting `LeaderElectionManager` to an extension component interface.
   
   Following those changes, shutting down all nodes, then starting up a single node that was not previously the Cluster Coordinator or Primary Node worked as expected. The single node was able to become the leader for those roles.
   
   It is worth noting that the Lease objects continue to live in the Kubernetes namespace even if all nodes have been shut down for an extended period. The `renewTime` indicates the last update, and the corrected logic will now update existing values as needed.




[GitHub] [nifi] markap14 commented on a diff in pull request #6779: NIFI-10975 Add Kubernetes Leader Election and State Provider

Posted by GitBox <gi...@apache.org>.
markap14 commented on code in PR #6779:
URL: https://github.com/apache/nifi/pull/6779#discussion_r1048547015


##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-leader-election/src/main/java/org/apache/nifi/kubernetes/leader/election/KubernetesLeaderElectionManager.java:
##########
@@ -0,0 +1,316 @@
+    private void registerLeaderElectionCommand(final String roleName, final LeaderElectionStateChangeListener listener, final String participantId) {

Review Comment:
   There's a lot of check-then-modify here. I think we need to ensure that this is thread-safe and either synchronize the method or add locking.
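
   To illustrate the concern in general terms, here is a small sketch of a check-then-modify sequence guarded by `synchronized`. `CheckThenModifySketch` and its methods are hypothetical and are not taken from the PR; the sketch only shows why the lookup and the update need to happen under one lock.

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.atomic.AtomicInteger;

   final class CheckThenModifySketch {
       private final Map<String, String> commands = new ConcurrentHashMap<>();
       private final AtomicInteger submitted = new AtomicInteger();

       // synchronized makes the containsKey check and the put a single atomic step per instance
       synchronized boolean registerCommand(final String roleName, final String participantId) {
           if (commands.containsKey(roleName)) {
               return false;
           }
           commands.put(roleName, participantId);
           submitted.incrementAndGet();
           return true;
       }

       int getSubmitted() {
           return submitted.get();
       }

       // Start several threads that all try to register the same role and count accepted registrations
       static int raceRegistrations(final int threadCount) {
           final CheckThenModifySketch sketch = new CheckThenModifySketch();
           final Thread[] threads = new Thread[threadCount];
           for (int i = 0; i < threadCount; i++) {
               final String participantId = "node-" + i;
               threads[i] = new Thread(() -> sketch.registerCommand("Primary Node", participantId));
               threads[i].start();
           }
           for (final Thread thread : threads) {
               try {
                   thread.join();
               } catch (final InterruptedException e) {
                   Thread.currentThread().interrupt();
               }
           }
           return sketch.getSubmitted();
       }

       public static void main(final String[] args) {
           System.out.println(raceRegistrations(8));
       }
   }
   ```

   Without the `synchronized` keyword, two threads could both pass the `containsKey` check before either calls `put`, even though each individual `ConcurrentHashMap` operation is thread-safe on its own.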





[GitHub] [nifi] markap14 commented on a diff in pull request #6779: NIFI-10975 Add Kubernetes Leader Election and State Provider

Posted by GitBox <gi...@apache.org>.
markap14 commented on code in PR #6779:
URL: https://github.com/apache/nifi/pull/6779#discussion_r1048571001


##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java:
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kubernetes.state.provider;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.StatusDetails;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.dsl.Resource;
+import org.apache.nifi.components.AbstractConfigurableComponent;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.components.state.StateProvider;
+import org.apache.nifi.components.state.StateProviderInitializationContext;
+import org.apache.nifi.kubernetes.client.ServiceAccountNamespaceProvider;
+import org.apache.nifi.kubernetes.client.StandardKubernetesClientProvider;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * State Provider implementation based on Kubernetes ConfigMaps with Base64 encoded keys to meet Kubernetes constraints
+ */
+public class KubernetesConfigMapStateProvider extends AbstractConfigurableComponent implements StateProvider {
+    private static final Scope[] SUPPORTED_SCOPES = { Scope.CLUSTER };
+
+    private static final long UNKNOWN_VERSION = 0;
+
+    private static final Charset KEY_CHARACTER_SET = StandardCharsets.UTF_8;
+
+    private static final String CONFIG_MAP_NAME_FORMAT = "nifi-component-%s";
+
+    /** Encode ConfigMap keys using URL Encoder without padding characters for compliance with Kubernetes naming */
+    private static final Base64.Encoder encoder = Base64.getUrlEncoder().withoutPadding();
+
+    private static final Base64.Decoder decoder = Base64.getUrlDecoder();
+
+    private final AtomicBoolean enabled = new AtomicBoolean();
+
+    private KubernetesClient kubernetesClient;
+
+    private String namespace;
+
+    private String identifier;
+
+    private ComponentLog logger;
+
+    /**
+     * Get configured component identifier
+     *
+     * @return Component Identifier
+     */
+    @Override
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    /**
+     * Initialize Provider using configured properties
+     *
+     * @param context Initialization Context
+     */
+    @Override
+    public void initialize(final StateProviderInitializationContext context) {
+        this.identifier = context.getIdentifier();
+        this.logger = context.getLogger();
+        this.kubernetesClient = getKubernetesClient();
+        this.namespace = new ServiceAccountNamespaceProvider().getNamespace();
+    }
+
+    /**
+     * Shutdown Provider
+     */
+    @Override
+    public void shutdown() {
+        kubernetesClient.close();
+        logger.info("Provider shutdown");
+    }
+
+    /**
+     * Set State as ConfigMap based on Component Identifier
+     *
+     * @param state State Map
+     * @param componentId Component Identifier
+     * @throws IOException Thrown on failure to set State Map
+     */
+    @Override
+    public void setState(final Map<String, String> state, final String componentId) throws IOException {
+        try {
+            final ConfigMap configMap = createConfigMapBuilder(state, componentId).build();
+            final ConfigMap configMapCreated = kubernetesClient.configMaps().resource(configMap).createOrReplace();
+            final long version = getVersion(configMapCreated);
+            logger.debug("Set State Component ID [{}] Version [{}]", componentId, version);
+        } catch (final KubernetesClientException e) {
+            if (isNotFound(e.getCode())) {
+                logger.debug("State not found for Component ID [{}]", componentId, e);
+            } else {
+                throw new IOException(String.format("Set failed for Component ID [%s]", componentId), e);
+            }
+        } catch (final RuntimeException e) {
+            throw new IOException(String.format("Set failed for Component ID [%s]", componentId), e);
+        }
+    }
+
+    /**
+     * Get State Map for Component Identifier
+     *
+     * @param componentId Component Identifier of State to be retrieved
+     * @return State Map
+     * @throws IOException Thrown on failure to get State Map
+     */
+    @Override
+    public StateMap getState(final String componentId) throws IOException {
+        try {
+            final ConfigMap configMap = configMapResource(componentId).get();
+            final Map<String, String> data = configMap == null ? Collections.emptyMap() : getDecodedMap(configMap.getData());
+            final long version = configMap == null ? UNKNOWN_VERSION : getVersion(configMap);
+            return new StandardStateMap(data, version);
+        } catch (final RuntimeException e) {
+            throw new IOException(String.format("Get failed for Component ID [%s]", componentId), e);
+        }
+    }
+
+    /**
+     * Replace State ConfigMap with new State based on current resource version
+     *
+     * @param currentState Current State Map with version
+     * @param state New State Map
+     * @param componentId Component Identifier
+     * @return Replace operation status
+     */
+    @Override
+    public boolean replace(final StateMap currentState, final Map<String, String> state, final String componentId) throws IOException {
+        final String resourceVersion = Long.toString(currentState.getVersion());
+        final ConfigMap configMap = createConfigMapBuilder(state, componentId)
+                .editOrNewMetadata()
+                .withResourceVersion(resourceVersion)
+                .endMetadata()
+                .build();
+
+        try {
+            final ConfigMap configMapReplaced = kubernetesClient.configMaps().resource(configMap).replace();
+            final long version = getVersion(configMapReplaced);
+            logger.debug("Replaced State Component ID [{}] Version [{}]", componentId, version);
+            return true;
+        } catch (final KubernetesClientException e) {
+            if (isNotFoundOrConflict(e.getCode())) {
+                logger.debug("Replace State Failed Component ID [{}] Version [{}]", componentId, resourceVersion, e);
+                return false;
+            } else {
+                throw new IOException(String.format("Replace failed for Component ID [%s]", componentId), e);
+            }
+        } catch (final RuntimeException e) {
+            throw new IOException(String.format("Replace failed for Component ID [%s]", componentId), e);
+        }
+    }
+
+    /**
+     * Clear state information for specified Component Identifier
+     *
+     * @param componentId the id of the component for which state is being cleared
+     * @throws IOException Thrown on failure to clear state for Component Identifier
+     */
+    @Override
+    public void clear(final String componentId) throws IOException {
+        try {
+            setState(Collections.emptyMap(), componentId);
+        } catch (final RuntimeException e) {
+            throw new IOException(String.format("Clear failed for Component ID [%s]", componentId), e);
+        }
+    }
+
+    /**
+     * Remove state information for specified Component Identifier
+     *
+     * @param componentId Identifier of component removed from the configuration
+     * @throws IOException Thrown on failure to remove state for Component Identifier
+     */
+    @Override
+    public void onComponentRemoved(final String componentId) throws IOException {
+        try {
+            final List<StatusDetails> deleteStatus = configMapResource(componentId).delete();
+            logger.debug("Config Map [{}] deleted {}", componentId, deleteStatus);
+        } catch (final RuntimeException e) {
+            throw new IOException(String.format("Remove failed for Component ID [%s]", componentId), e);
+        }
+    }
+
+    /**
+     * Enable Provider
+     */
+    @Override
+    public void enable() {
+        enabled.getAndSet(true);
+    }
+
+    /**
+     * Disable Provider
+     */
+    @Override
+    public void disable() {
+        enabled.getAndSet(false);
+    }
+
+    /**
+     * Get Enabled status
+     *
+     * @return Enabled status
+     */
+    @Override
+    public boolean isEnabled() {
+        return enabled.get();
+    }
+
+    /**
+     * Get Supported Scopes returns CLUSTER
+     *
+     * @return Supported Scopes including CLUSTER
+     */
+    @Override
+    public Scope[] getSupportedScopes() {
+        return SUPPORTED_SCOPES;
+    }
+
+    /**
+     * Get Kubernetes Client using standard configuration
+     *
+     * @return Kubernetes Client
+     */
+    protected KubernetesClient getKubernetesClient() {
+        return new StandardKubernetesClientProvider().getKubernetesClient();
+    }
+
+    private Resource<ConfigMap> configMapResource(final String componentId) {
+        final String name = getConfigMapName(componentId);
+        return kubernetesClient.configMaps().inNamespace(namespace).withName(name);
+    }
+
+    private ConfigMapBuilder createConfigMapBuilder(final Map<String, String> state, final String componentId) {
+        final Map<String, String> encodedData = getEncodedMap(state);
+        final String name = getConfigMapName(componentId);
+        return new ConfigMapBuilder()
+                .withNewMetadata()
+                .withNamespace(namespace)
+                .withName(name)
+                .endMetadata()
+                .withData(encodedData);
+    }
+
+    private String getConfigMapName(final String componentId) {
+        return String.format(CONFIG_MAP_NAME_FORMAT, componentId);
+    }
+
+    private long getVersion(final ConfigMap configMap) {
+        final ObjectMeta metadata = configMap.getMetadata();
+        final String resourceVersion = metadata.getResourceVersion();
+        try {
+            return resourceVersion == null ? UNKNOWN_VERSION : Long.parseLong(resourceVersion);

Review Comment:
   Everywhere in the code base that I see `long getVersion()` being used, it is actually to check whether the value is `-1` (indicating that the state hasn't been stored anywhere). So it probably makes sense to use an `Optional<String>` as the return type.
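
   A minimal sketch of what that could look like, assuming the Kubernetes resource version is passed through as an opaque string instead of being parsed with `Long.parseLong`. `StateVersions` and `toStateVersion` are hypothetical names, and the `resourceVersion` argument stands in for the value returned by `ObjectMeta.getResourceVersion()`.

```java
import java.util.Optional;

// Hypothetical helper; not part of the PR
final class StateVersions {
    private StateVersions() {
    }

    // An empty Optional means the state has not been stored, replacing the -1 sentinel
    static Optional<String> toStateVersion(final String resourceVersion) {
        return Optional.ofNullable(resourceVersion)
                .filter(version -> !version.isEmpty());
    }
}
```

   Callers that currently compare against `-1` would then check `isPresent()` instead.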





[GitHub] [nifi] exceptionfactory commented on pull request #6779: NIFI-10975 Add Kubernetes Leader Election and State Provider

Posted by "exceptionfactory (via GitHub)" <gi...@apache.org>.
exceptionfactory commented on PR #6779:
URL: https://github.com/apache/nifi/pull/6779#issuecomment-1426010986

   @markap14 I rebased the branch to the current `main` so that all modules now reference the current 2.0.0-SNAPSHOT version.




[GitHub] [nifi] markap14 commented on a diff in pull request #6779: NIFI-10975 Add Kubernetes Leader Election and State Provider

Posted by GitBox <gi...@apache.org>.
markap14 commented on code in PR #6779:
URL: https://github.com/apache/nifi/pull/6779#discussion_r1048567945


##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java:
##########
@@ -0,0 +1,317 @@
+    private long getVersion(final ConfigMap configMap) {
+        final ObjectMeta metadata = configMap.getMetadata();
+        final String resourceVersion = metadata.getResourceVersion();
+        try {
+            return resourceVersion == null ? UNKNOWN_VERSION : Long.parseLong(resourceVersion);

Review Comment:
   I think I would propose that we update our StateMap as follows:
   - Make `long getVersion()` deprecated in favor of a new `String getStateVersion()`
   - Remove `long getVersion()` in version 2.0
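
   As a rough sketch of the transition period, the deprecated method could delegate to the new one. `VersionedStateMap` is a hypothetical stand-in for illustration, not the actual NiFi `StateMap` interface, and treating `null` as "not stored" is an assumption.

```java
// Hypothetical interface sketching the proposed migration; not the actual NiFi StateMap
interface VersionedStateMap {
    // Proposed replacement: version as a string, null (assumed) when state was never stored
    String getStateVersion();

    /**
     * @deprecated Bridge for existing callers; use getStateVersion() instead
     */
    @Deprecated
    default long getVersion() {
        final String stateVersion = getStateVersion();
        // -1 keeps the existing "not stored" convention for legacy callers.
        // A non-numeric version would throw NumberFormatException here.
        return stateVersion == null ? -1L : Long.parseLong(stateVersion);
    }
}
```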





[GitHub] [nifi] markap14 commented on pull request #6779: NIFI-10975 Add Kubernetes Leader Election and State Provider

Posted by GitBox <gi...@apache.org>.
markap14 commented on PR #6779:
URL: https://github.com/apache/nifi/pull/6779#issuecomment-1355512778

   Thanks @exceptionfactory. Pulled the latest, rebuilt everything, rebuilt the docker image. Interestingly, it no longer worked. I got errors on startup telling me that the URI was invalid when calling `leaderElectionCommandProvider.findLeader(roleName);` (line 197 of KubernetesLeaderElectionManager) because the URI ended in "/Cluster Coordinator".
   So I updated that code to get the `roleId` for the `roleName` and provide that.
   That addressed the issue. It was now registering for the role properly.
   Unfortunately, though, I still ran into the exact same issue. When I disconnected and reconnected a node, it gave the same stack trace, failing to release the lease because it was modified. This then caused the same issue, with both nodes thinking they are the leader.
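
   For reference, the kind of role name to role ID mapping I mean looks roughly like the sketch below. `RoleIds.toRoleId` is a hypothetical helper written for illustration, and the mapping in the PR may differ. It lowercases the name and replaces anything outside `a-z0-9` with a hyphen so that the result can be used in a Lease URI.

```java
import java.util.Locale;

// Hypothetical mapper; the PR's actual role-to-Lease mapping may differ
final class RoleIds {
    private RoleIds() {
    }

    static String toRoleId(final String roleName) {
        return roleName.trim()
                .toLowerCase(Locale.ROOT)
                // Collapse each run of unsupported characters into a single hyphen
                .replaceAll("[^a-z0-9]+", "-")
                // Names must start and end with an alphanumeric character
                .replaceAll("^-+|-+$", "");
    }
}
```

   With this sketch, "Cluster Coordinator" maps to `cluster-coordinator` and "Primary Node" maps to `primary-node`.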




[GitHub] [nifi] markap14 commented on pull request #6779: NIFI-10975 Add Kubernetes Leader Election and State Provider

Posted by GitBox <gi...@apache.org>.
markap14 commented on PR #6779:
URL: https://github.com/apache/nifi/pull/6779#issuecomment-1353410957

   So I created a two-node NiFi cluster using GKE to test this. On startup, things work well. Both nodes join the cluster. I can see that state is getting stored/recovered properly using ListGCSBucket. If I then disconnect the node that is Primary/Coordinator, I see that the other node is elected. But if I then reconnect the disconnected node, it gets into a bad state.
   Running `bin/nifi.sh diagnostics diag1.txt` on both nodes shows that each node believes that it is both the Cluster Coordinator AND the Primary Node.
   Looking at the logs of the disconnected node, I see:
   ```
   2022-12-15 16:50:42,065 ERROR [KubernetesLeaderElectionManager] i.f.k.c.e.leaderelection.LeaderElector Exception occurred while releasing lock 'LeaseLock: nifi - cluster-coordinator (10.31.1.4:4423)'
   io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LockException: Unable to update LeaseLock
           at io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LeaseLock.update(LeaseLock.java:102)
           at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
           at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
           at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
           at java.base/java.util.concurrent.CompletableFuture.cancel(Unknown Source)
           at io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.lambda$null$0(LeaderElector.java:92)
           at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
           at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
           at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
           at java.base/java.util.concurrent.CompletableFuture.cancel(Unknown Source)
           at io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.run(LeaderElector.java:70)
           at org.apache.nifi.kubernetes.leader.election.command.LeaderElectionCommand.run(LeaderElectionCommand.java:78)
           at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
           at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
           at io.fabric8.kubernetes.client.KubernetesClientException.copyAsCause(KubernetesClientException.java:238)
           at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.waitForResult(OperationSupport.java:517)
           at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleResponse(OperationSupport.java:551)
           at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleUpdate(OperationSupport.java:347)
           at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.handleUpdate(BaseOperation.java:680)
           at io.fabric8.kubernetes.client.dsl.internal.HasMetadataOperation.lambda$replace$0(HasMetadataOperation.java:167)
           at io.fabric8.kubernetes.client.dsl.internal.HasMetadataOperation.replace(HasMetadataOperation.java:172)
           at io.fabric8.kubernetes.client.dsl.internal.HasMetadataOperation.replace(HasMetadataOperation.java:113)
           at io.fabric8.kubernetes.client.dsl.internal.HasMetadataOperation.replace(HasMetadataOperation.java:41)
           at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.replace(BaseOperation.java:1043)
           at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.replace(BaseOperation.java:88)
           at io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LeaseLock.update(LeaseLock.java:100)
           ... 19 common frames omitted
           at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.requestFailure(OperationSupport.java:709)
           at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.requestFailure(OperationSupport.java:689)
           at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source)
           at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
           at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
           at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
           at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
           at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
           at io.fabric8.kubernetes.client.okhttp.OkHttpClientImpl$4.onResponse(OkHttpClientImpl.java:277)
           at okhttp3.internal.connection.RealCall$AsyncCall.run(RealCall.kt:519)
           ... 3 common frames omitted
   2022-12-15 16:50:42,066 ERROR [KubernetesLeaderElectionManager] i.f.k.c.e.leaderelection.LeaderElector Exception occurred while releasing lock 'LeaseLock: nifi - primary-node (10.31.1.4:4423)'
   io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LockException: Unable to update LeaseLock
           at io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.stopLeading(LeaderElector.java:120)
           at io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.lambda$null$1(LeaderElector.java:94)
           at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
           at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
           at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
           at java.base/java.util.concurrent.CompletableFuture.cancel(Unknown Source)
           at io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.lambda$null$0(LeaderElector.java:92)
           at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
           at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
           at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
           at java.base/java.util.concurrent.CompletableFuture.cancel(Unknown Source)
           at io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.run(LeaderElector.java:70)
           at org.apache.nifi.kubernetes.leader.election.command.LeaderElectionCommand.run(LeaderElectionCommand.java:78)
           at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
           at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
           at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
           at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
           at java.base/java.lang.Thread.run(Unknown Source)
   Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PUT at: https://10.31.128.1/apis/coordination.k8s.io/v1/namespaces/nifi/leases/primary-node. Message: Operation cannot be fulfilled on leases.coordination.k8s.io "primary-node": the object has been modified; please apply your changes to the latest version and try again. Received status: Status(apiVersion=v1, code=409, details=StatusDetails(causes=[], group=coordination.k8s.io, kind=leases, name=primary-node, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=Operation cannot be fulfilled on leases.coordination.k8s.io "primary-node": the object has been modified; please apply your changes to the latest version and try again, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Conflict, status=Failure, additionalProperties={}).
           at io.fabric8.kubernetes.client.KubernetesClientException.copyAsCause(KubernetesClientException.java:238)
           at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.waitForResult(OperationSupport.java:517)
           at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleResponse(OperationSupport.java:551)
           at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleUpdate(OperationSupport.java:347)
           at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.handleUpdate(BaseOperation.java:680)
           at io.fabric8.kubernetes.client.dsl.internal.HasMetadataOperation.lambda$replace$0(HasMetadataOperation.java:167)
           at io.fabric8.kubernetes.client.dsl.internal.HasMetadataOperation.replace(HasMetadataOperation.java:172)
           at io.fabric8.kubernetes.client.dsl.internal.HasMetadataOperation.replace(HasMetadataOperation.java:113)
           at io.fabric8.kubernetes.client.dsl.internal.HasMetadataOperation.replace(HasMetadataOperation.java:41)
           at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.replace(BaseOperation.java:1043)
           at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.replace(BaseOperation.java:88)
           at io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LeaseLock.update(LeaseLock.java:100)
           ... 19 common frames omitted
   Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PUT at: https://10.31.128.1/apis/coordination.k8s.io/v1/namespaces/nifi/leases/primary-node. Message: Operation cannot be fulfilled on leases.coordination.k8s.io "primary-node": the object has been modified; please apply your changes to the latest version and try again. Received status: Status(apiVersion=v1, code=409, details=StatusDetails(causes=[], group=coordination.k8s.io, kind=leases, name=primary-node, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=Operation cannot be fulfilled on leases.coordination.k8s.io "primary-node": the object has been modified; please apply your changes to the latest version and try again, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Conflict, status=Failure, additionalProperties={}).
           at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.requestFailure(OperationSupport.java:709)
           at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.requestFailure(OperationSupport.java:689)
           at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.assertResponseCode(OperationSupport.java:640)
           at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.lambda$handleResponse$0(OperationSupport.java:576)
           at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source)
           at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
           at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
           at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.lambda$retryWithExponentialBackoff$2(OperationSupport.java:618)
           at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
           at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
           at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
           at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
           at io.fabric8.kubernetes.client.okhttp.OkHttpClientImpl$4.onResponse(OkHttpClientImpl.java:277)
           at okhttp3.internal.connection.RealCall$AsyncCall.run(RealCall.kt:519)
           ... 3 common frames omitted
   ```
   
   So it looks like the node is not properly relinquishing ownership of the lease. I presume this is what causes both nodes to believe that they are the coordinator/primary.
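
The 409 Conflict in the log above is the API server rejecting an update that was based on a stale resource version. A minimal, self-contained sketch of that optimistic-concurrency behavior follows. `VersionedLeaseSketch` and its methods are hypothetical stand-ins for illustration only; they are not the fabric8 `LeaseLock` implementation.

```java
class VersionedLeaseSketch {
    private long version = 1;
    private String holder = "node-a";

    synchronized long getVersion() {
        return version;
    }

    synchronized String getHolder() {
        return holder;
    }

    // Hypothetical stand-in for a PUT that carries a resource version:
    // the write is rejected when the caller's version is stale
    synchronized boolean update(final long expectedVersion, final String newHolder) {
        if (version != expectedVersion) {
            return false; // analogous to HTTP 409 Conflict
        }
        holder = newHolder;
        version++;
        return true;
    }

    public static void main(final String[] args) {
        final VersionedLeaseSketch lease = new VersionedLeaseSketch();
        final long staleVersion = lease.getVersion();

        // Another writer modifies the lease first, for example a renewal
        lease.update(staleVersion, "node-a");

        // Releasing with the stale version is rejected and the holder is unchanged
        System.out.println("Stale release accepted: " + lease.update(staleVersion, ""));

        // Re-reading the latest version and retrying succeeds
        System.out.println("Retry accepted: " + lease.update(lease.getVersion(), ""));
    }
}
```

If the release path does not re-read and retry after such a rejection, the holder recorded in the lease stays unchanged, which would be consistent with the symptom described above.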


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] markap14 commented on a diff in pull request #6779: NIFI-10975 Add Kubernetes Leader Election and State Provider

Posted by GitBox <gi...@apache.org>.
markap14 commented on code in PR #6779:
URL: https://github.com/apache/nifi/pull/6779#discussion_r1063739074


##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java:
##########
@@ -6261,8 +6261,8 @@ private Set<String> getRoles(final NodeIdentifier nodeId) {
         final String nodeAddress = nodeId.getSocketAddress() + ":" + nodeId.getSocketPort();
 
         for (final String roleName : ClusterRoles.getAllRoles()) {
-            final String leader = leaderElectionManager.getLeader(roleName);
-            if (leader == null) {
+            final Optional<String> leader = leaderElectionManager.getLeader(roleName);

Review Comment:
   The change to `Optional<String>` here means that below, in line 6269, we are calling `leader.equals(nodeAddress)` which compares a `String` to an `Optional<String>` - need to ensure that we call `get()` first. As-is, the cluster page shows the nodes are connected but doesn't show which is Cluster Coordinator and which is primary node (though the nodes do appear to function in those roles properly).
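
The pitfall can be reproduced in isolation. The sketch below is illustrative only: `OptionalLeaderComparison` and `isLeader` are hypothetical names, and it uses `map(...).orElse(false)` as one alternative to calling `get()`, since that form also handles an empty `Optional`.

```java
import java.util.Optional;

class OptionalLeaderComparison {
    // Hypothetical helper: true when the elected leader address matches this node's address
    static boolean isLeader(final Optional<String> leader, final String nodeAddress) {
        // Compare the wrapped value; an empty Optional yields false
        return leader.map(nodeAddress::equals).orElse(false);
    }

    public static void main(final String[] args) {
        final Optional<String> leader = Optional.of("nifi-1.nifi:4423");
        final String nodeAddress = "nifi-1.nifi:4423";

        // Comparing the Optional itself to a String never matches
        System.out.println("Optional.equals(String): " + leader.equals(nodeAddress));

        // Comparing the wrapped value matches
        System.out.println("Wrapped value matches: " + isLeader(leader, nodeAddress));

        // No elected leader means no match
        System.out.println("Empty leader matches: " + isLeader(Optional.empty(), nodeAddress));
    }
}
```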





[GitHub] [nifi] markap14 commented on pull request #6779: NIFI-10975 Add Kubernetes Leader Election and State Provider

Posted by GitBox <gi...@apache.org>.
markap14 commented on PR #6779:
URL: https://github.com/apache/nifi/pull/6779#issuecomment-1372856078

   Thanks for the latest updates, @exceptionfactory. Ran into another issue when testing, unfortunately.
   I have a statefulset that had 3 replicas. `nifi-1` was both the primary node and the coordinator.
   I then scaled the statefulset to 0.
   This didn't expire the lease, though:
   ```
   mpayne@cs-654103601966-default:~$ k get leases
   NAME                  HOLDER             AGE
   cluster-coordinator   nifi-1.nifi:4423   63m
   primary-node          nifi-1.nifi:4423   62m
   ```
   
   Even after waiting over an hour, the lease remains. If I look at it:
   ```
   mpayne@cs-654103601966-default:~$ k get lease cluster-coordinator -o yaml
   apiVersion: coordination.k8s.io/v1
   kind: Lease
   metadata:
     creationTimestamp: "2023-01-05T21:06:29Z"
     name: cluster-coordinator
     namespace: nifi
     resourceVersion: "252479"
     uid: 7e5d05d1-3b20-426d-8822-5cff92eb183f
   spec:
     acquireTime: "2023-01-05T22:03:17.355642Z"
     holderIdentity: nifi-1.nifi:4423
     leaseDurationSeconds: 15
     leaseTransitions: 2
     renewTime: "2023-01-05T22:04:13.480562Z"
   
   mpayne@cs-654103601966-default:~$ date
   Thu 05 Jan 2023 10:11:34 PM UTC
   ```
   
   We can see here that the date is well past the renewTime (10:11:34 PM = 22:11:34, vs. 22:04:13 as the renew time).
   So the lease appears to remain, and the new node, `nifi-0`, cannot proceed:
   
   ```
   2023-01-05 22:09:37,513 INFO [main] o.a.n.c.p.AbstractNodeProtocolSender Cluster Coordinator is located at nifi-1.nifi:4423. Will send Cluster Connection Request to this address
   2023-01-05 22:09:37,535 WARN [main] o.a.nifi.controller.StandardFlowService Failed to connect to cluster due to: org.apache.nifi.cluster.protocol.ProtocolException: Failed to create socket to nifi-1.nifi:4423 due to: java.net.UnknownHostException: nifi-1.nifi
   2023-01-05 22:09:42,550 INFO [main] o.a.n.c.c.n.LeaderElectionNodeProtocolSender Determined that Cluster Coordinator is located at nifi-1.nifi:4423; will use this address for sending heartbeat messages
   2023-01-05 22:09:42,550 INFO [main] o.a.n.c.p.AbstractNodeProtocolSender Cluster Coordinator is located at nifi-1.nifi:4423. Will send Cluster Connection Request to this address
   2023-01-05 22:09:42,550 WARN [main] o.a.nifi.controller.StandardFlowService Failed to connect to cluster due to: org.apache.nifi.cluster.protocol.ProtocolException: Failed to create socket to nifi-1.nifi:4423 due to: java.net.UnknownHostException: nifi-1.nifi
   ```
   
   As soon as I delete the lease (`k delete lease cluster-coordinator`) all works as expected.
   But we obviously can't have users manually deleting the lease all the time.
   Not sure if this is the intended behavior. Should we be ignoring the lease if the renewTime has expired? Or is it because we don't actually participate in the leader election on startup, since there appears to already be an elected leader?
   Either way, we need to make sure that we can properly handle this condition, where the lease points to a node that is no longer part of the cluster.
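
The condition described above can be illustrated with a small sketch. It assumes, as one possible interpretation, that a lease counts as expired once `renewTime` plus `leaseDurationSeconds` has passed. `LeaseExpirySketch` and `isExpired` are hypothetical names and are not part of NiFi or the Kubernetes client.

```java
import java.time.Duration;
import java.time.Instant;

class LeaseExpirySketch {
    // Hypothetical helper: a lease is treated as expired once renewTime + leaseDuration has passed
    static boolean isExpired(final Instant renewTime, final Duration leaseDuration, final Instant now) {
        return now.isAfter(renewTime.plus(leaseDuration));
    }

    public static void main(final String[] args) {
        // Values copied from the Lease YAML and the date output above
        final Instant renewTime = Instant.parse("2023-01-05T22:04:13.480562Z");
        final Duration leaseDuration = Duration.ofSeconds(15);
        final Instant now = Instant.parse("2023-01-05T22:11:34Z");

        System.out.println("Expired: " + isExpired(renewTime, leaseDuration, now));
    }
}
```

Under that assumption, the lease shown above expired several minutes before the `date` output, so a starting node could in principle treat the recorded holder as stale rather than trying to connect to it.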




[GitHub] [nifi] markap14 commented on a diff in pull request #6779: NIFI-10975 Add Kubernetes Leader Election and State Provider

Posted by GitBox <gi...@apache.org>.
markap14 commented on code in PR #6779:
URL: https://github.com/apache/nifi/pull/6779#discussion_r1048563903


##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java:
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kubernetes.state.provider;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.StatusDetails;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.dsl.Resource;
+import org.apache.nifi.components.AbstractConfigurableComponent;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.components.state.StateProvider;
+import org.apache.nifi.components.state.StateProviderInitializationContext;
+import org.apache.nifi.kubernetes.client.ServiceAccountNamespaceProvider;
+import org.apache.nifi.kubernetes.client.StandardKubernetesClientProvider;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * State Provider implementation based on Kubernetes ConfigMaps with Base64 encoded keys to meet Kubernetes constraints
+ */
+public class KubernetesConfigMapStateProvider extends AbstractConfigurableComponent implements StateProvider {
+    private static final Scope[] SUPPORTED_SCOPES = { Scope.CLUSTER };
+
+    private static final long UNKNOWN_VERSION = 0;
+
+    private static final Charset KEY_CHARACTER_SET = StandardCharsets.UTF_8;
+
+    private static final String CONFIG_MAP_NAME_FORMAT = "nifi-component-%s";
+
+    /** Encode ConfigMap keys using URL Encoder without padding characters for compliance with Kubernetes naming */
+    private static final Base64.Encoder encoder = Base64.getUrlEncoder().withoutPadding();
+
+    private static final Base64.Decoder decoder = Base64.getUrlDecoder();
+
+    private final AtomicBoolean enabled = new AtomicBoolean();
+
+    private KubernetesClient kubernetesClient;
+
+    private String namespace;
+
+    private String identifier;
+
+    private ComponentLog logger;
+
+    /**
+     * Get configured component identifier
+     *
+     * @return Component Identifier
+     */
+    @Override
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    /**
+     * Initialize Provider using configured properties
+     *
+     * @param context Initialization Context
+     */
+    @Override
+    public void initialize(final StateProviderInitializationContext context) {
+        this.identifier = context.getIdentifier();
+        this.logger = context.getLogger();
+        this.kubernetesClient = getKubernetesClient();
+        this.namespace = new ServiceAccountNamespaceProvider().getNamespace();
+    }
+
+    /**
+     * Shutdown Provider
+     */
+    @Override
+    public void shutdown() {
+        kubernetesClient.close();
+        logger.info("Provider shutdown");
+    }
+
+    /**
+     * Set State as ConfigMap based on Component Identifier
+     *
+     * @param state State Map
+     * @param componentId Component Identifier
+     * @throws IOException Thrown on failure to set State Map
+     */
+    @Override
+    public void setState(final Map<String, String> state, final String componentId) throws IOException {
+        try {
+            final ConfigMap configMap = createConfigMapBuilder(state, componentId).build();
+            final ConfigMap configMapCreated = kubernetesClient.configMaps().resource(configMap).createOrReplace();
+            final long version = getVersion(configMapCreated);
+            logger.debug("Set State Component ID [{}] Version [{}]", componentId, version);
+        } catch (final KubernetesClientException e) {
+            if (isNotFound(e.getCode())) {
+                logger.debug("State not found for Component ID [{}]", componentId, e);
+            } else {
+                throw new IOException(String.format("Set failed for Component ID [%s]", componentId), e);
+            }
+        } catch (final RuntimeException e) {
+            throw new IOException(String.format("Set failed for Component ID [%s]", componentId), e);
+        }
+    }
+
+    /**
+     * Get State Map for Component Identifier
+     *
+     * @param componentId Component Identifier of State to be retrieved
+     * @return State Map
+     * @throws IOException Thrown on failure to get State Map
+     */
+    @Override
+    public StateMap getState(final String componentId) throws IOException {
+        try {
+            final ConfigMap configMap = configMapResource(componentId).get();
+            final Map<String, String> data = configMap == null ? Collections.emptyMap() : getDecodedMap(configMap.getData());
+            final long version = configMap == null ? UNKNOWN_VERSION : getVersion(configMap);
+            return new StandardStateMap(data, version);
+        } catch (final RuntimeException e) {
+            throw new IOException(String.format("Get failed for Component ID [%s]", componentId), e);
+        }
+    }
+
+    /**
+     * Replace State ConfigMap with new State based on current resource version
+     *
+     * @param currentState Current State Map with version
+     * @param state New State Map
+     * @param componentId Component Identifier
+     * @return Replace operation status
+     */
+    @Override
+    public boolean replace(final StateMap currentState, final Map<String, String> state, final String componentId) throws IOException {
+        final String resourceVersion = Long.toString(currentState.getVersion());
+        final ConfigMap configMap = createConfigMapBuilder(state, componentId)
+                .editOrNewMetadata()
+                .withResourceVersion(resourceVersion)
+                .endMetadata()
+                .build();
+
+        try {
+            final ConfigMap configMapReplaced = kubernetesClient.configMaps().resource(configMap).replace();
+            final long version = getVersion(configMapReplaced);
+            logger.debug("Replaced State Component ID [{}] Version [{}]", componentId, version);
+            return true;
+        } catch (final KubernetesClientException e) {
+            if (isNotFoundOrConflict(e.getCode())) {
+                logger.debug("Replace State Failed Component ID [{}] Version [{}]", componentId, resourceVersion, e);
+                return false;
+            } else {
+                throw new IOException(String.format("Replace failed for Component ID [%s]", componentId), e);
+            }
+        } catch (final RuntimeException e) {
+            throw new IOException(String.format("Replace failed for Component ID [%s]", componentId), e);
+        }
+    }
+
+    /**
+     * Clear state information for specified Component Identifier
+     *
+     * @param componentId the id of the component for which state is being cleared
+     * @throws IOException Thrown on failure to clear state for Component Identifier
+     */
+    @Override
+    public void clear(final String componentId) throws IOException {
+        try {
+            setState(Collections.emptyMap(), componentId);
+        } catch (final RuntimeException e) {
+            throw new IOException(String.format("Clear failed for Component ID [%s]", componentId), e);
+        }
+    }
+
+    /**
+     * Remove state information for specified Component Identifier
+     *
+     * @param componentId Identifier of component removed from the configuration
+     * @throws IOException Thrown on failure to remove state for Component Identifier
+     */
+    @Override
+    public void onComponentRemoved(final String componentId) throws IOException {
+        try {
+            final List<StatusDetails> deleteStatus = configMapResource(componentId).delete();
+            logger.debug("Config Map [{}] deleted {}", componentId, deleteStatus);
+        } catch (final RuntimeException e) {
+            throw new IOException(String.format("Remove failed for Component ID [%s]", componentId), e);
+        }
+    }
+
+    /**
+     * Enable Provider
+     */
+    @Override
+    public void enable() {
+        enabled.getAndSet(true);
+    }
+
+    /**
+     * Disable Provider
+     */
+    @Override
+    public void disable() {
+        enabled.getAndSet(false);
+    }
+
+    /**
+     * Get Enabled status
+     *
+     * @return Enabled status
+     */
+    @Override
+    public boolean isEnabled() {
+        return enabled.get();
+    }
+
+    /**
+     * Get Supported Scopes returns CLUSTER
+     *
+     * @return Supported Scopes including CLUSTER
+     */
+    @Override
+    public Scope[] getSupportedScopes() {
+        return SUPPORTED_SCOPES;
+    }
+
+    /**
+     * Get Kubernetes Client using standard configuration
+     *
+     * @return Kubernetes Client
+     */
+    protected KubernetesClient getKubernetesClient() {
+        return new StandardKubernetesClientProvider().getKubernetesClient();
+    }
+
+    private Resource<ConfigMap> configMapResource(final String componentId) {
+        final String name = getConfigMapName(componentId);
+        return kubernetesClient.configMaps().inNamespace(namespace).withName(name);
+    }
+
+    private ConfigMapBuilder createConfigMapBuilder(final Map<String, String> state, final String componentId) {
+        final Map<String, String> encodedData = getEncodedMap(state);
+        final String name = getConfigMapName(componentId);
+        return new ConfigMapBuilder()
+                .withNewMetadata()
+                .withNamespace(namespace)
+                .withName(name)
+                .endMetadata()
+                .withData(encodedData);
+    }
+
+    private String getConfigMapName(final String componentId) {
+        return String.format(CONFIG_MAP_NAME_FORMAT, componentId);
+    }
+
+    private long getVersion(final ConfigMap configMap) {
+        final ObjectMeta metadata = configMap.getMetadata();
+        final String resourceVersion = metadata.getResourceVersion();
+        try {
+            return resourceVersion == null ? UNKNOWN_VERSION : Long.parseLong(resourceVersion);

Review Comment:
   I don't believe this is valid. According to https://kubernetes.io/docs/reference/using-api/api-concepts/
   ```
   You must not assume resource versions are numeric or collatable. API clients may only compare two resource versions for equality (this means that you must not compare resource versions for greater-than or less-than relationships).
   ```
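
As a rough illustration of the quoted rule, the sketch below treats a resource version as an opaque string and compares it for equality only. `OpaqueResourceVersion` and `isSameVersion` are hypothetical names, and the version value `v1-abc` is made up. The `StateMap` usage in this diff works with a `long` version, so an actual fix may need a different approach than this sketch.

```java
import java.util.Objects;

class OpaqueResourceVersion {
    // Hypothetical helper: resource versions are opaque strings that support equality checks only
    static boolean isSameVersion(final String expected, final String actual) {
        return Objects.equals(expected, actual);
    }

    public static void main(final String[] args) {
        // A made-up, non-numeric version that Long.parseLong would reject
        final String stored = "v1-abc";

        System.out.println("Same version: " + isSameVersion(stored, "v1-abc"));
        System.out.println("Different version: " + isSameVersion(stored, "v1-abd"));
    }
}
```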





[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6779: NIFI-10975 Add Kubernetes Leader Election and State Provider

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on code in PR #6779:
URL: https://github.com/apache/nifi/pull/6779#discussion_r1048573920


##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java:
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kubernetes.state.provider;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.StatusDetails;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.dsl.Resource;
+import org.apache.nifi.components.AbstractConfigurableComponent;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.components.state.StateProvider;
+import org.apache.nifi.components.state.StateProviderInitializationContext;
+import org.apache.nifi.kubernetes.client.ServiceAccountNamespaceProvider;
+import org.apache.nifi.kubernetes.client.StandardKubernetesClientProvider;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * State Provider implementation based on Kubernetes ConfigMaps with Base64 encoded keys to meet Kubernetes constraints
+ */
+public class KubernetesConfigMapStateProvider extends AbstractConfigurableComponent implements StateProvider {
+    private static final Scope[] SUPPORTED_SCOPES = { Scope.CLUSTER };
+
+    private static final long UNKNOWN_VERSION = 0;
+
+    private static final Charset KEY_CHARACTER_SET = StandardCharsets.UTF_8;
+
+    private static final String CONFIG_MAP_NAME_FORMAT = "nifi-component-%s";
+
+    /** Encode ConfigMap keys using URL Encoder without padding characters for compliance with Kubernetes naming */
+    private static final Base64.Encoder encoder = Base64.getUrlEncoder().withoutPadding();
+
+    private static final Base64.Decoder decoder = Base64.getUrlDecoder();
+
+    private final AtomicBoolean enabled = new AtomicBoolean();
+
+    private KubernetesClient kubernetesClient;
+
+    private String namespace;
+
+    private String identifier;
+
+    private ComponentLog logger;
+
+    /**
+     * Get configured component identifier
+     *
+     * @return Component Identifier
+     */
+    @Override
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    /**
+     * Initialize Provider using configured properties
+     *
+     * @param context Initialization Context
+     */
+    @Override
+    public void initialize(final StateProviderInitializationContext context) {
+        this.identifier = context.getIdentifier();
+        this.logger = context.getLogger();
+        this.kubernetesClient = getKubernetesClient();
+        this.namespace = new ServiceAccountNamespaceProvider().getNamespace();
+    }
+
+    /**
+     * Shutdown Provider
+     */
+    @Override
+    public void shutdown() {
+        kubernetesClient.close();
+        logger.info("Provider shutdown");
+    }
+
+    /**
+     * Set State as ConfigMap based on Component Identifier
+     *
+     * @param state State Map
+     * @param componentId Component Identifier
+     * @throws IOException Thrown on failure to set State Map
+     */
+    @Override
+    public void setState(final Map<String, String> state, final String componentId) throws IOException {
+        try {
+            final ConfigMap configMap = createConfigMapBuilder(state, componentId).build();
+            final ConfigMap configMapCreated = kubernetesClient.configMaps().resource(configMap).createOrReplace();
+            final long version = getVersion(configMapCreated);
+            logger.debug("Set State Component ID [{}] Version [{}]", componentId, version);
+        } catch (final KubernetesClientException e) {
+            if (isNotFound(e.getCode())) {
+                logger.debug("State not found for Component ID [{}]", componentId, e);
+            } else {
+                throw new IOException(String.format("Set failed for Component ID [%s]", componentId), e);
+            }
+        } catch (final RuntimeException e) {
+            throw new IOException(String.format("Set failed for Component ID [%s]", componentId), e);
+        }
+    }
+
+    /**
+     * Get State Map for Component Identifier
+     *
+     * @param componentId Component Identifier of State to be retrieved
+     * @return State Map
+     * @throws IOException Thrown on failure to get State Map
+     */
+    @Override
+    public StateMap getState(final String componentId) throws IOException {
+        try {
+            final ConfigMap configMap = configMapResource(componentId).get();
+            final Map<String, String> data = configMap == null ? Collections.emptyMap() : getDecodedMap(configMap.getData());
+            final long version = configMap == null ? UNKNOWN_VERSION : getVersion(configMap);
+            return new StandardStateMap(data, version);
+        } catch (final RuntimeException e) {
+            throw new IOException(String.format("Get failed for Component ID [%s]", componentId), e);
+        }
+    }
+
+    /**
+     * Replace State ConfigMap with new State based on current resource version
+     *
+     * @param currentState Current State Map with version
+     * @param state New State Map
+     * @param componentId Component Identifier
+     * @return Replace operation status
+     */
+    @Override
+    public boolean replace(final StateMap currentState, final Map<String, String> state, final String componentId) throws IOException {
+        final String resourceVersion = Long.toString(currentState.getVersion());
+        final ConfigMap configMap = createConfigMapBuilder(state, componentId)
+                .editOrNewMetadata()
+                .withResourceVersion(resourceVersion)
+                .endMetadata()
+                .build();
+
+        try {
+            final ConfigMap configMapReplaced = kubernetesClient.configMaps().resource(configMap).replace();
+            final long version = getVersion(configMapReplaced);
+            logger.debug("Replaced State Component ID [{}] Version [{}]", componentId, version);
+            return true;
+        } catch (final KubernetesClientException e) {
+            if (isNotFoundOrConflict(e.getCode())) {
+                logger.debug("Replace State Failed Component ID [{}] Version [{}]", componentId, resourceVersion, e);
+                return false;
+            } else {
+                throw new IOException(String.format("Replace failed for Component ID [%s]", componentId), e);
+            }
+        } catch (final RuntimeException e) {
+            throw new IOException(String.format("Replace failed for Component ID [%s]", componentId), e);
+        }
+    }
+
+    /**
+     * Clear state information for specified Component Identifier
+     *
+     * @param componentId the id of the component for which state is being cleared
+     * @throws IOException Thrown on failure to clear state for Component Identifier
+     */
+    @Override
+    public void clear(final String componentId) throws IOException {
+        try {
+            setState(Collections.emptyMap(), componentId);
+        } catch (final RuntimeException e) {
+            throw new IOException(String.format("Clear failed for Component ID [%s]", componentId), e);
+        }
+    }
+
+    /**
+     * Remove state information for specified Component Identifier
+     *
+     * @param componentId Identifier of component removed from the configuration
+     * @throws IOException Thrown on failure to remove state for Component Identifier
+     */
+    @Override
+    public void onComponentRemoved(final String componentId) throws IOException {
+        try {
+            final List<StatusDetails> deleteStatus = configMapResource(componentId).delete();
+            logger.debug("Config Map [{}] deleted {}", componentId, deleteStatus);
+        } catch (final RuntimeException e) {
+            throw new IOException(String.format("Remove failed for Component ID [%s]", componentId), e);
+        }
+    }
+
+    /**
+     * Enable Provider
+     */
+    @Override
+    public void enable() {
+        enabled.getAndSet(true);
+    }
+
+    /**
+     * Disable Provider
+     */
+    @Override
+    public void disable() {
+        enabled.getAndSet(false);
+    }
+
+    /**
+     * Get Enabled status
+     *
+     * @return Enabled status
+     */
+    @Override
+    public boolean isEnabled() {
+        return enabled.get();
+    }
+
+    /**
+     * Get Supported Scopes returns CLUSTER
+     *
+     * @return Supported Scopes including CLUSTER
+     */
+    @Override
+    public Scope[] getSupportedScopes() {
+        return SUPPORTED_SCOPES;
+    }
+
+    /**
+     * Get Kubernetes Client using standard configuration
+     *
+     * @return Kubernetes Client
+     */
+    protected KubernetesClient getKubernetesClient() {
+        return new StandardKubernetesClientProvider().getKubernetesClient();
+    }
+
+    private Resource<ConfigMap> configMapResource(final String componentId) {
+        final String name = getConfigMapName(componentId);
+        return kubernetesClient.configMaps().inNamespace(namespace).withName(name);
+    }
+
+    private ConfigMapBuilder createConfigMapBuilder(final Map<String, String> state, final String componentId) {
+        final Map<String, String> encodedData = getEncodedMap(state);
+        final String name = getConfigMapName(componentId);
+        return new ConfigMapBuilder()
+                .withNewMetadata()
+                .withNamespace(namespace)
+                .withName(name)
+                .endMetadata()
+                .withData(encodedData);
+    }
+
+    private String getConfigMapName(final String componentId) {
+        return String.format(CONFIG_MAP_NAME_FORMAT, componentId);
+    }
+
+    private long getVersion(final ConfigMap configMap) {
+        final ObjectMeta metadata = configMap.getMetadata();
+        final String resourceVersion = metadata.getResourceVersion();
+        try {
+            return resourceVersion == null ? UNKNOWN_VERSION : Long.parseLong(resourceVersion);

Review Comment:
   Thanks for the suggestion. Adding a new `Optional<String> getStateVersion()` method to `StateMap` sounds like a good approach.
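
For context, Kubernetes documents `resourceVersion` as an opaque string, so parsing it with `Long.parseLong` as in the diff above relies on behavior the API does not promise. The following is a minimal sketch of how a string-based version exposed through `Optional<String>` could drive the same compare-and-replace behavior. `VersionedState` and `InMemoryVersionedStore` are hypothetical names used only for illustration; they are not NiFi classes, and the in-memory map merely stands in for the ConfigMap storage.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: these types are illustrative and are not the NiFi API
final class VersionedState {
    private final Map<String, String> data;
    private final String version; // opaque token, null when state has never been stored

    VersionedState(final Map<String, String> data, final String version) {
        this.data = Collections.unmodifiableMap(new HashMap<>(data));
        this.version = version;
    }

    Map<String, String> toMap() {
        return data;
    }

    // An empty Optional replaces a numeric sentinel for "no stored version"
    Optional<String> getStateVersion() {
        return Optional.ofNullable(version);
    }
}

final class InMemoryVersionedStore {
    private final Map<String, VersionedState> states = new ConcurrentHashMap<>();
    private long counter; // only touched inside synchronized methods

    VersionedState get(final String componentId) {
        final VersionedState state = states.get(componentId);
        return state == null ? new VersionedState(Collections.emptyMap(), null) : state;
    }

    synchronized void set(final Map<String, String> data, final String componentId) {
        states.put(componentId, new VersionedState(data, nextVersion()));
    }

    // Replace only when the caller's version equals the stored version.
    // Versions are compared as strings and never parsed as numbers.
    synchronized boolean replace(final VersionedState current, final Map<String, String> data, final String componentId) {
        final Optional<String> expected = current.getStateVersion();
        final VersionedState stored = states.get(componentId);
        if (!expected.isPresent() || stored == null || !stored.getStateVersion().equals(expected)) {
            return false; // missing or stale state, mirroring the not-found or conflict case
        }
        states.put(componentId, new VersionedState(data, nextVersion()));
        return true;
    }

    private String nextVersion() {
        return Long.toString(++counter);
    }
}
```

In this sketch a caller reads the state, keeps the returned object, and passes it back to `replace`. A second `replace` with the same stale object returns `false` instead of throwing, which matches how the diff handles a not-found or conflict response.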



[GitHub] [nifi] exceptionfactory commented on pull request #6779: NIFI-10975 Add Kubernetes Leader Election and State Provider

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on PR #6779:
URL: https://github.com/apache/nifi/pull/6779#issuecomment-1353796999

   Thanks for the recommendation on the Docker script @markap14. I pushed updates with the following changes:
   
   1. Added `StateMap.getStateVersion()` returning `Optional<String>` and refactored all references
   2. Upgraded Fabric8 Kubernetes Client from 6.2.0 to 6.3.0 to correct a stop-leading issue found during testing
   3. Added `prop_replace` lines to Docker `start.sh` for setting the Leader Election and Cluster State Provider from environment variables


[GitHub] [nifi] bbende merged pull request #6779: NIFI-10975 Add Kubernetes Leader Election and State Provider

Posted by "bbende (via GitHub)" <gi...@apache.org>.
bbende merged PR #6779:
URL: https://github.com/apache/nifi/pull/6779
