You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by "pvillard31 (via GitHub)" <gi...@apache.org> on 2023/09/14 10:59:19 UTC

[GitHub] [nifi] pvillard31 commented on a diff in pull request #7673: NIFI-11938: Created ConsumeSlack and ListenSlack Processors to consume message events and commands from Slack

pvillard31 commented on code in PR #7673:
URL: https://github.com/apache/nifi/pull/7673#discussion_r1325777266


##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/consume/ConsumeChannel.java:
##########
@@ -0,0 +1,861 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.slack.consume;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.slack.api.methods.SlackApiException;
+import com.slack.api.methods.request.conversations.ConversationsHistoryRequest;
+import com.slack.api.methods.request.conversations.ConversationsRepliesRequest;
+import com.slack.api.methods.response.conversations.ConversationsHistoryResponse;
+import com.slack.api.methods.response.conversations.ConversationsRepliesResponse;
+import com.slack.api.model.Message;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class ConsumeChannel {
+    private static final String CONVERSATION_HISTORY_URL = "https://slack.com/api/conversations.history";
+
+    private static final String CHECK_FOR_REPLIES = "check for replies";
+    private static final String BACKWARD = "backward";
+    private static final String FORWARD = "forward";
+    private static final Pattern MENTION_PATTERN = Pattern.compile("<@(U.*?)>");
+    private static final long YIELD_MILLIS = 3_000L;
+
+
+    private final ConsumeSlackClient client;
+    private final String channelId;
+    private final int batchSize;
+    private final long replyMonitorFrequencyMillis;
+    private final long replyMonitorWindowMillis;
+    private final boolean resolveUsernames;
+    private final boolean includeMessageBlocks;
+    private final UsernameLookup usernameLookup;
+    private final Relationship successRelationship;
+    private final ComponentLog logger;
+    private final ObjectMapper objectMapper;
+    private final StateKeys stateKeys;
+
+    private volatile long yieldExpiration;
+    private volatile long lastReplyMonitorPollEnd = System.currentTimeMillis();
+    private final AtomicLong nextRequestTime = new AtomicLong(0L);
+
+
+    private ConsumeChannel(final Builder builder) {
+        this.client = builder.client;
+        this.channelId = builder.channelId;
+        this.batchSize = builder.batchSize;
+        this.replyMonitorFrequencyMillis = builder.replyMonitorFrequencyMillis;
+        this.replyMonitorWindowMillis = builder.replyMonitorWindowMillis;
+        this.logger = builder.logger;
+        this.resolveUsernames = builder.resolveUsernames;
+        this.includeMessageBlocks = builder.includeMessageBlocks;
+        this.successRelationship = builder.successRelationship;
+        this.usernameLookup = builder.usernameLookup;
+        this.objectMapper = builder.objectMapper;
+
+        stateKeys = new StateKeys(channelId);
+    }
+
+    public String getChannelId() {
+        return channelId;
+    }
+
+    public ConfigVerificationResult verify() {
+        final ConversationsHistoryRequest request = ConversationsHistoryRequest.builder()
+            .channel(channelId)
+            .limit(1)
+            .build();
+
+        final ConversationsHistoryResponse response;
+        try {
+            response = client.fetchConversationsHistory(request);
+        } catch (final Exception e) {
+            return new ConfigVerificationResult.Builder()
+                .verificationStepName("Check authorization for Channel " + channelId)
+                .outcome(ConfigVerificationResult.Outcome.FAILED)
+                .explanation("Failed to obtain a message due to: " + e)
+                .build();
+        }
+
+        if (response.isOk()) {
+            final List<Message> messages = response.getMessages();
+            final Message firstMessage = messages.get(0);
+            enrichMessage(firstMessage);
+
+            final String username = firstMessage.getUsername();
+            if (resolveUsernames && username == null) {
+                return new ConfigVerificationResult.Builder()
+                    .verificationStepName("Check authorization for Channel " + channelId)
+                    .outcome(ConfigVerificationResult.Outcome.FAILED)
+                    .explanation("Successfully retrieved a message but failed to resolve the username")
+                    .build();
+            }
+
+            final String user = username == null ? firstMessage.getUser() : username;
+            final String explanation = response.getMessages().isEmpty() ? "Successfully requested messages for channel but got no messages" : "Successfully retrieved a message from " + user;
+
+            return new ConfigVerificationResult.Builder()
+                .verificationStepName("Check authorization for Channel " + channelId)
+                .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
+                .explanation(explanation)
+                .build();
+        }
+
+        final String errorMessage = ConsumeSlackUtil.getErrorMessage(response.getError(), response.getNeeded(), response.getProvided(), response.getWarning());
+        return new ConfigVerificationResult.Builder()
+            .verificationStepName("Check authorization for Channel " + channelId)
+            .outcome(ConfigVerificationResult.Outcome.FAILED)
+            .explanation("Failed to obtain a message due to: " + errorMessage)
+            .build();
+    }
+
+    public void consume(final ProcessContext context, final ProcessSession session) throws IOException, SlackApiException {
+        final long minTimestamp = nextRequestTime.get();
+        if (minTimestamp > 0 && System.currentTimeMillis() < minTimestamp) {
+            context.yield();
+            return;
+        }
+
+        // Get the current state
+        final StateMap stateMap;
+        try {
+            stateMap = session.getState(Scope.CLUSTER);
+        } catch (final IOException ioe) {
+            logger.error("Failed to determine current offset for channel {}; will not retrieve any messages until this is resolved", channelId, ioe);
+            context.yield();
+            return;
+        }
+
+        // Determine if we need to check historical messages for replies, or if we need to consume the latest messages.
+        final boolean checkForReplies = isCheckForReplies(stateMap);
+
+        if (checkForReplies) {
+            consumeReplies(context, session, stateMap);
+        } else {
+            consumeLatestMessages(context, session, stateMap);
+        }
+    }
+
+    private boolean isCheckForReplies(final StateMap stateMap) {
+        final String currentAction = stateMap.get(stateKeys.ACTION);
+        if (CHECK_FOR_REPLIES.equals(currentAction)) {
+            return true;
+        }
+
+        final long nextCheckRepliesTime = lastReplyMonitorPollEnd + replyMonitorFrequencyMillis;
+        if (System.currentTimeMillis() > nextCheckRepliesTime) {
+            return true;
+        }
+
+        return false;
+    }
+
+    private void consumeReplies(final ProcessContext context, final ProcessSession session, final StateMap stateMap) throws IOException, SlackApiException {
+        // Make sure that we've completed our initial "load" of messages. If not, we want to load the messages before we start
+        // monitoring for updates to threads.
+        final String direction = stateMap.get(stateKeys.DIRECTION);
+        if (!FORWARD.equals(direction)) {
+            onCompletedRepliesScan(session, new HashMap<>(stateMap.toMap()), null);
+            return;
+        }
+
+        // We want to use the latest timestamp we've seen as the threshold for replies.
+        final String latestTs = stateMap.get(stateKeys.LATEST_TS);
+        if (latestTs == null) {
+            onCompletedRepliesScan(session, new HashMap<>(stateMap.toMap()), null);
+            return;
+        }
+
+        // If the action has not been set to denote that we're in teh process of checking for replies, do so now.
+        final Map<String, String> updatedStateMap = new HashMap<>(stateMap.toMap());
+        final String currentAction = stateMap.get(stateKeys.ACTION);
+        if (!CHECK_FOR_REPLIES.equals(currentAction)) {
+            updatedStateMap.put(stateKeys.ACTION, CHECK_FOR_REPLIES);
+            session.setState(updatedStateMap, Scope.CLUSTER);
+        }
+
+        String minTsValue = stateMap.get(stateKeys.REPLY_MIN_TS);
+        if (minTsValue == null) {
+            minTsValue = latestTs;
+        }
+        final String maxTsValue = stateMap.get(stateKeys.REPLY_MAX_TS);
+        final SlackTimestamp minTs = new SlackTimestamp(minTsValue);
+        final SlackTimestamp maxTs = maxTsValue == null ? new SlackTimestamp() : new SlackTimestamp(maxTsValue);
+        final SlackTimestamp maxParentTs = new SlackTimestamp(latestTs);
+
+        // TODO: Test this heavily

Review Comment:
   Should it be removed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org