Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/10/27 20:48:29 UTC

[GitHub] [nifi] emiliosetiadarma opened a new pull request, #6593: NIFI-8497: implemented SlackRecordSink and Test, wrote documentation …

emiliosetiadarma opened a new pull request, #6593:
URL: https://github.com/apache/nifi/pull/6593

   …in CapabilityDescription of SlackRecordSink
   
   # Summary
   
   [NIFI-8497](https://issues.apache.org/jira/browse/NIFI-8497)
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [x] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue created
   
   ### Pull Request Tracking
   
   - [x] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   - [x] Pull Request commit message starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   
   ### Pull Request Formatting
   
   - [x] Pull Request based on current revision of the `main` branch
   - [x] Pull Request refers to a feature branch with one commit containing changes
   
   # Verification
   
   Please indicate the verification steps performed prior to pull request creation.
   
   ### Build
   
   - [x] Build completed using `mvn clean install -P contrib-check`
     - [x] JDK 8
     - [ ] JDK 11
     - [ ] JDK 17
   
   ### Licensing
   
   - [ ] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
   - [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` files
   
   ### Documentation
   
   - [ ] Documentation formatting appears as expected in rendered files
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] emiliosetiadarma commented on a diff in pull request #6593: NIFI-8497: implemented SlackRecordSink and Test, wrote documentation …

Posted by "emiliosetiadarma (via GitHub)" <gi...@apache.org>.
emiliosetiadarma commented on code in PR #6593:
URL: https://github.com/apache/nifi/pull/6593#discussion_r1128669764


##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestService.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.OptionalLong;
+
+public class SlackRestService {
+    private final WebClientServiceProvider webClientServiceProvider;
+    private final String accessToken;
+    private final String apiUrl;
+    private final ObjectMapper objectMapper;
+    private final Logger logger;
+
+
+    public SlackRestService(final WebClientServiceProvider webClientServiceProvider,
+                            final String accessToken,
+                            final String apiUrl) {
+        this.webClientServiceProvider = webClientServiceProvider;
+        this.accessToken = accessToken;
+        this.apiUrl = apiUrl;
+        this.objectMapper = new ObjectMapper();
+        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        this.logger = LoggerFactory.getLogger(SlackRestService.class);
+    }
+
+    public void sendMessageToChannel(final String message, final String channel) throws SlackRestServiceException {
+        final URI apiUri = URI.create(apiUrl);
+        final HttpUriBuilder uriBuilder = webClientServiceProvider.getHttpUriBuilder()
+                .scheme(apiUri.getScheme())
+                .host(apiUri.getHost())
+                .encodedPath(apiUri.getPath())
+                .addPathSegment("chat.postMessage");
+        if (apiUri.getPort() != -1) {
+            uriBuilder.port(apiUri.getPort());
+        }
+        final URI uri = uriBuilder.build();
+
+        final ObjectNode requestBodyJson = objectMapper.createObjectNode();
+        if (StringUtils.isEmpty(channel)) {
+            throw new SlackRestServiceException("The channel must be specified.");
+        }
+        requestBodyJson.put("channel", channel);
+
+        if (StringUtils.isEmpty(message)) {
+            throw new SlackRestServiceException("No message to be sent with this record.");
+        }
+        requestBodyJson.put("text", message);
+
+
+        final InputStream requestBodyInputStream;
+        try {
+            requestBodyInputStream = IOUtils.toInputStream(

Review Comment:
   Changing it to ByteArrayInputStream
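
A minimal sketch of what that change could look like, using only the JDK. The class name `RequestBodySketch` and the sample JSON are hypothetical and not taken from the PR. The idea is to serialize the body to a byte array once, wrap it in a `ByteArrayInputStream`, and report the array length as the content length instead of calling `available()` on the stream. In the real service the bytes would presumably come from Jackson, for example `objectMapper.writeValueAsBytes(requestBodyJson)`.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.OptionalLong;

// Hypothetical helper for illustration only; not the PR's final code.
final class RequestBodySketch {
    final InputStream stream;
    final OptionalLong contentLength;

    private RequestBodySketch(final byte[] bytes) {
        // The stream wraps the already-encoded bytes, so no further I/O can fail here.
        this.stream = new ByteArrayInputStream(bytes);
        // The length is known exactly from the array, independent of stream state.
        this.contentLength = OptionalLong.of(bytes.length);
    }

    static RequestBodySketch fromJson(final String json) {
        return new RequestBodySketch(json.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(final String[] args) {
        // Sample payload; the channel ID is a made-up value.
        final RequestBodySketch body = fromJson("{\"channel\":\"C123\",\"text\":\"hello\"}");
        System.out.println("Content length: " + body.contentLength.getAsLong());
    }
}
```

Taking the length from the byte array keeps the content length correct for multi-byte UTF-8 characters and avoids depending on `InputStream.available()`, which is only documented as an estimate.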





[GitHub] [nifi] emiliosetiadarma commented on a diff in pull request #6593: NIFI-8497: implemented SlackRecordSink and Test, wrote documentation …

Posted by "emiliosetiadarma (via GitHub)" <gi...@apache.org>.
emiliosetiadarma commented on code in PR #6593:
URL: https://github.com/apache/nifi/pull/6593#discussion_r1128772209


##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestService.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.OptionalLong;
+
+public class SlackRestService {
+    private final WebClientServiceProvider webClientServiceProvider;
+    private final String accessToken;
+    private final String apiUrl;
+    private final ObjectMapper objectMapper;
+    private final Logger logger;
+
+
+    public SlackRestService(final WebClientServiceProvider webClientServiceProvider,
+                            final String accessToken,
+                            final String apiUrl) {
+        this.webClientServiceProvider = webClientServiceProvider;
+        this.accessToken = accessToken;
+        this.apiUrl = apiUrl;
+        this.objectMapper = new ObjectMapper();
+        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        this.logger = LoggerFactory.getLogger(SlackRestService.class);
+    }
+
+    public void sendMessageToChannel(final String message, final String channel) throws SlackRestServiceException {
+        final URI apiUri = URI.create(apiUrl);
+        final HttpUriBuilder uriBuilder = webClientServiceProvider.getHttpUriBuilder()
+                .scheme(apiUri.getScheme())
+                .host(apiUri.getHost())
+                .encodedPath(apiUri.getPath())
+                .addPathSegment("chat.postMessage");
+        if (apiUri.getPort() != -1) {
+            uriBuilder.port(apiUri.getPort());
+        }
+        final URI uri = uriBuilder.build();
+
+        final ObjectNode requestBodyJson = objectMapper.createObjectNode();
+        if (StringUtils.isEmpty(channel)) {
+            throw new SlackRestServiceException("The channel must be specified.");
+        }
+        requestBodyJson.put("channel", channel);
+
+        if (StringUtils.isEmpty(message)) {
+            throw new SlackRestServiceException("No message to be sent with this record.");
+        }

Review Comment:
   Removing this check, along with the equivalent check for `channel`.





[GitHub] [nifi] exceptionfactory closed pull request #6593: NIFI-8497: implemented SlackRecordSink and Test, wrote documentation …

Posted by "exceptionfactory (via GitHub)" <gi...@apache.org>.
exceptionfactory closed pull request #6593: NIFI-8497: implemented SlackRecordSink and Test, wrote documentation …
URL: https://github.com/apache/nifi/pull/6593




[GitHub] [nifi] emiliosetiadarma commented on a diff in pull request #6593: NIFI-8497: implemented SlackRecordSink and Test, wrote documentation …

Posted by "emiliosetiadarma (via GitHub)" <gi...@apache.org>.
emiliosetiadarma commented on code in PR #6593:
URL: https://github.com/apache/nifi/pull/6593#discussion_r1128670840


##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestService.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.OptionalLong;
+
+public class SlackRestService {
+    private final WebClientServiceProvider webClientServiceProvider;
+    private final String accessToken;
+    private final String apiUrl;
+    private final ObjectMapper objectMapper;
+    private final Logger logger;
+
+
+    public SlackRestService(final WebClientServiceProvider webClientServiceProvider,
+                            final String accessToken,
+                            final String apiUrl) {
+        this.webClientServiceProvider = webClientServiceProvider;
+        this.accessToken = accessToken;
+        this.apiUrl = apiUrl;
+        this.objectMapper = new ObjectMapper();
+        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        this.logger = LoggerFactory.getLogger(SlackRestService.class);
+    }
+
+    public void sendMessageToChannel(final String message, final String channel) throws SlackRestServiceException {
+        final URI apiUri = URI.create(apiUrl);
+        final HttpUriBuilder uriBuilder = webClientServiceProvider.getHttpUriBuilder()
+                .scheme(apiUri.getScheme())
+                .host(apiUri.getHost())
+                .encodedPath(apiUri.getPath())
+                .addPathSegment("chat.postMessage");
+        if (apiUri.getPort() != -1) {
+            uriBuilder.port(apiUri.getPort());
+        }
+        final URI uri = uriBuilder.build();
+
+        final ObjectNode requestBodyJson = objectMapper.createObjectNode();
+        if (StringUtils.isEmpty(channel)) {
+            throw new SlackRestServiceException("The channel must be specified.");
+        }
+        requestBodyJson.put("channel", channel);
+
+        if (StringUtils.isEmpty(message)) {
+            throw new SlackRestServiceException("No message to be sent with this record.");
+        }
+        requestBodyJson.put("text", message);
+
+
+        final InputStream requestBodyInputStream;
+        try {
+            requestBodyInputStream = IOUtils.toInputStream(
+                    objectMapper.writeValueAsString(requestBodyJson),
+                    StandardCharsets.UTF_8
+            );
+        } catch (final JsonProcessingException e) {
+            throw new SlackRestServiceException("JSON processing exception occurred", e);
+        }
+
+        try (final HttpResponseEntity response = webClientServiceProvider.getWebClientService()
+                .post()
+                .uri(uri)
+                .header("Authorization", String.format("Bearer %s", accessToken))
+                .header("Content-Type", "application/json")
+                .body(requestBodyInputStream, OptionalLong.of(requestBodyInputStream.available()))
+                .retrieve()) {
+            final int statusCode = response.statusCode();
+            if (!(statusCode >= 200 && statusCode < 300)) {
+                throw new SlackRestServiceException("HTTP error code: " + statusCode);
+            }
+
+            try {
+                final SlackPostMessageResponse slackResponse = objectMapper.readValue(response.body(), SlackPostMessageResponse.class);
+                logger.error(objectMapper.writeValueAsString(slackResponse));
+
+                slackResponse.checkResponse(logger);
+            } catch (final IOException e) {
+                throw new SlackRestServiceException("Slack response JSON cannot be parsed.", e);

Review Comment:
   Making the changes





[GitHub] [nifi] emiliosetiadarma commented on a diff in pull request #6593: NIFI-8497: implemented SlackRecordSink and Test, wrote documentation …

Posted by "emiliosetiadarma (via GitHub)" <gi...@apache.org>.
emiliosetiadarma commented on code in PR #6593:
URL: https://github.com/apache/nifi/pull/6593#discussion_r1128770975


##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/TimestampDeserializer.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+
+import java.io.IOException;
+import java.util.Date;
+
+public class TimestampDeserializer extends JsonDeserializer<Date> {

Review Comment:
   Made the changes





[GitHub] [nifi] emiliosetiadarma commented on a diff in pull request #6593: NIFI-8497: implemented SlackRecordSink and Test, wrote documentation …

Posted by "emiliosetiadarma (via GitHub)" <gi...@apache.org>.
emiliosetiadarma commented on code in PR #6593:
URL: https://github.com/apache/nifi/pull/6593#discussion_r1128592490


##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestService.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.OptionalLong;
+
+public class SlackRestService {
+    private final WebClientServiceProvider webClientServiceProvider;
+    private final String accessToken;
+    private final String apiUrl;
+    private final ObjectMapper objectMapper;
+    private final Logger logger;
+
+
+    public SlackRestService(final WebClientServiceProvider webClientServiceProvider,
+                            final String accessToken,
+                            final String apiUrl) {
+        this.webClientServiceProvider = webClientServiceProvider;
+        this.accessToken = accessToken;
+        this.apiUrl = apiUrl;
+        this.objectMapper = new ObjectMapper();
+        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        this.logger = LoggerFactory.getLogger(SlackRestService.class);
+    }
+
+    public void sendMessageToChannel(final String message, final String channel) throws SlackRestServiceException {
+        final URI apiUri = URI.create(apiUrl);
+        final HttpUriBuilder uriBuilder = webClientServiceProvider.getHttpUriBuilder()
+                .scheme(apiUri.getScheme())
+                .host(apiUri.getHost())
+                .encodedPath(apiUri.getPath())
+                .addPathSegment("chat.postMessage");

Review Comment:
   Changing it





[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6593: NIFI-8497: implemented SlackRecordSink and Test, wrote documentation …

Posted by "exceptionfactory (via GitHub)" <gi...@apache.org>.
exceptionfactory commented on code in PR #6593:
URL: https://github.com/apache/nifi/pull/6593#discussion_r1127240629


##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackPostMessageResponse.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.slf4j.Logger;
+
+import java.util.Date;
+
+public class SlackPostMessageResponse {
+    private Boolean ok;
+    private String channel;
+    @JsonDeserialize(using = TimestampDeserializer.class)
+    private Date ts;
+    private Message message;
+    private String error;
+    private String warning;
+
+    public Boolean isOk() {
+        return ok;
+    }
+
+    public void setOk(Boolean ok) {
+        this.ok = ok;
+    }
+
+    public String getChannel() {
+        return channel;
+    }
+
+    public void setChannel(String channel) {
+        this.channel = channel;
+    }
+
+    public Date getTs() {
+        return ts;
+    }
+
+    public void setTs(Date ts) {
+        this.ts = ts;
+    }
+
+    public Message getMessage() {
+        return message;
+    }
+
+    public void setMessage(Message message) {
+        this.message = message;
+    }
+
+    public String getError() {
+        return error;
+    }
+
+    public void setError(String error) {
+        this.error = error;
+    }
+
+    public String getWarning() {
+        return warning;
+    }
+
+    public void setWarning(String warning) {
+        this.warning = warning;
+    }
+
+    public void checkResponse(final Logger logger) throws SlackRestServiceException {
+        if (isOk() == null) {
+            throw new SlackRestServiceException("Slack response JSON does not contain 'ok' key or it has invalid value.");
+        }
+        if (!isOk()) {
+            throw new SlackRestServiceException("Slack error response: " + getError());
+        }
+
+        if (getWarning() != null) {
+            logger.warn("Slack warning message: " + getWarning());
+        }
+    }

Review Comment:
   Recommend moving the `checkResponse` method out of the model class and into the `SlackRestService` class.
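
One way to read this recommendation, sketched with JDK-only stand-ins. `ResponseCheckSketch`, `PostMessageResponse`, and the nested exception are hypothetical, trimmed-down substitutes for the PR's classes, and `java.util.logging` stands in for SLF4J. The model keeps only data, while the validation that used to live in the model becomes a static method next to the code that sends the request.

```java
import java.util.logging.Logger;

// Hypothetical stand-in for the service class; illustration only.
final class ResponseCheckSketch {

    /** Stand-in for the PR's SlackRestServiceException. */
    static final class SlackRestServiceException extends Exception {
        SlackRestServiceException(final String message) {
            super(message);
        }
    }

    /** Model reduced to plain data: no validation logic lives here. */
    static final class PostMessageResponse {
        Boolean ok;
        String error;
        String warning;
    }

    private static final Logger LOGGER = Logger.getLogger(ResponseCheckSketch.class.getName());

    /** Same three checks as the original method, now owned by the service side. */
    static void checkResponse(final PostMessageResponse response) throws SlackRestServiceException {
        if (response.ok == null) {
            throw new SlackRestServiceException("Slack response JSON does not contain 'ok' key or it has invalid value.");
        }
        if (!response.ok) {
            throw new SlackRestServiceException("Slack error response: " + response.error);
        }
        if (response.warning != null) {
            LOGGER.warning("Slack warning message: " + response.warning);
        }
    }

    public static void main(final String[] args) {
        final PostMessageResponse failed = new PostMessageResponse();
        failed.ok = false;
        failed.error = "channel_not_found"; // sample error value
        try {
            checkResponse(failed);
        } catch (final SlackRestServiceException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With this split the model no longer needs a `Logger` parameter, and the service can use its own logger field for warnings.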



##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/Message.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+import java.util.Date;
+
+public class Message {
+    private String text;
+    private String username;
+    @JsonProperty("first_name")

Review Comment:
   The `first_name` JSON property looks out of place on the `botId` field. Is it needed?



##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/Message.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+import java.util.Date;
+
+public class Message {
+    private String text;
+    private String username;
+    @JsonProperty("first_name")
+    private String botId;
+    @JsonProperty("attachments")
+    private Attachment[] attachment;
+    private String type;
+    private String subtype;
+    @JsonDeserialize(using = TimestampDeserializer.class)

Review Comment:
   Is this annotation necessary?



##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestService.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.OptionalLong;
+
+public class SlackRestService {
+    private final WebClientServiceProvider webClientServiceProvider;
+    private final String accessToken;
+    private final String apiUrl;
+    private final ObjectMapper objectMapper;
+    private final Logger logger;
+
+
+    public SlackRestService(final WebClientServiceProvider webClientServiceProvider,
+                            final String accessToken,
+                            final String apiUrl) {
+        this.webClientServiceProvider = webClientServiceProvider;
+        this.accessToken = accessToken;
+        this.apiUrl = apiUrl;
+        this.objectMapper = new ObjectMapper();
+        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        this.logger = LoggerFactory.getLogger(SlackRestService.class);
+    }
+
+    public void sendMessageToChannel(final String message, final String channel) throws SlackRestServiceException {
+        final URI apiUri = URI.create(apiUrl);
+        final HttpUriBuilder uriBuilder = webClientServiceProvider.getHttpUriBuilder()
+                .scheme(apiUri.getScheme())
+                .host(apiUri.getHost())
+                .encodedPath(apiUri.getPath())
+                .addPathSegment("chat.postMessage");

Review Comment:
   Recommend defining `chat.postMessage` as a static final variable named `POST_MESSAGE_PATH`.
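
A minimal sketch of the suggested constant, for illustration only. It uses plain `java.net.URI` in place of the `HttpUriBuilder` from the diff so that it stands alone; the class name `SlackApiPaths` and the method `postMessageUri` are hypothetical and not part of the pull request.

```java
import java.net.URI;

// Hypothetical holder class; only the constant name comes from the review comment.
final class SlackApiPaths {
    // Slack Web API method name, appended to the API URL as the last path segment.
    static final String POST_MESSAGE_PATH = "chat.postMessage";

    private SlackApiPaths() {
    }

    // Resolves the method name against the API URL. A trailing slash is added when
    // missing so that the last segment of the base path is kept during resolution.
    static URI postMessageUri(final String apiUrl) {
        final String base = apiUrl.endsWith("/") ? apiUrl : apiUrl + "/";
        return URI.create(base).resolve(POST_MESSAGE_PATH);
    }
}
```

In the diff itself, the same constant would simply be passed as `addPathSegment(POST_MESSAGE_PATH)`.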



##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestService.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.OptionalLong;
+
+public class SlackRestService {
+    private final WebClientServiceProvider webClientServiceProvider;
+    private final String accessToken;
+    private final String apiUrl;
+    private final ObjectMapper objectMapper;
+    private final Logger logger;
+
+
+    public SlackRestService(final WebClientServiceProvider webClientServiceProvider,
+                            final String accessToken,
+                            final String apiUrl) {
+        this.webClientServiceProvider = webClientServiceProvider;
+        this.accessToken = accessToken;
+        this.apiUrl = apiUrl;
+        this.objectMapper = new ObjectMapper();
+        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        this.logger = LoggerFactory.getLogger(SlackRestService.class);
+    }
+
+    public void sendMessageToChannel(final String message, final String channel) throws SlackRestServiceException {
+        final URI apiUri = URI.create(apiUrl);
+        final HttpUriBuilder uriBuilder = webClientServiceProvider.getHttpUriBuilder()
+                .scheme(apiUri.getScheme())
+                .host(apiUri.getHost())
+                .encodedPath(apiUri.getPath())
+                .addPathSegment("chat.postMessage");
+        if (apiUri.getPort() != -1) {
+            uriBuilder.port(apiUri.getPort());
+        }
+        final URI uri = uriBuilder.build();
+
+        final ObjectNode requestBodyJson = objectMapper.createObjectNode();
+        if (StringUtils.isEmpty(channel)) {
+            throw new SlackRestServiceException("The channel must be specified.");
+        }
+        requestBodyJson.put("channel", channel);
+
+        if (StringUtils.isEmpty(message)) {
+            throw new SlackRestServiceException("No message to be sent with this record.");
+        }
+        requestBodyJson.put("text", message);
+
+
+        final InputStream requestBodyInputStream;
+        try {
+            requestBodyInputStream = IOUtils.toInputStream(

Review Comment:
   Recommend replacing `IOUtils.toInputStream()` with a simple method that returns `ByteArrayInputStream`. That would allow removing the dependency on `commons-io`.
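
One possible shape for such a method, using only JDK classes. The class name `RequestBodyStreams` and the method name `toInputStream` are assumptions made for this sketch, not names from the pull request.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper; it stands in for IOUtils.toInputStream(String, Charset).
final class RequestBodyStreams {
    private RequestBodyStreams() {
    }

    // Encodes the serialized JSON as UTF-8 and wraps the bytes in an in-memory stream.
    static ByteArrayInputStream toInputStream(final String json) {
        final byte[] bytes = json.getBytes(StandardCharsets.UTF_8);
        return new ByteArrayInputStream(bytes);
    }
}
```

Because the byte array is created before the stream, its length is also available for the `OptionalLong` passed to `body(...)`, which would avoid relying on `available()`.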



##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestService.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.OptionalLong;
+
+public class SlackRestService {
+    private final WebClientServiceProvider webClientServiceProvider;
+    private final String accessToken;
+    private final String apiUrl;
+    private final ObjectMapper objectMapper;
+    private final Logger logger;
+
+
+    public SlackRestService(final WebClientServiceProvider webClientServiceProvider,
+                            final String accessToken,
+                            final String apiUrl) {
+        this.webClientServiceProvider = webClientServiceProvider;
+        this.accessToken = accessToken;
+        this.apiUrl = apiUrl;
+        this.objectMapper = new ObjectMapper();
+        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        this.logger = LoggerFactory.getLogger(SlackRestService.class);
+    }
+
+    public void sendMessageToChannel(final String message, final String channel) throws SlackRestServiceException {
+        final URI apiUri = URI.create(apiUrl);
+        final HttpUriBuilder uriBuilder = webClientServiceProvider.getHttpUriBuilder()
+                .scheme(apiUri.getScheme())
+                .host(apiUri.getHost())
+                .encodedPath(apiUri.getPath())
+                .addPathSegment("chat.postMessage");
+        if (apiUri.getPort() != -1) {
+            uriBuilder.port(apiUri.getPort());
+        }
+        final URI uri = uriBuilder.build();
+
+        final ObjectNode requestBodyJson = objectMapper.createObjectNode();
+        if (StringUtils.isEmpty(channel)) {
+            throw new SlackRestServiceException("The channel must be specified.");
+        }
+        requestBodyJson.put("channel", channel);
+
+        if (StringUtils.isEmpty(message)) {
+            throw new SlackRestServiceException("No message to be sent with this record.");
+        }

Review Comment:
   Recommend removing this check and instead adding a check in the calling method.
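
A rough sketch of what a caller-side check could look like. The class `MessageChecks`, the method `requireMessage`, and the choice of `IOException` are all assumptions for illustration; the review comment does not say which exception the calling method should raise.

```java
import java.io.IOException;

// Hypothetical guard that the calling method could run before invoking the REST service.
final class MessageChecks {
    private MessageChecks() {
    }

    // Rejects null or empty messages so the REST service no longer has to validate them.
    static void requireMessage(final String message) throws IOException {
        if (message == null || message.isEmpty()) {
            throw new IOException("No message to be sent with this record");
        }
    }
}
```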



##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/TimestampDeserializer.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+
+import java.io.IOException;
+import java.util.Date;
+
+public class TimestampDeserializer extends JsonDeserializer<Date> {
+    @Override
+    public Date deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException {
+        final String timestampString = jp.getText().trim();
+        final Double milliseconds = Double.valueOf(timestampString) * 1000;
+
+        final Date timestamp = new Date();
+        timestamp.setTime(milliseconds.longValue());
+
+        System.out.println(timestamp.getTime());

Review Comment:
   The `println` needs to be removed.
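
For reference, a sketch of the same conversion with the debugging output dropped. It keeps the approach from the diff (parse the value as seconds, multiply by 1000) but leaves out the Jackson types so that it compiles with the JDK alone; the class `SlackTimestamps` and the method `toDate` are hypothetical names.

```java
import java.util.Date;

// Hypothetical stand-alone version of the conversion performed by the deserializer.
final class SlackTimestamps {
    private SlackTimestamps() {
    }

    // Converts a timestamp string in fractional seconds to a Date in milliseconds.
    static Date toDate(final String timestampString) {
        final double seconds = Double.parseDouble(timestampString.trim());
        final long milliseconds = (long) (seconds * 1000);
        return new Date(milliseconds);
    }
}
```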



##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/Message.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+import java.util.Date;
+
+public class Message {
+    private String text;
+    private String username;
+    @JsonProperty("first_name")
+    private String botId;
+    @JsonProperty("attachments")
+    private Attachment[] attachment;

Review Comment:
   Is there a reason for using this annotation as opposed to naming the field `attachments`?



##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRecordSink.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.sink.RecordSinkService;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+@Tags({"slack", "record", "sink"})
+@CapabilityDescription("Format and send Records using Slack. This controller service uses Slack Web API methods to post " +
+        "messages to a specific channel. Before using SlackRecordSink, you need to create a Slack App, to add a Bot User " +
+        "to the app, and to install the app in your Slack workspace. After the app installed, you can get " +
+        "the Bot User's OAuth Bearer Token that will be needed to authenticate and authorize " +
+        "your SlackRecordSink controller service to Slack.")
+public class SlackRecordSink extends AbstractControllerService implements RecordSinkService {
+
+    private static final String SLACK_API_URL = "https://slack.com/api";
+
+    public static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
+            .name("api-url")
+            .displayName("API URL")
+            .description("Slack Web API URL for posting text messages to channels." +
+                    " It only needs to be changed if Slack changes its API URL.")
+            .required(true)
+            .defaultValue(SLACK_API_URL)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder()
+            .name("access-token")
+            .displayName("Access Token")
+            .description("Bot OAuth Token used for authenticating/authorizing the Slack request sent by NiFi.")
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor CHANNEL_ID = new PropertyDescriptor.Builder()
+            .name("channel-id")
+            .displayName("Channel ID")
+            .description("Slack channel, private group, or IM channel to send the message to. Use Channel ID instead of the name.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor WEB_SERVICE_CLIENT_PROVIDER = new PropertyDescriptor.Builder()
+            .name("web-service-client-provider")
+            .displayName("Web Service Client Provider")
+            .description("Controller service to provide HTTP client for communicating with Slack API")
+            .required(true)
+            .identifiesControllerService(WebClientServiceProvider.class)
+            .build();
+    private volatile RecordSetWriterFactory writerFactory;
+    private SlackRestService service;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return Collections.unmodifiableList(Arrays.asList(
+                API_URL,
+                ACCESS_TOKEN,
+                CHANNEL_ID,
+                RECORD_WRITER_FACTORY,
+                WEB_SERVICE_CLIENT_PROVIDER
+        ));
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        writerFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
+        final WebClientServiceProvider webClientServiceProvider = context
+                .getProperty(WEB_SERVICE_CLIENT_PROVIDER)
+                .asControllerService(WebClientServiceProvider.class);
+        final String accessToken = context.getProperty(ACCESS_TOKEN).getValue();
+        final String apiUrl = context.getProperty(API_URL).getValue();
+        service = new SlackRestService(webClientServiceProvider, accessToken, apiUrl);
+    }
+
+    @Override
+    public WriteResult sendData(final RecordSet recordSet, final Map<String, String> attributes, final boolean sendZeroResults) throws IOException {
+        WriteResult writeResult;
+        try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), recordSet.getSchema(), out, attributes)) {
+                writer.beginRecordSet();
+                Record record = recordSet.next();
+                while (record != null) {
+                    writer.write(record);
+                    writer.flush();
+                    record = recordSet.next();
+                }
+                writeResult = writer.finishRecordSet();
+                writer.flush();
+            } catch (final SchemaNotFoundException e) {
+                final String errorMessage = String.format("RecordSetWriter could not be created because the schema was not found. The schema name for the RecordSet to write is %s",
+                        recordSet.getSchema().getSchemaName());
+                throw new ProcessException(errorMessage, e);
+            }
+
+            try {
+                final String message = out.toString();
+                final String channel = getConfigurationContext().getProperty(CHANNEL_ID).getValue();
+                service.sendMessageToChannel(message, channel);
+            } catch (final SlackRestServiceException e) {
+                getLogger().error("Failed to send message to Slack.", e);
+                throw new ProcessException(e);

Review Comment:
   The logger call can be removed and the message can be included in the exception. Also recommend throwing an `IOException` instead of a `ProcessException`.
   ```suggestion
                   throw new IOException("Failed to send messages to Slack", e);
   ```



##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRecordSink.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.sink.RecordSinkService;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+@Tags({"slack", "record", "sink"})
+@CapabilityDescription("Format and send Records using Slack. This controller service uses Slack Web API methods to post " +
+        "messages to a specific channel. Before using SlackRecordSink, you need to create a Slack App, to add a Bot User " +
+        "to the app, and to install the app in your Slack workspace. After the app installed, you can get " +
+        "the Bot User's OAuth Bearer Token that will be needed to authenticate and authorize " +
+        "your SlackRecordSink controller service to Slack.")
+public class SlackRecordSink extends AbstractControllerService implements RecordSinkService {
+
+    private static final String SLACK_API_URL = "https://slack.com/api";
+
+    public static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
+            .name("api-url")
+            .displayName("API URL")
+            .description("Slack Web API URL for posting text messages to channels." +
+                    " It only needs to be changed if Slack changes its API URL.")
+            .required(true)
+            .defaultValue(SLACK_API_URL)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder()
+            .name("access-token")
+            .displayName("Access Token")
+            .description("Bot OAuth Token used for authenticating/authorizing the Slack request sent by NiFi.")

Review Comment:
   ```suggestion
               .description("Bot OAuth Token used for authenticating and authorizing the request to Slack")
   ```



##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestService.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.OptionalLong;
+
+public class SlackRestService {
+    private final WebClientServiceProvider webClientServiceProvider;
+    private final String accessToken;
+    private final String apiUrl;
+    private final ObjectMapper objectMapper;
+    private final Logger logger;
+
+
+    public SlackRestService(final WebClientServiceProvider webClientServiceProvider,
+                            final String accessToken,
+                            final String apiUrl) {
+        this.webClientServiceProvider = webClientServiceProvider;
+        this.accessToken = accessToken;
+        this.apiUrl = apiUrl;
+        this.objectMapper = new ObjectMapper();
+        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        this.logger = LoggerFactory.getLogger(SlackRestService.class);
+    }
+
+    public void sendMessageToChannel(final String message, final String channel) throws SlackRestServiceException {
+        final URI apiUri = URI.create(apiUrl);
+        final HttpUriBuilder uriBuilder = webClientServiceProvider.getHttpUriBuilder()
+                .scheme(apiUri.getScheme())
+                .host(apiUri.getHost())
+                .encodedPath(apiUri.getPath())
+                .addPathSegment("chat.postMessage");
+        if (apiUri.getPort() != -1) {
+            uriBuilder.port(apiUri.getPort());
+        }
+        final URI uri = uriBuilder.build();
+
+        final ObjectNode requestBodyJson = objectMapper.createObjectNode();
+        if (StringUtils.isEmpty(channel)) {
+            throw new SlackRestServiceException("The channel must be specified.");
+        }
+        requestBodyJson.put("channel", channel);
+
+        if (StringUtils.isEmpty(message)) {
+            throw new SlackRestServiceException("No message to be sent with this record.");
+        }
+        requestBodyJson.put("text", message);

Review Comment:
   Recommend moving the construction of the `ObjectNode` to a separate method.



##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestService.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.OptionalLong;
+
+public class SlackRestService {
+    private final WebClientServiceProvider webClientServiceProvider;
+    private final String accessToken;
+    private final String apiUrl;
+    private final ObjectMapper objectMapper;
+    private final Logger logger;
+
+
+    public SlackRestService(final WebClientServiceProvider webClientServiceProvider,
+                            final String accessToken,
+                            final String apiUrl) {
+        this.webClientServiceProvider = webClientServiceProvider;
+        this.accessToken = accessToken;
+        this.apiUrl = apiUrl;
+        this.objectMapper = new ObjectMapper();
+        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        this.logger = LoggerFactory.getLogger(SlackRestService.class);
+    }
+
+    public void sendMessageToChannel(final String message, final String channel) throws SlackRestServiceException {
+        final URI apiUri = URI.create(apiUrl);
+        final HttpUriBuilder uriBuilder = webClientServiceProvider.getHttpUriBuilder()
+                .scheme(apiUri.getScheme())
+                .host(apiUri.getHost())
+                .encodedPath(apiUri.getPath())
+                .addPathSegment("chat.postMessage");
+        if (apiUri.getPort() != -1) {
+            uriBuilder.port(apiUri.getPort());
+        }
+        final URI uri = uriBuilder.build();
+
+        final ObjectNode requestBodyJson = objectMapper.createObjectNode();
+        if (StringUtils.isEmpty(channel)) {
+            throw new SlackRestServiceException("The channel must be specified.");
+        }
+        requestBodyJson.put("channel", channel);
+
+        if (StringUtils.isEmpty(message)) {
+            throw new SlackRestServiceException("No message to be sent with this record.");
+        }
+        requestBodyJson.put("text", message);
+
+
+        final InputStream requestBodyInputStream;
+        try {
+            requestBodyInputStream = IOUtils.toInputStream(
+                    objectMapper.writeValueAsString(requestBodyJson),
+                    StandardCharsets.UTF_8
+            );
+        } catch (final JsonProcessingException e) {
+            throw new SlackRestServiceException("JSON processing exception occurred", e);
+        }
+
+        try (final HttpResponseEntity response = webClientServiceProvider.getWebClientService()
+                .post()
+                .uri(uri)
+                .header("Authorization", String.format("Bearer %s", accessToken))
+                .header("Content-Type", "application/json")
+                .body(requestBodyInputStream, OptionalLong.of(requestBodyInputStream.available()))
+                .retrieve()) {
+            final int statusCode = response.statusCode();
+            if (!(statusCode >= 200 && statusCode < 300)) {
+                throw new SlackRestServiceException("HTTP error code: " + statusCode);
+            }
+
+            try {
+                final SlackPostMessageResponse slackResponse = objectMapper.readValue(response.body(), SlackPostMessageResponse.class);
+                logger.error(objectMapper.writeValueAsString(slackResponse));

Review Comment:
   This looks like it should be removed since it logs everything as an error.
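
For context on why the logging call is not needed for error handling: the Slack Web API reports failures through the `ok` and `error` fields of the JSON response body, even when the HTTP status is 200. The following is a minimal sketch of a status and body check that involves no logging. The class names `SlackResponseCheck`, `PostMessageResponse`, and `SlackException` are hypothetical stand-ins, not the pull request's `SlackPostMessageResponse` or `SlackRestServiceException`, and the actual `checkResponse` implementation may differ.

```java
public class SlackResponseCheck {

    // Hypothetical stand-in for the response model; only the two
    // fields needed for failure detection are included.
    static final class PostMessageResponse {
        final boolean ok;
        final String error;

        PostMessageResponse(final boolean ok, final String error) {
            this.ok = ok;
            this.error = error;
        }
    }

    // Hypothetical checked exception used only for this illustration.
    static final class SlackException extends Exception {
        SlackException(final String message) {
            super(message);
        }
    }

    // Throws when the HTTP status is outside 2xx or when the body reports ok=false.
    static void check(final int statusCode, final PostMessageResponse response) throws SlackException {
        if (statusCode < 200 || statusCode >= 300) {
            throw new SlackException("HTTP error code: " + statusCode);
        }
        if (!response.ok) {
            throw new SlackException("Slack error response: " + response.error);
        }
    }
}
```

With this shape, the caller decides whether and at what level to log, and a successful response produces no log output at all.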



##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestService.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.OptionalLong;
+
+public class SlackRestService {
+    private final WebClientServiceProvider webClientServiceProvider;
+    private final String accessToken;
+    private final String apiUrl;
+    private final ObjectMapper objectMapper;
+    private final Logger logger;
+
+
+    public SlackRestService(final WebClientServiceProvider webClientServiceProvider,
+                            final String accessToken,
+                            final String apiUrl) {
+        this.webClientServiceProvider = webClientServiceProvider;
+        this.accessToken = accessToken;
+        this.apiUrl = apiUrl;
+        this.objectMapper = new ObjectMapper();
+        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        this.logger = LoggerFactory.getLogger(SlackRestService.class);
+    }
+
+    public void sendMessageToChannel(final String message, final String channel) throws SlackRestServiceException {
+        final URI apiUri = URI.create(apiUrl);
+        final HttpUriBuilder uriBuilder = webClientServiceProvider.getHttpUriBuilder()
+                .scheme(apiUri.getScheme())
+                .host(apiUri.getHost())
+                .encodedPath(apiUri.getPath())
+                .addPathSegment("chat.postMessage");
+        if (apiUri.getPort() != -1) {
+            uriBuilder.port(apiUri.getPort());
+        }
+        final URI uri = uriBuilder.build();
+
+        final ObjectNode requestBodyJson = objectMapper.createObjectNode();
+        if (StringUtils.isEmpty(channel)) {
+            throw new SlackRestServiceException("The channel must be specified.");
+        }
+        requestBodyJson.put("channel", channel);
+
+        if (StringUtils.isEmpty(message)) {
+            throw new SlackRestServiceException("No message to be sent with this record.");
+        }
+        requestBodyJson.put("text", message);
+
+
+        final InputStream requestBodyInputStream;
+        try {
+            requestBodyInputStream = IOUtils.toInputStream(
+                    objectMapper.writeValueAsString(requestBodyJson),
+                    StandardCharsets.UTF_8
+            );
+        } catch (final JsonProcessingException e) {
+            throw new SlackRestServiceException("JSON processing exception occurred", e);
+        }
+
+        try (final HttpResponseEntity response = webClientServiceProvider.getWebClientService()
+                .post()
+                .uri(uri)
+                .header("Authorization", String.format("Bearer %s", accessToken))
+                .header("Content-Type", "application/json")
+                .body(requestBodyInputStream, OptionalLong.of(requestBodyInputStream.available()))
+                .retrieve()) {
+            final int statusCode = response.statusCode();
+            if (!(statusCode >= 200 && statusCode < 300)) {
+                throw new SlackRestServiceException("HTTP error code: " + statusCode);
+            }
+
+            try {
+                final SlackPostMessageResponse slackResponse = objectMapper.readValue(response.body(), SlackPostMessageResponse.class);
+                logger.error(objectMapper.writeValueAsString(slackResponse));
+
+                slackResponse.checkResponse(logger);
+            } catch (final IOException e) {
+                throw new SlackRestServiceException("Slack response JSON cannot be parsed.", e);

Review Comment:
   Recommend removing the period character.
   ```suggestion
                   throw new SlackRestServiceException("JSON response parsing failed", e);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] emiliosetiadarma commented on a diff in pull request #6593: NIFI-8497: implemented SlackRecordSink and Test, wrote documentation …

Posted by "emiliosetiadarma (via GitHub)" <gi...@apache.org>.
emiliosetiadarma commented on code in PR #6593:
URL: https://github.com/apache/nifi/pull/6593#discussion_r1128672161


##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestService.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.OptionalLong;
+
+public class SlackRestService {
+    private final WebClientServiceProvider webClientServiceProvider;
+    private final String accessToken;
+    private final String apiUrl;
+    private final ObjectMapper objectMapper;
+    private final Logger logger;
+
+
+    public SlackRestService(final WebClientServiceProvider webClientServiceProvider,
+                            final String accessToken,
+                            final String apiUrl) {
+        this.webClientServiceProvider = webClientServiceProvider;
+        this.accessToken = accessToken;
+        this.apiUrl = apiUrl;
+        this.objectMapper = new ObjectMapper();
+        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        this.logger = LoggerFactory.getLogger(SlackRestService.class);
+    }
+
+    public void sendMessageToChannel(final String message, final String channel) throws SlackRestServiceException {
+        final URI apiUri = URI.create(apiUrl);
+        final HttpUriBuilder uriBuilder = webClientServiceProvider.getHttpUriBuilder()
+                .scheme(apiUri.getScheme())
+                .host(apiUri.getHost())
+                .encodedPath(apiUri.getPath())
+                .addPathSegment("chat.postMessage");
+        if (apiUri.getPort() != -1) {
+            uriBuilder.port(apiUri.getPort());
+        }
+        final URI uri = uriBuilder.build();
+
+        final ObjectNode requestBodyJson = objectMapper.createObjectNode();
+        if (StringUtils.isEmpty(channel)) {
+            throw new SlackRestServiceException("The channel must be specified.");
+        }
+        requestBodyJson.put("channel", channel);
+
+        if (StringUtils.isEmpty(message)) {
+            throw new SlackRestServiceException("No message to be sent with this record.");
+        }
+        requestBodyJson.put("text", message);
+
+
+        final InputStream requestBodyInputStream;
+        try {
+            requestBodyInputStream = IOUtils.toInputStream(
+                    objectMapper.writeValueAsString(requestBodyJson),
+                    StandardCharsets.UTF_8
+            );
+        } catch (final JsonProcessingException e) {
+            throw new SlackRestServiceException("JSON processing exception occurred", e);
+        }
+
+        try (final HttpResponseEntity response = webClientServiceProvider.getWebClientService()
+                .post()
+                .uri(uri)
+                .header("Authorization", String.format("Bearer %s", accessToken))
+                .header("Content-Type", "application/json")
+                .body(requestBodyInputStream, OptionalLong.of(requestBodyInputStream.available()))
+                .retrieve()) {
+            final int statusCode = response.statusCode();
+            if (!(statusCode >= 200 && statusCode < 300)) {
+                throw new SlackRestServiceException("HTTP error code: " + statusCode);
+            }
+
+            try {
+                final SlackPostMessageResponse slackResponse = objectMapper.readValue(response.body(), SlackPostMessageResponse.class);
+                logger.error(objectMapper.writeValueAsString(slackResponse));

Review Comment:
   Ah sorry, there should be no logging here. Removing it.





[GitHub] [nifi] emiliosetiadarma commented on a diff in pull request #6593: NIFI-8497: implemented SlackRecordSink and Test, wrote documentation …

Posted by "emiliosetiadarma (via GitHub)" <gi...@apache.org>.
emiliosetiadarma commented on code in PR #6593:
URL: https://github.com/apache/nifi/pull/6593#discussion_r1128590745


##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRecordSink.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.sink.RecordSinkService;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+@Tags({"slack", "record", "sink"})
+@CapabilityDescription("Format and send Records using Slack. This controller service uses Slack Web API methods to post " +
+        "messages to a specific channel. Before using SlackRecordSink, you need to create a Slack App, to add a Bot User " +
+        "to the app, and to install the app in your Slack workspace. After the app installed, you can get " +
+        "the Bot User's OAuth Bearer Token that will be needed to authenticate and authorize " +
+        "your SlackRecordSink controller service to Slack.")
+public class SlackRecordSink extends AbstractControllerService implements RecordSinkService {
+
+    private static final String SLACK_API_URL = "https://slack.com/api";
+
+    public static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
+            .name("api-url")
+            .displayName("API URL")
+            .description("Slack Web API URL for posting text messages to channels." +
+                    " It only needs to be changed if Slack changes its API URL.")
+            .required(true)
+            .defaultValue(SLACK_API_URL)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder()
+            .name("access-token")
+            .displayName("Access Token")
+            .description("Bot OAuth Token used for authenticating/authorizing the Slack request sent by NiFi.")
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor CHANNEL_ID = new PropertyDescriptor.Builder()
+            .name("channel-id")
+            .displayName("Channel ID")
+            .description("Slack channel, private group, or IM channel to send the message to. Use Channel ID instead of the name.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor WEB_SERVICE_CLIENT_PROVIDER = new PropertyDescriptor.Builder()
+            .name("web-service-client-provider")
+            .displayName("Web Service Client Provider")
+            .description("Controller service to provide HTTP client for communicating with Slack API")
+            .required(true)
+            .identifiesControllerService(WebClientServiceProvider.class)
+            .build();
+    private volatile RecordSetWriterFactory writerFactory;
+    private SlackRestService service;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return Collections.unmodifiableList(Arrays.asList(
+                API_URL,
+                ACCESS_TOKEN,
+                CHANNEL_ID,
+                RECORD_WRITER_FACTORY,
+                WEB_SERVICE_CLIENT_PROVIDER
+        ));
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        writerFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
+        final WebClientServiceProvider webClientServiceProvider = context
+                .getProperty(WEB_SERVICE_CLIENT_PROVIDER)
+                .asControllerService(WebClientServiceProvider.class);
+        final String accessToken = context.getProperty(ACCESS_TOKEN).getValue();
+        final String apiUrl = context.getProperty(API_URL).getValue();
+        service = new SlackRestService(webClientServiceProvider, accessToken, apiUrl);
+    }
+
+    @Override
+    public WriteResult sendData(final RecordSet recordSet, final Map<String, String> attributes, final boolean sendZeroResults) throws IOException {
+        WriteResult writeResult;
+        try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), recordSet.getSchema(), out, attributes)) {
+                writer.beginRecordSet();
+                Record record = recordSet.next();
+                while (record != null) {
+                    writer.write(record);
+                    writer.flush();
+                    record = recordSet.next();
+                }
+                writeResult = writer.finishRecordSet();
+                writer.flush();
+            } catch (final SchemaNotFoundException e) {
+                final String errorMessage = String.format("RecordSetWriter could not be created because the schema was not found. The schema name for the RecordSet to write is %s",
+                        recordSet.getSchema().getSchemaName());
+                throw new ProcessException(errorMessage, e);
+            }
+
+            try {
+                final String message = out.toString();
+                final String channel = getConfigurationContext().getProperty(CHANNEL_ID).getValue();
+                service.sendMessageToChannel(message, channel);
+            } catch (final SlackRestServiceException e) {
+                getLogger().error("Failed to send message to Slack.", e);
+                throw new ProcessException(e);

Review Comment:
   Making the changes





[GitHub] [nifi] emiliosetiadarma commented on a diff in pull request #6593: NIFI-8497: implemented SlackRecordSink and Test, wrote documentation …

Posted by "emiliosetiadarma (via GitHub)" <gi...@apache.org>.
emiliosetiadarma commented on code in PR #6593:
URL: https://github.com/apache/nifi/pull/6593#discussion_r1128589770


##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/Message.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+import java.util.Date;
+
+public class Message {
+    private String text;
+    private String username;
+    @JsonProperty("first_name")

Review Comment:
   I'm not sure what I had in mind when I wrote this. Will remove it.



##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRecordSink.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.sink.RecordSinkService;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+@Tags({"slack", "record", "sink"})
+@CapabilityDescription("Format and send Records using Slack. This controller service uses Slack Web API methods to post " +
+        "messages to a specific channel. Before using SlackRecordSink, you need to create a Slack App, to add a Bot User " +
+        "to the app, and to install the app in your Slack workspace. After the app installed, you can get " +
+        "the Bot User's OAuth Bearer Token that will be needed to authenticate and authorize " +
+        "your SlackRecordSink controller service to Slack.")
+public class SlackRecordSink extends AbstractControllerService implements RecordSinkService {
+
+    private static final String SLACK_API_URL = "https://slack.com/api";
+
+    public static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
+            .name("api-url")
+            .displayName("API URL")
+            .description("Slack Web API URL for posting text messages to channels." +
+                    " It only needs to be changed if Slack changes its API URL.")
+            .required(true)
+            .defaultValue(SLACK_API_URL)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder()
+            .name("access-token")
+            .displayName("Access Token")
+            .description("Bot OAuth Token used for authenticating/authorizing the Slack request sent by NiFi.")

Review Comment:
   Changing it





[GitHub] [nifi] exceptionfactory commented on pull request #6593: NIFI-8497: implemented SlackRecordSink and Test, wrote documentation …

Posted by "exceptionfactory (via GitHub)" <gi...@apache.org>.
exceptionfactory commented on PR #6593:
URL: https://github.com/apache/nifi/pull/6593#issuecomment-1464293066

   Thanks for making the updates @emiliosetiadarma. I pushed one more commit to remove the Message and Attachment model classes, since they were not used in any of the service classes.




[GitHub] [nifi] emiliosetiadarma commented on a diff in pull request #6593: NIFI-8497: implemented SlackRecordSink and Test, wrote documentation …

Posted by "emiliosetiadarma (via GitHub)" <gi...@apache.org>.
emiliosetiadarma commented on code in PR #6593:
URL: https://github.com/apache/nifi/pull/6593#discussion_r1128584012


##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/Message.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+import java.util.Date;
+
+public class Message {
+    private String text;
+    private String username;
+    @JsonProperty("first_name")
+    private String botId;
+    @JsonProperty("attachments")
+    private Attachment[] attachment;

Review Comment:
   No particular reason, I'll change the name!





[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6593: NIFI-8497: implemented SlackRecordSink and Test, wrote documentation …

Posted by "exceptionfactory (via GitHub)" <gi...@apache.org>.
exceptionfactory commented on code in PR #6593:
URL: https://github.com/apache/nifi/pull/6593#discussion_r1127259164


##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/TimestampDeserializer.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+
+import java.io.IOException;
+import java.util.Date;
+
+public class TimestampDeserializer extends JsonDeserializer<Date> {

Review Comment:
   In light of this being a custom class, recommend switching to use `java.time.Instant` instead of `java.util.Date` here and in the model classes.
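
The recommendation above can be illustrated with a small conversion helper. This is a sketch only: it assumes the timestamp arrives as a string of epoch seconds with a fractional part, as Slack message `ts` values are formatted, and the class name `SlackTimestamps` is hypothetical. The body of the pull request's `TimestampDeserializer` is not shown here, so a real change would wrap equivalent logic in a Jackson `JsonDeserializer<Instant>`.

```java
import java.math.BigDecimal;
import java.time.Instant;

public class SlackTimestamps {

    // Hypothetical helper: converts a value such as "1503435956.000247"
    // into an Instant while keeping the sub-second precision.
    public static Instant parse(final String timestamp) {
        final BigDecimal value = new BigDecimal(timestamp);
        // longValue() discards the fractional part, leaving whole epoch seconds
        final long seconds = value.longValue();
        // Remaining fraction of a second, scaled to nanoseconds
        final int nanos = value.subtract(BigDecimal.valueOf(seconds))
                .movePointRight(9)
                .intValue();
        return Instant.ofEpochSecond(seconds, nanos);
    }
}
```

Unlike `java.util.Date`, `Instant` is immutable and carries nanosecond precision, so the fractional part of the timestamp is not truncated to milliseconds.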





[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6593: NIFI-8497: implemented SlackRecordSink and Test, wrote documentation …

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on code in PR #6593:
URL: https://github.com/apache/nifi/pull/6593#discussion_r1007509283


##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/pom.xml:
##########
@@ -85,5 +85,34 @@
             <artifactId>nifi-ssl-context-service-api</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-sink-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <scope>compile</scope>

Review Comment:
   ```suggestion
               <scope>provided</scope>
   ```



##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/pom.xml:
##########
@@ -85,5 +85,34 @@
             <artifactId>nifi-ssl-context-service-api</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-sink-api</artifactId>
+            <scope>compile</scope>

Review Comment:
   All of the `api` dependencies should be marked as `provided`, instead of `compile`.
   ```suggestion
               <scope>provided</scope>
   ```



##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/pom.xml:
##########
@@ -85,5 +85,34 @@
             <artifactId>nifi-ssl-context-service-api</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-sink-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>compile</scope>

Review Comment:
   ```suggestion
               <scope>provided</scope>
   ```



##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRecordSink.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.util.EntityUtils;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.sink.RecordSinkService;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.StringUtils;
+
+import javax.json.Json;
+import javax.json.JsonObject;
+import javax.json.JsonObjectBuilder;
+import javax.json.JsonString;
+import javax.json.stream.JsonParsingException;

Review Comment:
   The `javax.json` references should be replaced with Jackson.



##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/pom.xml:
##########
@@ -85,5 +85,34 @@
             <artifactId>nifi-ssl-context-service-api</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-sink-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <scope>compile</scope>

Review Comment:
   ```suggestion
               <scope>provided</scope>
   ```



##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRecordSink.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.util.EntityUtils;

Review Comment:
   Use of Apache HttpClient version 4 should be avoided for new components. This is an opportunity to make use of the new `WebClientServiceProvider` Controller Service to avoid direct dependencies on a particular HTTP implementation.





[GitHub] [nifi] emiliosetiadarma commented on a diff in pull request #6593: NIFI-8497: implemented SlackRecordSink and Test, wrote documentation …

Posted by "emiliosetiadarma (via GitHub)" <gi...@apache.org>.
emiliosetiadarma commented on code in PR #6593:
URL: https://github.com/apache/nifi/pull/6593#discussion_r1128596992


##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestService.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.OptionalLong;
+
+public class SlackRestService {
+    private final WebClientServiceProvider webClientServiceProvider;
+    private final String accessToken;
+    private final String apiUrl;
+    private final ObjectMapper objectMapper;
+    private final Logger logger;
+
+
+    public SlackRestService(final WebClientServiceProvider webClientServiceProvider,
+                            final String accessToken,
+                            final String apiUrl) {
+        this.webClientServiceProvider = webClientServiceProvider;
+        this.accessToken = accessToken;
+        this.apiUrl = apiUrl;
+        this.objectMapper = new ObjectMapper();
+        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        this.logger = LoggerFactory.getLogger(SlackRestService.class);
+    }
+
+    public void sendMessageToChannel(final String message, final String channel) throws SlackRestServiceException {
+        final URI apiUri = URI.create(apiUrl);
+        final HttpUriBuilder uriBuilder = webClientServiceProvider.getHttpUriBuilder()
+                .scheme(apiUri.getScheme())
+                .host(apiUri.getHost())
+                .encodedPath(apiUri.getPath())
+                .addPathSegment("chat.postMessage");
+        if (apiUri.getPort() != -1) {
+            uriBuilder.port(apiUri.getPort());
+        }
+        final URI uri = uriBuilder.build();
+
+        final ObjectNode requestBodyJson = objectMapper.createObjectNode();
+        if (StringUtils.isEmpty(channel)) {
+            throw new SlackRestServiceException("The channel must be specified.");
+        }
+        requestBodyJson.put("channel", channel);
+
+        if (StringUtils.isEmpty(message)) {
+            throw new SlackRestServiceException("No message to be sent with this record.");
+        }
+        requestBodyJson.put("text", message);

Review Comment:
   Moving to its own method called `createRequestBody`
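
A rough sketch of what such an extraction could look like, using only the JDK so it stands alone. It makes two assumptions that differ from the PR: a `Map` stands in for Jackson's `ObjectNode`, and `IllegalArgumentException` stands in for the PR's `SlackRestServiceException`. The enclosing class name is hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the refactored code; the PR itself builds a
// Jackson ObjectNode and throws SlackRestServiceException.
final class RequestBodySketch {
    private RequestBodySketch() {
    }

    // Validates both inputs before building the body, so the caller
    // (sendMessageToChannel in the PR) only deals with sending the request.
    static Map<String, String> createRequestBody(final String message, final String channel) {
        if (channel == null || channel.isEmpty()) {
            throw new IllegalArgumentException("The channel must be specified.");
        }
        if (message == null || message.isEmpty()) {
            throw new IllegalArgumentException("No message to be sent with this record.");
        }
        final Map<String, String> body = new LinkedHashMap<>();
        body.put("channel", channel);
        body.put("text", message);
        return body;
    }

    public static void main(final String[] args) {
        System.out.println(createRequestBody("hello", "#general"));
    }
}
```

Validating both arguments before creating the body keeps a half-populated object from ever existing, and lets the validation be tested without any HTTP client.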





[GitHub] [nifi] emiliosetiadarma commented on a diff in pull request #6593: NIFI-8497: implemented SlackRecordSink and Test, wrote documentation …

Posted by "emiliosetiadarma (via GitHub)" <gi...@apache.org>.
emiliosetiadarma commented on code in PR #6593:
URL: https://github.com/apache/nifi/pull/6593#discussion_r1128564534


##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackPostMessageResponse.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.slf4j.Logger;
+
+import java.util.Date;
+
+public class SlackPostMessageResponse {
+    private Boolean ok;
+    private String channel;
+    @JsonDeserialize(using = TimestampDeserializer.class)
+    private Date ts;
+    private Message message;
+    private String error;
+    private String warning;
+
+    public Boolean isOk() {
+        return ok;
+    }
+
+    public void setOk(Boolean ok) {
+        this.ok = ok;
+    }
+
+    public String getChannel() {
+        return channel;
+    }
+
+    public void setChannel(String channel) {
+        this.channel = channel;
+    }
+
+    public Date getTs() {
+        return ts;
+    }
+
+    public void setTs(Date ts) {
+        this.ts = ts;
+    }
+
+    public Message getMessage() {
+        return message;
+    }
+
+    public void setMessage(Message message) {
+        this.message = message;
+    }
+
+    public String getError() {
+        return error;
+    }
+
+    public void setError(String error) {
+        this.error = error;
+    }
+
+    public String getWarning() {
+        return warning;
+    }
+
+    public void setWarning(String warning) {
+        this.warning = warning;
+    }
+
+    public void checkResponse(final Logger logger) throws SlackRestServiceException {
+        if (isOk() == null) {
+            throw new SlackRestServiceException("Slack response JSON does not contain 'ok' key or it has invalid value.");
+        }
+        if (!isOk()) {
+            throw new SlackRestServiceException("Slack error response: " + getError());
+        }
+
+        if (getWarning() != null) {
+            logger.warn("Slack warning message: " + getWarning());
+        }
+    }

Review Comment:
   Moving it





[GitHub] [nifi] emiliosetiadarma commented on a diff in pull request #6593: NIFI-8497: implemented SlackRecordSink and Test, wrote documentation …

Posted by "emiliosetiadarma (via GitHub)" <gi...@apache.org>.
emiliosetiadarma commented on code in PR #6593:
URL: https://github.com/apache/nifi/pull/6593#discussion_r1128771189


##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestService.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.OptionalLong;
+
+public class SlackRestService {
+    private final WebClientServiceProvider webClientServiceProvider;
+    private final String accessToken;
+    private final String apiUrl;
+    private final ObjectMapper objectMapper;
+    private final Logger logger;
+
+
+    public SlackRestService(final WebClientServiceProvider webClientServiceProvider,
+                            final String accessToken,
+                            final String apiUrl) {
+        this.webClientServiceProvider = webClientServiceProvider;
+        this.accessToken = accessToken;
+        this.apiUrl = apiUrl;
+        this.objectMapper = new ObjectMapper();
+        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        this.logger = LoggerFactory.getLogger(SlackRestService.class);
+    }
+
+    public void sendMessageToChannel(final String message, final String channel) throws SlackRestServiceException {
+        final URI apiUri = URI.create(apiUrl);
+        final HttpUriBuilder uriBuilder = webClientServiceProvider.getHttpUriBuilder()
+                .scheme(apiUri.getScheme())
+                .host(apiUri.getHost())
+                .encodedPath(apiUri.getPath())
+                .addPathSegment("chat.postMessage");
+        if (apiUri.getPort() != -1) {
+            uriBuilder.port(apiUri.getPort());
+        }
+        final URI uri = uriBuilder.build();
+
+        final ObjectNode requestBodyJson = objectMapper.createObjectNode();
+        if (StringUtils.isEmpty(channel)) {
+            throw new SlackRestServiceException("The channel must be specified.");
+        }
+        requestBodyJson.put("channel", channel);
+
+        if (StringUtils.isEmpty(message)) {
+            throw new SlackRestServiceException("No message to be sent with this record.");
+        }
+        requestBodyJson.put("text", message);

Review Comment:
   Made the changes





[GitHub] [nifi] emiliosetiadarma commented on a diff in pull request #6593: NIFI-8497: implemented SlackRecordSink and Test, wrote documentation …

Posted by "emiliosetiadarma (via GitHub)" <gi...@apache.org>.
emiliosetiadarma commented on code in PR #6593:
URL: https://github.com/apache/nifi/pull/6593#discussion_r1128771448


##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/Message.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+import java.util.Date;
+
+public class Message {
+    private String text;
+    private String username;
+    @JsonProperty("first_name")
+    private String botId;
+    @JsonProperty("attachments")
+    private Attachment[] attachment;
+    private String type;
+    private String subtype;
+    @JsonDeserialize(using = TimestampDeserializer.class)

Review Comment:
   Changed to deserializing to the `Instant` type





[GitHub] [nifi] emiliosetiadarma commented on a diff in pull request #6593: NIFI-8497: implemented SlackRecordSink and Test, wrote documentation …

Posted by "emiliosetiadarma (via GitHub)" <gi...@apache.org>.
emiliosetiadarma commented on code in PR #6593:
URL: https://github.com/apache/nifi/pull/6593#discussion_r1128672436


##########
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/TimestampDeserializer.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+
+import java.io.IOException;
+import java.util.Date;
+
+public class TimestampDeserializer extends JsonDeserializer<Date> {
+    @Override
+    public Date deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException {
+        final String timestampString = jp.getText().trim();
+        final Double milliseconds = Double.valueOf(timestampString) * 1000;
+
+        final Date timestamp = new Date();
+        timestamp.setTime(milliseconds.longValue());
+
+        System.out.println(timestamp.getTime());

Review Comment:
   Removed


