You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2019/03/14 03:50:03 UTC

[GitHub] [nifi] ijokarumawak commented on a change in pull request #3344: NIFI-6092 Create ListenSlack and FetchSlack processors

ijokarumawak commented on a change in pull request #3344: NIFI-6092 Create ListenSlack and FetchSlack processors
URL: https://github.com/apache/nifi/pull/3344#discussion_r265403103
 
 

 ##########
 File path: nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/ListenSlack.java
 ##########
 @@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.slack;
+
+import java.io.StringReader;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import javax.json.Json;
+import javax.json.JsonException;
+import javax.json.JsonReader;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.slack.controllers.SlackConnectionService;
+
+@Tags({"slack", "listen", "json", "events"})
+@CapabilityDescription("Listens on the provided slack connection controller service and " +
+  "forwards the messages in JSON text format.")
+@SeeAlso({SlackConnectionService.class, FetchSlack.class})
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+public class ListenSlack extends AbstractSessionFactoryProcessor {
+
+  private volatile ProcessSessionFactory processSessionFactory;
+
+  private volatile SlackConnectionService slackConnectionService;
+
+  private volatile List<String> matchingTypes;
+
+  private volatile boolean matchAny;
+
+  static final PropertyDescriptor SLACK_CONNECTION_SERVICE = new PropertyDescriptor.Builder()
+    .name("slack-connection-service")
+    .displayName("Slack Connection ControllerService")
+    .description("A ControllerService that provides a connection to slack service.")
+    .required(true)
+    .identifiesControllerService(SlackConnectionService.class)
+    .build();
+
+  static final PropertyDescriptor MESSAGE_TYPES = new PropertyDescriptor
+    .Builder()
+    .name("message-types")
+    .displayName("Message types")
+    .description("Message types to listen to. It will filter messages with the given types or " +
+      "listen to every type if empty. It is a coma separated list like: message,file_shared")
+    .required(false)
+    .addValidator(Validator.VALID)
+    .build();
+
+  static final Relationship MATCHED_MESSAGES_RELATIONSHIP = new Relationship.Builder()
+    .name("matched")
+    .description("Incoming messages that matches the given types")
+    .build();
+
+  static final Relationship UNMATCHED_MESSAGES_RELATIONSHIP = new Relationship.Builder()
+    .name("unmatched")
+    .description("Incoming messages that does not match the given types")
+    .autoTerminateDefault(true)
+    .build();
+
+  private static final String JSON_OBJECT_TYPE_KEY = "type";
+
+  private static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
+    SLACK_CONNECTION_SERVICE, MESSAGE_TYPES));
+
+  private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+    MATCHED_MESSAGES_RELATIONSHIP, UNMATCHED_MESSAGES_RELATIONSHIP)));
+
+  @Override
+  public Set<Relationship> getRelationships() {
+    return RELATIONSHIPS;
+  }
+
+  @Override
+  public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+    return DESCRIPTORS;
+  }
+
+  @OnScheduled
+  public void onScheduled(final ProcessContext context) {
+    String messageTypes = context.getProperty(MESSAGE_TYPES).getValue();
+    messageTypes = messageTypes == null ? "" : messageTypes;
+    matchAny = messageTypes.isEmpty();
+    matchingTypes = Arrays.asList(messageTypes.split(","));
+  }
+
+  @OnUnscheduled
+  @OnShutdown
+  public void onUnscheduled() {
+    if (isProcessorRegisteredToService()) {
+      deregister();
+    }
+  }
+
+  private void deregister() {
+    slackConnectionService.deregisterProcessor(this);
+  }
+
+  @Override
+  public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
+    if (processSessionFactory == null) {
+      processSessionFactory = sessionFactory;
+    }
+    if (!isProcessorRegisteredToService()) {
+      registerProcessorToService(context);
+    }
+    context.yield();
+  }
+
+  private boolean isProcessorRegisteredToService() {
+    return slackConnectionService != null
+      && slackConnectionService.isProcessorRegistered(this);
+  }
+
+  private void registerProcessorToService(ProcessContext context) {
+      try {
+        slackConnectionService = context.getProperty(SLACK_CONNECTION_SERVICE)
+          .asControllerService(SlackConnectionService.class);
+        slackConnectionService.registerProcessor(this, getMessageHandler(processSessionFactory));
+      } catch (Exception e) {
+        getLogger().error("Error while creating slack client", e);
+        slackConnectionService.deregisterProcessor(this);
+        context.yield();
+      }
+  }
+
+  private Consumer<String> getMessageHandler(ProcessSessionFactory sessionFactory) {
+    return message -> {
+      ProcessSession session = sessionFactory.createSession();
+      try {
+        FlowFile flowFile = session.create();
+        session.write(flowFile, outputStream -> outputStream.write(message.getBytes()));
+        session.getProvenanceReporter().receive(flowFile, "slack");
+
+        if (matches(message)) {
+          session.transfer(flowFile, MATCHED_MESSAGES_RELATIONSHIP);
 
 Review comment:
   Usually, Listen/Consume processors only have 'success' relationship. I suggest following that convention. Only create a FlowFile when the message matches. Then transfer it to 'success' relationship.
   
   Even if we have matched/unmatched relationship, I imagine most use-cases auto terminate 'unmatched'.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services