Posted to issues@nifi.apache.org by "exceptionfactory (via GitHub)" <gi...@apache.org> on 2023/03/07 01:35:12 UTC

[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6593: NIFI-8497: implemented SlackRecordSink and Test, wrote documentation …

exceptionfactory commented on code in PR #6593:
URL: https://github.com/apache/nifi/pull/6593#discussion_r1127240629


##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackPostMessageResponse.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.slf4j.Logger;
+
+import java.util.Date;
+
+public class SlackPostMessageResponse {
+    private Boolean ok;
+    private String channel;
+    @JsonDeserialize(using = TimestampDeserializer.class)
+    private Date ts;
+    private Message message;
+    private String error;
+    private String warning;
+
+    public Boolean isOk() {
+        return ok;
+    }
+
+    public void setOk(Boolean ok) {
+        this.ok = ok;
+    }
+
+    public String getChannel() {
+        return channel;
+    }
+
+    public void setChannel(String channel) {
+        this.channel = channel;
+    }
+
+    public Date getTs() {
+        return ts;
+    }
+
+    public void setTs(Date ts) {
+        this.ts = ts;
+    }
+
+    public Message getMessage() {
+        return message;
+    }
+
+    public void setMessage(Message message) {
+        this.message = message;
+    }
+
+    public String getError() {
+        return error;
+    }
+
+    public void setError(String error) {
+        this.error = error;
+    }
+
+    public String getWarning() {
+        return warning;
+    }
+
+    public void setWarning(String warning) {
+        this.warning = warning;
+    }
+
+    public void checkResponse(final Logger logger) throws SlackRestServiceException {
+        if (isOk() == null) {
+            throw new SlackRestServiceException("Slack response JSON does not contain 'ok' key or it has invalid value.");
+        }
+        if (!isOk()) {
+            throw new SlackRestServiceException("Slack error response: " + getError());
+        }
+
+        if (getWarning() != null) {
+            logger.warn("Slack warning message: " + getWarning());
+        }
+    }

Review Comment:
   Recommend moving the `checkResponse` method out of the model class and into the `SlackRestService` class.
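
A minimal sketch of what this move could look like, assuming the service receives the deserialized response and owns the logger. The classes below are reduced stand-ins for the ones in this PR, and `java.util.logging` replaces SLF4J only so the sketch compiles without external dependencies.

```java
import java.util.logging.Logger;

// Stand-in for the PR's checked exception; the real class is not shown in this diff.
class SlackRestServiceException extends Exception {
    SlackRestServiceException(final String message) {
        super(message);
    }
}

// Reduced model: it carries data only and no longer validates itself.
class SlackPostMessageResponse {
    private Boolean ok;
    private String error;
    private String warning;

    Boolean isOk() { return ok; }
    void setOk(final Boolean ok) { this.ok = ok; }
    String getError() { return error; }
    void setError(final String error) { this.error = error; }
    String getWarning() { return warning; }
    void setWarning(final String warning) { this.warning = warning; }
}

// Stand-in for the service class; only the relocated check is shown.
class SlackRestService {
    private static final Logger LOGGER = Logger.getLogger(SlackRestService.class.getName());

    void checkResponse(final SlackPostMessageResponse response) throws SlackRestServiceException {
        if (response.isOk() == null) {
            throw new SlackRestServiceException("Slack response JSON does not contain 'ok' key or it has invalid value.");
        }
        if (!response.isOk()) {
            throw new SlackRestServiceException("Slack error response: " + response.getError());
        }
        if (response.getWarning() != null) {
            LOGGER.warning("Slack warning message: " + response.getWarning());
        }
    }
}
```

With this shape the model no longer needs a `Logger` parameter, and the service can use the logger it already holds.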



##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/Message.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+import java.util.Date;
+
+public class Message {
+    private String text;
+    private String username;
+    @JsonProperty("first_name")

Review Comment:
   The `first_name` property name appears out of place for the `botId` field. Is it needed?



##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/Message.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+import java.util.Date;
+
+public class Message {
+    private String text;
+    private String username;
+    @JsonProperty("first_name")
+    private String botId;
+    @JsonProperty("attachments")
+    private Attachment[] attachment;
+    private String type;
+    private String subtype;
+    @JsonDeserialize(using = TimestampDeserializer.class)

Review Comment:
   Is this annotation necessary?



##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestService.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.OptionalLong;
+
+public class SlackRestService {
+    private final WebClientServiceProvider webClientServiceProvider;
+    private final String accessToken;
+    private final String apiUrl;
+    private final ObjectMapper objectMapper;
+    private final Logger logger;
+
+
+    public SlackRestService(final WebClientServiceProvider webClientServiceProvider,
+                            final String accessToken,
+                            final String apiUrl) {
+        this.webClientServiceProvider = webClientServiceProvider;
+        this.accessToken = accessToken;
+        this.apiUrl = apiUrl;
+        this.objectMapper = new ObjectMapper();
+        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        this.logger = LoggerFactory.getLogger(SlackRestService.class);
+    }
+
+    public void sendMessageToChannel(final String message, final String channel) throws SlackRestServiceException {
+        final URI apiUri = URI.create(apiUrl);
+        final HttpUriBuilder uriBuilder = webClientServiceProvider.getHttpUriBuilder()
+                .scheme(apiUri.getScheme())
+                .host(apiUri.getHost())
+                .encodedPath(apiUri.getPath())
+                .addPathSegment("chat.postMessage");

Review Comment:
   Recommend defining `chat.postMessage` as a static final variable named `POST_MESSAGE_PATH`.
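
A small sketch of the constant, for illustration only. The class name `SlackApiPaths` and the method `postMessageUri` are hypothetical, and plain string concatenation stands in for NiFi's `HttpUriBuilder` so the example has no external dependencies.

```java
import java.net.URI;

class SlackApiPaths {
    // Slack Web API method name for posting a message, defined once.
    static final String POST_MESSAGE_PATH = "chat.postMessage";

    // Appends the method path to the configured API URL.
    // In the PR this step is done with HttpUriBuilder.addPathSegment(...).
    static URI postMessageUri(final String apiUrl) {
        final String base = apiUrl.endsWith("/") ? apiUrl : apiUrl + "/";
        return URI.create(base + POST_MESSAGE_PATH);
    }
}
```

In `SlackRestService` itself the constant would presumably be `private static final` and passed to `addPathSegment(POST_MESSAGE_PATH)`.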



##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestService.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.OptionalLong;
+
+public class SlackRestService {
+    private final WebClientServiceProvider webClientServiceProvider;
+    private final String accessToken;
+    private final String apiUrl;
+    private final ObjectMapper objectMapper;
+    private final Logger logger;
+
+
+    public SlackRestService(final WebClientServiceProvider webClientServiceProvider,
+                            final String accessToken,
+                            final String apiUrl) {
+        this.webClientServiceProvider = webClientServiceProvider;
+        this.accessToken = accessToken;
+        this.apiUrl = apiUrl;
+        this.objectMapper = new ObjectMapper();
+        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        this.logger = LoggerFactory.getLogger(SlackRestService.class);
+    }
+
+    public void sendMessageToChannel(final String message, final String channel) throws SlackRestServiceException {
+        final URI apiUri = URI.create(apiUrl);
+        final HttpUriBuilder uriBuilder = webClientServiceProvider.getHttpUriBuilder()
+                .scheme(apiUri.getScheme())
+                .host(apiUri.getHost())
+                .encodedPath(apiUri.getPath())
+                .addPathSegment("chat.postMessage");
+        if (apiUri.getPort() != -1) {
+            uriBuilder.port(apiUri.getPort());
+        }
+        final URI uri = uriBuilder.build();
+
+        final ObjectNode requestBodyJson = objectMapper.createObjectNode();
+        if (StringUtils.isEmpty(channel)) {
+            throw new SlackRestServiceException("The channel must be specified.");
+        }
+        requestBodyJson.put("channel", channel);
+
+        if (StringUtils.isEmpty(message)) {
+            throw new SlackRestServiceException("No message to be sent with this record.");
+        }
+        requestBodyJson.put("text", message);
+
+
+        final InputStream requestBodyInputStream;
+        try {
+            requestBodyInputStream = IOUtils.toInputStream(

Review Comment:
   Recommend replacing `IOUtils.toInputStream()` with a simple method that returns a `ByteArrayInputStream`. That would allow removing the dependency on `commons-io`.
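
One possible shape for such a method, using only the JDK. The class and method names are hypothetical, and UTF-8 is an assumption based on the `StandardCharsets` import in the file, since the original call is cut off in this hunk.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

class RequestBodyStreams {
    // Wraps the serialized request body in an in-memory stream.
    // UTF-8 is assumed here; adjust if the request uses another charset.
    static ByteArrayInputStream toInputStream(final String body) {
        return new ByteArrayInputStream(body.getBytes(StandardCharsets.UTF_8));
    }
}
```

Because the stream is backed by a byte array, its length is known up front, which is convenient if the HTTP client needs a content length.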



##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestService.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.OptionalLong;
+
+public class SlackRestService {
+    private final WebClientServiceProvider webClientServiceProvider;
+    private final String accessToken;
+    private final String apiUrl;
+    private final ObjectMapper objectMapper;
+    private final Logger logger;
+
+
+    public SlackRestService(final WebClientServiceProvider webClientServiceProvider,
+                            final String accessToken,
+                            final String apiUrl) {
+        this.webClientServiceProvider = webClientServiceProvider;
+        this.accessToken = accessToken;
+        this.apiUrl = apiUrl;
+        this.objectMapper = new ObjectMapper();
+        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        this.logger = LoggerFactory.getLogger(SlackRestService.class);
+    }
+
+    public void sendMessageToChannel(final String message, final String channel) throws SlackRestServiceException {
+        final URI apiUri = URI.create(apiUrl);
+        final HttpUriBuilder uriBuilder = webClientServiceProvider.getHttpUriBuilder()
+                .scheme(apiUri.getScheme())
+                .host(apiUri.getHost())
+                .encodedPath(apiUri.getPath())
+                .addPathSegment("chat.postMessage");
+        if (apiUri.getPort() != -1) {
+            uriBuilder.port(apiUri.getPort());
+        }
+        final URI uri = uriBuilder.build();
+
+        final ObjectNode requestBodyJson = objectMapper.createObjectNode();
+        if (StringUtils.isEmpty(channel)) {
+            throw new SlackRestServiceException("The channel must be specified.");
+        }
+        requestBodyJson.put("channel", channel);
+
+        if (StringUtils.isEmpty(message)) {
+            throw new SlackRestServiceException("No message to be sent with this record.");
+        }

Review Comment:
   Recommend removing this check and instead adding a check in the calling method.
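
A rough sketch of a caller-side check, assuming the caller is `SlackRecordSink.sendData()`, which already declares `IOException`. The helper name `requireMessage` is hypothetical, and whether an empty message should raise an error or be skipped is left to the author.

```java
import java.io.IOException;

class MessageChecks {
    // Hypothetical helper the caller could invoke before sendMessageToChannel().
    // Rejects null or empty messages so the REST service does not have to.
    static String requireMessage(final String message) throws IOException {
        if (message == null || message.isEmpty()) {
            throw new IOException("No message to be sent with this record.");
        }
        return message;
    }
}
```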



##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/TimestampDeserializer.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+
+import java.io.IOException;
+import java.util.Date;
+
+public class TimestampDeserializer extends JsonDeserializer<Date> {
+    @Override
+    public Date deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException {
+        final String timestampString = jp.getText().trim();
+        final Double milliseconds = Double.valueOf(timestampString) * 1000;
+
+        final Date timestamp = new Date();
+        timestamp.setTime(milliseconds.longValue());
+
+        System.out.println(timestamp.getTime());

Review Comment:
   The `println` needs to be removed.
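
For reference, the conversion without the debug output could look like the sketch below. It keeps the same arithmetic as the hunk above but is written as a plain static method (the names `SlackTimestamps` and `toDate` are hypothetical) so that it runs without Jackson. The input is assumed to be seconds with a fractional part, such as `1503435956.000247`.

```java
import java.util.Date;

class SlackTimestamps {
    // Converts a textual timestamp in seconds (with optional fraction) to a Date.
    // Digits below one millisecond are truncated, as in the original code.
    static Date toDate(final String timestampText) {
        final double seconds = Double.parseDouble(timestampText.trim());
        final long milliseconds = (long) (seconds * 1000);
        return new Date(milliseconds);
    }
}
```

The body of `TimestampDeserializer.deserialize()` could then reduce to reading `jp.getText()` and returning the converted value.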



##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/Message.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+import java.util.Date;
+
+public class Message {
+    private String text;
+    private String username;
+    @JsonProperty("first_name")
+    private String botId;
+    @JsonProperty("attachments")
+    private Attachment[] attachment;

Review Comment:
   Is there a reason for using this annotation as opposed to naming the field `attachments`?



##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRecordSink.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.sink.RecordSinkService;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+@Tags({"slack", "record", "sink"})
+@CapabilityDescription("Format and send Records using Slack. This controller service uses Slack Web API methods to post " +
+        "messages to a specific channel. Before using SlackRecordSink, you need to create a Slack App, to add a Bot User " +
+        "to the app, and to install the app in your Slack workspace. After the app installed, you can get " +
+        "the Bot User's OAuth Bearer Token that will be needed to authenticate and authorize " +
+        "your SlackRecordSink controller service to Slack.")
+public class SlackRecordSink extends AbstractControllerService implements RecordSinkService {
+
+    private static final String SLACK_API_URL = "https://slack.com/api";
+
+    public static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
+            .name("api-url")
+            .displayName("API URL")
+            .description("Slack Web API URL for posting text messages to channels." +
+                    " It only needs to be changed if Slack changes its API URL.")
+            .required(true)
+            .defaultValue(SLACK_API_URL)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder()
+            .name("access-token")
+            .displayName("Access Token")
+            .description("Bot OAuth Token used for authenticating/authorizing the Slack request sent by NiFi.")
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor CHANNEL_ID = new PropertyDescriptor.Builder()
+            .name("channel-id")
+            .displayName("Channel ID")
+            .description("Slack channel, private group, or IM channel to send the message to. Use Channel ID instead of the name.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor WEB_SERVICE_CLIENT_PROVIDER = new PropertyDescriptor.Builder()
+            .name("web-service-client-provider")
+            .displayName("Web Service Client Provider")
+            .description("Controller service to provide HTTP client for communicating with Slack API")
+            .required(true)
+            .identifiesControllerService(WebClientServiceProvider.class)
+            .build();
+    private volatile RecordSetWriterFactory writerFactory;
+    private SlackRestService service;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return Collections.unmodifiableList(Arrays.asList(
+                API_URL,
+                ACCESS_TOKEN,
+                CHANNEL_ID,
+                RECORD_WRITER_FACTORY,
+                WEB_SERVICE_CLIENT_PROVIDER
+        ));
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        writerFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
+        final WebClientServiceProvider webClientServiceProvider = context
+                .getProperty(WEB_SERVICE_CLIENT_PROVIDER)
+                .asControllerService(WebClientServiceProvider.class);
+        final String accessToken = context.getProperty(ACCESS_TOKEN).getValue();
+        final String apiUrl = context.getProperty(API_URL).getValue();
+        service = new SlackRestService(webClientServiceProvider, accessToken, apiUrl);
+    }
+
+    @Override
+    public WriteResult sendData(final RecordSet recordSet, final Map<String, String> attributes, final boolean sendZeroResults) throws IOException {
+        WriteResult writeResult;
+        try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), recordSet.getSchema(), out, attributes)) {
+                writer.beginRecordSet();
+                Record record = recordSet.next();
+                while (record != null) {
+                    writer.write(record);
+                    writer.flush();
+                    record = recordSet.next();
+                }
+                writeResult = writer.finishRecordSet();
+                writer.flush();
+            } catch (final SchemaNotFoundException e) {
+                final String errorMessage = String.format("RecordSetWriter could not be created because the schema was not found. The schema name for the RecordSet to write is %s",
+                        recordSet.getSchema().getSchemaName());
+                throw new ProcessException(errorMessage, e);
+            }
+
+            try {
+                final String message = out.toString();
+                final String channel = getConfigurationContext().getProperty(CHANNEL_ID).getValue();
+                service.sendMessageToChannel(message, channel);
+            } catch (final SlackRestServiceException e) {
+                getLogger().error("Failed to send message to Slack.", e);
+                throw new ProcessException(e);

Review Comment:
   The logger call can be removed and the message included in the exception. Also recommend throwing an `IOException` instead of a `ProcessException`.
   ```suggestion
                   throw new IOException("Failed to send messages to Slack", e);
   ```



##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRecordSink.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.sink.RecordSinkService;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+@Tags({"slack", "record", "sink"})
+@CapabilityDescription("Format and send Records using Slack. This controller service uses Slack Web API methods to post " +
+        "messages to a specific channel. Before using SlackRecordSink, you need to create a Slack App, to add a Bot User " +
+        "to the app, and to install the app in your Slack workspace. After the app installed, you can get " +
+        "the Bot User's OAuth Bearer Token that will be needed to authenticate and authorize " +
+        "your SlackRecordSink controller service to Slack.")
+public class SlackRecordSink extends AbstractControllerService implements RecordSinkService {
+
+    private static final String SLACK_API_URL = "https://slack.com/api";
+
+    public static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
+            .name("api-url")
+            .displayName("API URL")
+            .description("Slack Web API URL for posting text messages to channels." +
+                    " It only needs to be changed if Slack changes its API URL.")
+            .required(true)
+            .defaultValue(SLACK_API_URL)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder()
+            .name("access-token")
+            .displayName("Access Token")
+            .description("Bot OAuth Token used for authenticating/authorizing the Slack request sent by NiFi.")

Review Comment:
   ```suggestion
               .description("Bot OAuth Token used for authenticating and authorizing the request to Slack")
   ```



##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestService.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.OptionalLong;
+
+public class SlackRestService {
+    private final WebClientServiceProvider webClientServiceProvider;
+    private final String accessToken;
+    private final String apiUrl;
+    private final ObjectMapper objectMapper;
+    private final Logger logger;
+
+
+    public SlackRestService(final WebClientServiceProvider webClientServiceProvider,
+                            final String accessToken,
+                            final String apiUrl) {
+        this.webClientServiceProvider = webClientServiceProvider;
+        this.accessToken = accessToken;
+        this.apiUrl = apiUrl;
+        this.objectMapper = new ObjectMapper();
+        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        this.logger = LoggerFactory.getLogger(SlackRestService.class);
+    }
+
+    public void sendMessageToChannel(final String message, final String channel) throws SlackRestServiceException {
+        final URI apiUri = URI.create(apiUrl);
+        final HttpUriBuilder uriBuilder = webClientServiceProvider.getHttpUriBuilder()
+                .scheme(apiUri.getScheme())
+                .host(apiUri.getHost())
+                .encodedPath(apiUri.getPath())
+                .addPathSegment("chat.postMessage");
+        if (apiUri.getPort() != -1) {
+            uriBuilder.port(apiUri.getPort());
+        }
+        final URI uri = uriBuilder.build();
+
+        final ObjectNode requestBodyJson = objectMapper.createObjectNode();
+        if (StringUtils.isEmpty(channel)) {
+            throw new SlackRestServiceException("The channel must be specified.");
+        }
+        requestBodyJson.put("channel", channel);
+
+        if (StringUtils.isEmpty(message)) {
+            throw new SlackRestServiceException("No message to be sent with this record.");
+        }
+        requestBodyJson.put("text", message);

Review Comment:
   Recommend moving the construction of the `ObjectNode` to a separate method.
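
A minimal sketch of what such an extraction could look like. The class and method names are hypothetical and not taken from the PR. To keep the example free of external dependencies, a `LinkedHashMap` stands in for Jackson's `ObjectNode` and `IllegalArgumentException` stands in for `SlackRestServiceException`; in the actual class the helper would presumably return the `ObjectNode` and throw the service's own exception.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class SlackRequestBodySketch {

    // Hypothetical helper: validates the inputs and builds the chat.postMessage body.
    // Assumption: a Map stands in for Jackson's ObjectNode, and IllegalArgumentException
    // stands in for SlackRestServiceException, so the sketch compiles on its own.
    static Map<String, String> createRequestBody(final String message, final String channel) {
        if (channel == null || channel.isEmpty()) {
            throw new IllegalArgumentException("The channel must be specified.");
        }
        if (message == null || message.isEmpty()) {
            throw new IllegalArgumentException("No message to be sent with this record.");
        }

        // Insertion order is preserved so the body reads channel first, then text.
        final Map<String, String> requestBody = new LinkedHashMap<>();
        requestBody.put("channel", channel);
        requestBody.put("text", message);
        return requestBody;
    }
}
```

With a helper like this, `sendMessageToChannel` would only build the URI, call the helper, and send the request, which keeps validation and payload construction in one place.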



##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestService.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.OptionalLong;
+
+public class SlackRestService {
+    private final WebClientServiceProvider webClientServiceProvider;
+    private final String accessToken;
+    private final String apiUrl;
+    private final ObjectMapper objectMapper;
+    private final Logger logger;
+
+
+    public SlackRestService(final WebClientServiceProvider webClientServiceProvider,
+                            final String accessToken,
+                            final String apiUrl) {
+        this.webClientServiceProvider = webClientServiceProvider;
+        this.accessToken = accessToken;
+        this.apiUrl = apiUrl;
+        this.objectMapper = new ObjectMapper();
+        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        this.logger = LoggerFactory.getLogger(SlackRestService.class);
+    }
+
+    public void sendMessageToChannel(final String message, final String channel) throws SlackRestServiceException {
+        final URI apiUri = URI.create(apiUrl);
+        final HttpUriBuilder uriBuilder = webClientServiceProvider.getHttpUriBuilder()
+                .scheme(apiUri.getScheme())
+                .host(apiUri.getHost())
+                .encodedPath(apiUri.getPath())
+                .addPathSegment("chat.postMessage");
+        if (apiUri.getPort() != -1) {
+            uriBuilder.port(apiUri.getPort());
+        }
+        final URI uri = uriBuilder.build();
+
+        final ObjectNode requestBodyJson = objectMapper.createObjectNode();
+        if (StringUtils.isEmpty(channel)) {
+            throw new SlackRestServiceException("The channel must be specified.");
+        }
+        requestBodyJson.put("channel", channel);
+
+        if (StringUtils.isEmpty(message)) {
+            throw new SlackRestServiceException("No message to be sent with this record.");
+        }
+        requestBodyJson.put("text", message);
+
+
+        final InputStream requestBodyInputStream;
+        try {
+            requestBodyInputStream = IOUtils.toInputStream(
+                    objectMapper.writeValueAsString(requestBodyJson),
+                    StandardCharsets.UTF_8
+            );
+        } catch (final JsonProcessingException e) {
+            throw new SlackRestServiceException("JSON processing exception occurred", e);
+        }
+
+        try (final HttpResponseEntity response = webClientServiceProvider.getWebClientService()
+                .post()
+                .uri(uri)
+                .header("Authorization", String.format("Bearer %s", accessToken))
+                .header("Content-Type", "application/json")
+                .body(requestBodyInputStream, OptionalLong.of(requestBodyInputStream.available()))
+                .retrieve()) {
+            final int statusCode = response.statusCode();
+            if (!(statusCode >= 200 && statusCode < 300)) {
+                throw new SlackRestServiceException("HTTP error code: " + statusCode);
+            }
+
+            try {
+                final SlackPostMessageResponse slackResponse = objectMapper.readValue(response.body(), SlackPostMessageResponse.class);
+                logger.error(objectMapper.writeValueAsString(slackResponse));

Review Comment:
   This `logger.error` call looks like it should be removed, since it logs every response at the error level, including successful ones.
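
The comment asks for removal. If the serialized response were still wanted for troubleshooting, one alternative (an illustration only, not something requested in the review) would be to pick the log level from the outcome instead of always using error. The sketch below uses `java.util.logging` as a stand-in for the SLF4J logger in the PR, and the class and method names are hypothetical.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class SlackResponseLoggingSketch {
    private static final Logger LOGGER =
            Logger.getLogger(SlackResponseLoggingSketch.class.getName());

    // Hypothetical helper: successful responses are logged at a debug-like level (FINE),
    // and only failed responses are logged at the error-like level (SEVERE).
    static Level levelFor(final boolean ok) {
        return ok ? Level.FINE : Level.SEVERE;
    }

    // Logs the serialized response at the level chosen for its outcome.
    static void logResponse(final boolean ok, final String responseJson) {
        LOGGER.log(levelFor(ok), "Slack response: {0}", responseJson);
    }
}
```

With default logging configuration, FINE messages are not shown, so successful posts would no longer appear in the log as errors.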



##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestService.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.OptionalLong;
+
+public class SlackRestService {
+    private final WebClientServiceProvider webClientServiceProvider;
+    private final String accessToken;
+    private final String apiUrl;
+    private final ObjectMapper objectMapper;
+    private final Logger logger;
+
+
+    public SlackRestService(final WebClientServiceProvider webClientServiceProvider,
+                            final String accessToken,
+                            final String apiUrl) {
+        this.webClientServiceProvider = webClientServiceProvider;
+        this.accessToken = accessToken;
+        this.apiUrl = apiUrl;
+        this.objectMapper = new ObjectMapper();
+        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        this.logger = LoggerFactory.getLogger(SlackRestService.class);
+    }
+
+    public void sendMessageToChannel(final String message, final String channel) throws SlackRestServiceException {
+        final URI apiUri = URI.create(apiUrl);
+        final HttpUriBuilder uriBuilder = webClientServiceProvider.getHttpUriBuilder()
+                .scheme(apiUri.getScheme())
+                .host(apiUri.getHost())
+                .encodedPath(apiUri.getPath())
+                .addPathSegment("chat.postMessage");
+        if (apiUri.getPort() != -1) {
+            uriBuilder.port(apiUri.getPort());
+        }
+        final URI uri = uriBuilder.build();
+
+        final ObjectNode requestBodyJson = objectMapper.createObjectNode();
+        if (StringUtils.isEmpty(channel)) {
+            throw new SlackRestServiceException("The channel must be specified.");
+        }
+        requestBodyJson.put("channel", channel);
+
+        if (StringUtils.isEmpty(message)) {
+            throw new SlackRestServiceException("No message to be sent with this record.");
+        }
+        requestBodyJson.put("text", message);
+
+
+        final InputStream requestBodyInputStream;
+        try {
+            requestBodyInputStream = IOUtils.toInputStream(
+                    objectMapper.writeValueAsString(requestBodyJson),
+                    StandardCharsets.UTF_8
+            );
+        } catch (final JsonProcessingException e) {
+            throw new SlackRestServiceException("JSON processing exception occurred", e);
+        }
+
+        try (final HttpResponseEntity response = webClientServiceProvider.getWebClientService()
+                .post()
+                .uri(uri)
+                .header("Authorization", String.format("Bearer %s", accessToken))
+                .header("Content-Type", "application/json")
+                .body(requestBodyInputStream, OptionalLong.of(requestBodyInputStream.available()))
+                .retrieve()) {
+            final int statusCode = response.statusCode();
+            if (!(statusCode >= 200 && statusCode < 300)) {
+                throw new SlackRestServiceException("HTTP error code: " + statusCode);
+            }
+
+            try {
+                final SlackPostMessageResponse slackResponse = objectMapper.readValue(response.body(), SlackPostMessageResponse.class);
+                logger.error(objectMapper.writeValueAsString(slackResponse));
+
+                slackResponse.checkResponse(logger);
+            } catch (final IOException e) {
+                throw new SlackRestServiceException("Slack response JSON cannot be parsed.", e);

Review Comment:
   Recommend removing the trailing period from the exception message.
   ```suggestion
                   throw new SlackRestServiceException("JSON response parsing failed", e);
   ```


