You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/12/14 14:34:50 UTC

[GitHub] [nifi] markap14 commented on a diff in pull request #6779: NIFI-10975 Add Kubernetes Leader Election and State Provider

markap14 commented on code in PR #6779:
URL: https://github.com/apache/nifi/pull/6779#discussion_r1048547015


##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-leader-election/src/main/java/org/apache/nifi/kubernetes/leader/election/KubernetesLeaderElectionManager.java:
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kubernetes.leader.election;
+
+import org.apache.nifi.controller.leader.election.LeaderElectionRole;
+import org.apache.nifi.controller.leader.election.LeaderElectionStateChangeListener;
+import org.apache.nifi.controller.leader.election.TrackedLeaderElectionManager;
+import org.apache.nifi.kubernetes.client.KubernetesClientProvider;
+import org.apache.nifi.kubernetes.client.NamespaceProvider;
+import org.apache.nifi.kubernetes.client.ServiceAccountNamespaceProvider;
+import org.apache.nifi.kubernetes.client.StandardKubernetesClientProvider;
+import org.apache.nifi.kubernetes.leader.election.command.LeaderElectionCommandProvider;
+import org.apache.nifi.kubernetes.leader.election.command.StandardLeaderElectionCommandProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Kubernetes Leader Election Manager implementation using Kubernetes Lease Resources
+ */
+public class KubernetesLeaderElectionManager extends TrackedLeaderElectionManager {
+    private static final boolean INTERRUPT_ENABLED = true;
+
+    private static final int SERVICE_THREADS = 4;
+
+    private static final Logger logger = LoggerFactory.getLogger(KubernetesLeaderElectionManager.class);
+
+    private static final Map<String, String> ROLE_NAMES;
+
+    static {
+        final Map<String, String> roleNames = new LinkedHashMap<>();
+        for (final LeaderElectionRole leaderElectionRole : LeaderElectionRole.values()) {
+            roleNames.put(leaderElectionRole.getRoleName(), leaderElectionRole.getRoleId());
+        }
+        ROLE_NAMES = Collections.unmodifiableMap(roleNames);
+    }
+
+    private final ExecutorService executorService;
+
+    private final AtomicBoolean started = new AtomicBoolean();
+
+    private final Map<String, Future<?>> roleCommands = new ConcurrentHashMap<>();
+
+    private final Map<String, ParticipantRegistration> roleRegistrations = new ConcurrentHashMap<>();
+
+    private final Map<String, String> roleLeaders = new ConcurrentHashMap<>();
+
+    private final LeaderElectionCommandProvider leaderElectionCommandProvider;
+
+    /**
+     * Kubernetes Leader Election Manager default constructor
+     */
+    public KubernetesLeaderElectionManager() {
+        executorService = createExecutorService();
+        leaderElectionCommandProvider = createLeaderElectionCommandProvider();
+    }
+
+    /**
+     * Start Manager and register current roles
+     */
+    @Override
+    public void start() {
+        if (started.get()) {
+            logger.debug("Start requested when running");
+        } else {
+            started.getAndSet(true);
+            logger.debug("Started");
+
+            for (final ParticipantRegistration roleRegistration : roleRegistrations.values()) {
+                register(roleRegistration.roleName, roleRegistration.listener, roleRegistration.participantId);
+            }
+        }
+    }
+
+    /**
+     * Stop Manager and shutdown running commands
+     */
+    @Override
+    public void stop() {
+        try {
+            leaderElectionCommandProvider.close();
+        } catch (final IOException e) {
+            logger.warn("Leader Election Command Factory close failed", e);
+        }
+        executorService.shutdown();
+        started.getAndSet(false);
+        logger.debug("Stopped");
+    }
+
+    /**
+     * Register for Election or Observation based on presence of Participant ID and register for Leader when started
+     *
+     * @param roleName Role Name for registration
+     * @param listener State Change Listener for Leader Events
+     * @param participantId Participant ID or null when registering for Observation
+     */
+    @Override
+    public synchronized void register(final String roleName, final LeaderElectionStateChangeListener listener, final String participantId) {
+        requireRoleName(roleName);
+        Objects.requireNonNull(listener, "Change Listener required");
+
+        final ParticipantRegistration roleRegistration = new ParticipantRegistration(roleName, participantId, listener);
+        roleRegistrations.put(roleName, roleRegistration);
+
+        final boolean participating = isParticipating(participantId);
+        if (participating) {
+            logger.debug("Registered Participation for Election Role [{}] ID [{}]", roleName, participantId);
+            if (started.get()) {
+                registerLeaderElectionCommand(roleName, listener, participantId);
+            }
+        } else {
+            logger.info("Registered Observation for Election Role [{}]", roleName);
+        }
+    }
+
+    /**
+     * Unregister for Leader Election of specified Role and cancel running command
+     *
+     * @param roleName Role Name to be removed from registration
+     */
+    @Override
+    public synchronized void unregister(final String roleName) {
+        requireRoleName(roleName);
+
+        final ParticipantRegistration roleRegistration = roleRegistrations.get(roleName);
+        if (roleRegistration == null) {
+            logger.info("Not registered for Election Role [{}]", roleName);
+        } else {
+            final Future<?> roleCommand = roleCommands.get(roleName);
+            if (roleCommand == null) {
+                logger.warn("Leader Election Command not found Role [{}] ID [{}]", roleName, roleRegistration.participantId);
+            } else {
+                roleCommand.cancel(INTERRUPT_ENABLED);
+            }
+
+            roleRegistrations.remove(roleName);
+            logger.info("Unregistered for Election Role [{}] ID [{}]", roleName, roleRegistration.participantId);
+        }
+    }
+
+    /**
+     * Determine whether current node is participating in Leader Election for specified Role
+     *
+     * @param roleName Role Name to be evaluated
+     * @return Participation status in Leader Election
+     */
+    @Override
+    public boolean isActiveParticipant(final String roleName) {
+        requireRoleName(roleName);
+        final String participantId = getParticipantId(roleName);
+        return isParticipating(participantId);
+    }
+
+    /**
+     * Get Leader Identifier for Role
+     *
+     * @param roleName Role Name for requested Leader Identifier
+     * @return Leader Identifier or null when not found
+     */
+    @Override
+    public String getLeader(final String roleName) {
+        requireRoleName(roleName);
+
+        final long pollStarted = System.nanoTime();
+        try {
+            final String leader;
+            if (started.get()) {
+                final String roleLeader = roleLeaders.get(roleName);
+                if (roleLeader == null) {
+                    logger.debug("Leader not registered: finding Leader for Role [{}]", roleName);
+                    leader = leaderElectionCommandProvider.findLeader(roleName);
+                } else {
+                    leader = roleLeader;
+                }
+            } else {
+                logger.debug("Manager not running: finding Leader for Role [{}]", roleName);
+                leader = leaderElectionCommandProvider.findLeader(roleName);
+            }
+            return leader;
+        } finally {
+            final long elapsed = System.nanoTime() - pollStarted;
+            registerPollTime(elapsed);
+        }
+    }
+
+    /**
+     * Determine whether current node is the Leader for the specified Role
+     *
+     * @param roleName Role Name to be evaluated
+     * @return Leader Status
+     */
+    @Override
+    public boolean isLeader(final String roleName) {
+        requireRoleName(roleName);
+        final boolean leader;
+
+        final String participantId = getParticipantId(roleName);
+        if (participantId == null) {
+            logger.debug("Role [{}] not participating in Leader election", roleName);
+            leader = false;
+        } else {
+            final String leaderId = getLeader(roleName);
+            leader = participantId.equals(leaderId);
+            if (leader) {
+                logger.debug("Role [{}] Participant ID [{}] is Leader", roleName, participantId);
+            } else {
+                logger.debug("Role [{}] Participant ID [{}] not Leader", roleName, leaderId);
+            }
+        }
+        return leader;
+    }
+
+    protected ExecutorService createExecutorService() {
+        return Executors.newFixedThreadPool(SERVICE_THREADS, new NamedThreadFactory());
+    }
+
+    protected LeaderElectionCommandProvider createLeaderElectionCommandProvider() {
+        final NamespaceProvider namespaceProvider = new ServiceAccountNamespaceProvider();
+        final String namespace = namespaceProvider.getNamespace();
+        final KubernetesClientProvider kubernetesClientProvider = new StandardKubernetesClientProvider();
+        return new StandardLeaderElectionCommandProvider(kubernetesClientProvider, namespace);
+    }
+
+    private void registerLeaderElectionCommand(final String roleName, final LeaderElectionStateChangeListener listener, final String participantId) {

Review Comment:
   There's a lot of check-then-modify here. I think we need to ensure that this is thread safe and synchronize the method or add locking



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org