Posted to issues@nifi.apache.org by pvillard31 <gi...@git.apache.org> on 2017/01/05 23:13:19 UTC

[GitHub] nifi pull request #1401: NIFI-3290 Reporting task to send bulletins with S2S

GitHub user pvillard31 opened a pull request:

    https://github.com/apache/nifi/pull/1401

    NIFI-3290 Reporting task to send bulletins with S2S

    Thank you for submitting a contribution to Apache NiFi.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [X] Is there a JIRA ticket associated with this PR? Is it referenced 
         in the commit message?
    
    - [X] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [X] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    - [X] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [X] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [X] Have you written or updated unit tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
    - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
    
    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pvillard31/nifi bulletin-reporting-task

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/1401.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1401
    
----
commit c2b381f167cdd1f558a133bba118084be6d29262
Author: Pierre Villard <pi...@gmail.com>
Date:   2017-01-05T22:08:55Z

    NIFI-3290 Reporting task to send bulletins with S2S

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1401: NIFI-3290 Reporting task to send bulletins with S2S

Posted by olegz <gi...@git.apache.org>.
GitHub user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1401#discussion_r98357001
  
    --- Diff: nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java ---
    @@ -0,0 +1,235 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.reporting;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.TimeZone;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import javax.json.Json;
    +import javax.json.JsonArray;
    +import javax.json.JsonArrayBuilder;
    +import javax.json.JsonBuilderFactory;
    +import javax.json.JsonObject;
    +import javax.json.JsonObjectBuilder;
    +
    +import org.apache.nifi.annotation.behavior.Restricted;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.configuration.DefaultSchedule;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.scheduling.SchedulingStrategy;
    +
    +@Tags({"bulletin", "site", "site to site", "restricted"})
    +@CapabilityDescription("Publishes Bulletin events using the Site To Site protocol. Note: only up to 5 bulletins are stored per component and up to "
    +        + "10 bulletins at controller level for a duration of up to 5 minutes. If this reporting task is not scheduled frequently enough some bulletins "
    +        + "may not be sent.")
    +@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last bulletin ID so that on restart the task knows where it left off.")
    +@Restricted("Provides operator the ability to send sensitive details contained in bulletin events to any external system.")
    +@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
    +public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReportingTask {
    +
    +    static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
    +    static final String LAST_EVENT_ID_KEY = "last_event_id";
    +
    +    static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
    +        .name("Platform")
    +        .description("The value to use for the platform field in each provenance event.")
    +        .required(true)
    +        .expressionLanguageSupported(true)
    +        .defaultValue("nifi")
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    private volatile long lastSentBulletinId = -1L;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(DESTINATION_URL);
    +        properties.add(PORT_NAME);
    +        properties.add(SSL_CONTEXT);
    +        properties.add(COMPRESS);
    +        properties.add(TIMEOUT);
    +        properties.add(PLATFORM);
    +        return properties;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ReportingContext context) {
    +
    +        final boolean isClustered = context.isClustered();
    +        final String nodeId = context.getClusterNodeIdentifier();
    +        if (nodeId == null && isClustered) {
    +            getLogger().debug("This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. "
    +                + "Will wait for Node Identifier to be established.");
    +            return;
    +        }
    +
    +        if (lastSentBulletinId < 0) {
    +            Map<String, String> state;
    +            try {
    +                state = context.getStateManager().getState(Scope.LOCAL).toMap();
    +            } catch (IOException e) {
    +                getLogger().error("Failed to get state at start up due to:" + e.getMessage(), e);
    +                return;
    +            }
    +            if (state.containsKey(LAST_EVENT_ID_KEY)) {
    +                lastSentBulletinId = Long.parseLong(state.get(LAST_EVENT_ID_KEY));
    +            }
    +        }
    +
    +        final BulletinQuery bulletinQuery = new BulletinQuery.Builder().after(lastSentBulletinId).build();
    +        final List<Bulletin> bulletins = context.getBulletinRepository().findBulletins(bulletinQuery);
    +
    +        if(bulletins == null || bulletins.isEmpty()) {
    +            getLogger().debug("No events to send because no events are stored in the repository.");
    +            return;
    +        }
    +
    +        final Long currMaxId = getMaxBulletinId(bulletins);
    +
    +        if(currMaxId < lastSentBulletinId){
    +            getLogger().warn("Current bulletin max id is {} which is less than what was stored in state as the last queried event, which was {}. "
    +                    + "This means the bulletins repository restarted its ids. Restarting querying from the beginning.", new Object[]{currMaxId, lastSentBulletinId});
    +            lastSentBulletinId = -1;
    +        }
    +
    +        if (currMaxId == lastSentBulletinId) {
    +            getLogger().debug("No events to send due to the current max id being equal to the last id that was sent.");
    +            return;
    +        }
    +
    +        final String platform = context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue();
    +
    +        final Map<String, ?> config = Collections.emptyMap();
    +        final JsonBuilderFactory factory = Json.createBuilderFactory(config);
    +        final JsonObjectBuilder builder = factory.createObjectBuilder();
    +
    +        final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
    +        df.setTimeZone(TimeZone.getTimeZone("Z"));
    +
    +        final long start = System.nanoTime();
    +
    +        // Create a JSON array of all the events in the current batch
    +        final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
    +        for (final Bulletin bulletin : bulletins) {
    +            if(bulletin.getId() > lastSentBulletinId) {
    +                arrayBuilder.add(serialize(factory, builder, bulletin, df, platform, nodeId));
    +            }
    +        }
    +        final JsonArray jsonArray = arrayBuilder.build();
    +
    +        // Send the JSON document for the current batch
    +        try {
    +            final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
    +            if (transaction == null) {
    +                getLogger().debug("All destination nodes are penalized; will attempt to send data later");
    +                return;
    +            }
    +
    +            final Map<String, String> attributes = new HashMap<>();
    +            final String transactionId = UUID.randomUUID().toString();
    +            attributes.put("reporting.task.transaction.id", transactionId);
    +
    +            final byte[] data = jsonArray.toString().getBytes(StandardCharsets.UTF_8);
    +            transaction.send(data, attributes);
    +            transaction.confirm();
    +            transaction.complete();
    +
    +            final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    +            getLogger().info("Successfully sent {} Bulletins to destination in {} ms; Transaction ID = {}; First Event ID = {}",
    +                    new Object[]{bulletins.size(), transferMillis, transactionId, bulletins.get(0).getId()});
    +        } catch (final IOException e) {
    +            throw new ProcessException("Failed to send Bulletins to destination due to IOException:" + e.getMessage(), e);
    +        }
    +
    +        // Store the id of the last event so we know where we left off
    +        try {
    +            StateManager stateManager = context.getStateManager();
    +            Map<String, String> newMapOfState = new HashMap<>();
    +            newMapOfState.put(LAST_EVENT_ID_KEY, String.valueOf(currMaxId));
    +            stateManager.setState(newMapOfState, Scope.LOCAL);
    +        } catch (final IOException ioe) {
    +            getLogger().error("Failed to update state to {} due to {}; this could result in events being re-sent after a restart.",
    +                    new Object[]{currMaxId, ioe.getMessage()}, ioe);
    +        }
    +
    +        lastSentBulletinId = currMaxId;
    +    }
    +
    +    private Long getMaxBulletinId(List<Bulletin> bulletins) {
    +        long result = -1L;
    +        for (Bulletin bulletin : bulletins) {
    +            if(bulletin.getId() > result) {
    +                result = bulletin.getId();
    --- End diff --
    
    A bit confused. What if several _bulletin.getId()_ values are greater than _result_? Are you expecting the last one? And if so, is there some predefined ordering?
    Alternatively, I am thinking that you may want to break out of the loop once _result_ is assigned, right?
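
The question above comes down to whether the loop is a maximum scan or a first-match search. The following is a minimal sketch, not the code from the pull request: it uses plain `Long` ids instead of NiFi's `Bulletin` type, and the class and method names are hypothetical. It assumes nothing about the order in which the bulletin repository returns its results, since the thread does not establish one.

```java
import java.util.Arrays;
import java.util.List;

final class MaxBulletinIdSketch {

    // Full scan: every id is compared, so the result is the maximum
    // regardless of how the list is ordered.
    static long maxId(List<Long> ids) {
        long result = -1L;
        for (long id : ids) {
            if (id > result) {
                result = id;
            }
        }
        return result;
    }

    // Early exit: stops at the first id greater than -1. This equals the
    // maximum only if the list is known to be sorted in descending order.
    static long firstIdWithBreak(List<Long> ids) {
        long result = -1L;
        for (long id : ids) {
            if (id > result) {
                result = id;
                break;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> unordered = Arrays.asList(3L, 9L, 4L);
        System.out.println(maxId(unordered));            // prints 9
        System.out.println(firstIdWithBreak(unordered)); // prints 3
    }
}
```

Without a guaranteed ordering, the full scan is the safer reading of a method named `getMaxBulletinId`; adding a `break` would only be correct if the repository documented a descending order.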



[GitHub] nifi pull request #1401: NIFI-3290 Reporting task to send bulletins with S2S

Posted by pvillard31 <gi...@git.apache.org>.
GitHub user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1401#discussion_r98363576
  
    --- Diff: nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java ---
    @@ -0,0 +1,235 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.reporting;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.TimeZone;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import javax.json.Json;
    +import javax.json.JsonArray;
    +import javax.json.JsonArrayBuilder;
    +import javax.json.JsonBuilderFactory;
    +import javax.json.JsonObject;
    +import javax.json.JsonObjectBuilder;
    +
    +import org.apache.nifi.annotation.behavior.Restricted;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.configuration.DefaultSchedule;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.scheduling.SchedulingStrategy;
    +
    +@Tags({"bulletin", "site", "site to site", "restricted"})
    +@CapabilityDescription("Publishes Bulletin events using the Site To Site protocol. Note: only up to 5 bulletins are stored per component and up to "
    +        + "10 bulletins at controller level for a duration of up to 5 minutes. If this reporting task is not scheduled frequently enough some bulletins "
    +        + "may not be sent.")
    +@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last bulletin ID so that on restart the task knows where it left off.")
    +@Restricted("Provides operator the ability to send sensitive details contained in bulletin events to any external system.")
    +@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
    +public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReportingTask {
    +
    +    static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
    +    static final String LAST_EVENT_ID_KEY = "last_event_id";
    +
    +    static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
    +        .name("Platform")
    +        .description("The value to use for the platform field in each provenance event.")
    +        .required(true)
    +        .expressionLanguageSupported(true)
    +        .defaultValue("nifi")
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    private volatile long lastSentBulletinId = -1L;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(DESTINATION_URL);
    +        properties.add(PORT_NAME);
    +        properties.add(SSL_CONTEXT);
    +        properties.add(COMPRESS);
    +        properties.add(TIMEOUT);
    +        properties.add(PLATFORM);
    +        return properties;
    +    }
    --- End diff --
    
    Yes you're right!



[GitHub] nifi pull request #1401: NIFI-3290 Reporting task to send bulletins with S2S

Posted by olegz <gi...@git.apache.org>.
GitHub user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1401#discussion_r98356661
  
    --- Diff: nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java ---
    @@ -0,0 +1,235 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.reporting;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.TimeZone;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import javax.json.Json;
    +import javax.json.JsonArray;
    +import javax.json.JsonArrayBuilder;
    +import javax.json.JsonBuilderFactory;
    +import javax.json.JsonObject;
    +import javax.json.JsonObjectBuilder;
    +
    +import org.apache.nifi.annotation.behavior.Restricted;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.configuration.DefaultSchedule;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.scheduling.SchedulingStrategy;
    +
    +@Tags({"bulletin", "site", "site to site", "restricted"})
    +@CapabilityDescription("Publishes Bulletin events using the Site To Site protocol. Note: only up to 5 bulletins are stored per component and up to "
    +        + "10 bulletins at controller level for a duration of up to 5 minutes. If this reporting task is not scheduled frequently enough some bulletins "
    +        + "may not be sent.")
    +@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last bulletin ID so that on restart the task knows where it left off.")
    +@Restricted("Provides operator the ability to send sensitive details contained in bulletin events to any external system.")
    +@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
    +public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReportingTask {
    +
    +    static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
    +    static final String LAST_EVENT_ID_KEY = "last_event_id";
    +
    +    static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
    +        .name("Platform")
    +        .description("The value to use for the platform field in each provenance event.")
    +        .required(true)
    +        .expressionLanguageSupported(true)
    +        .defaultValue("nifi")
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    private volatile long lastSentBulletinId = -1L;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(DESTINATION_URL);
    +        properties.add(PORT_NAME);
    +        properties.add(SSL_CONTEXT);
    +        properties.add(COMPRESS);
    +        properties.add(TIMEOUT);
    +        properties.add(PLATFORM);
    +        return properties;
    +    }
    --- End diff --
    
    @pvillard31 given the fact that this operation is invoked multiple times, consider applying the same pattern we use in Processors, where the property descriptor list (as well as the relationships) is built in a static initializer.
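
The static-initializer pattern mentioned in this comment can be sketched as follows. This is an illustration under stated assumptions rather than the change that was eventually made: `DescriptorStub` is a hypothetical stand-in for NiFi's `PropertyDescriptor`, and the string labels are placeholders, not the real property names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class StaticDescriptorListSketch {

    // Hypothetical stand-in for NiFi's PropertyDescriptor.
    static final class DescriptorStub {
        final String name;

        DescriptorStub(String name) {
            this.name = name;
        }
    }

    // Placeholder descriptors mirroring the six added in the diff.
    static final DescriptorStub DESTINATION_URL = new DescriptorStub("DESTINATION_URL");
    static final DescriptorStub PORT_NAME = new DescriptorStub("PORT_NAME");
    static final DescriptorStub SSL_CONTEXT = new DescriptorStub("SSL_CONTEXT");
    static final DescriptorStub COMPRESS = new DescriptorStub("COMPRESS");
    static final DescriptorStub TIMEOUT = new DescriptorStub("TIMEOUT");
    static final DescriptorStub PLATFORM = new DescriptorStub("PLATFORM");

    private static final List<DescriptorStub> PROPERTIES;

    // Built once when the class is loaded, instead of on every call.
    static {
        final List<DescriptorStub> props = new ArrayList<>();
        props.add(DESTINATION_URL);
        props.add(PORT_NAME);
        props.add(SSL_CONTEXT);
        props.add(COMPRESS);
        props.add(TIMEOUT);
        props.add(PLATFORM);
        PROPERTIES = Collections.unmodifiableList(props);
    }

    // Returns the same immutable list on each invocation.
    static List<DescriptorStub> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    public static void main(String[] args) {
        System.out.println(getSupportedPropertyDescriptors().size()); // prints 6
    }
}
```

Wrapping the list with `Collections.unmodifiableList` keeps callers from mutating the shared instance, which matters once the same list is handed out on every call.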



[GitHub] nifi pull request #1401: NIFI-3290 Reporting task to send bulletins with S2S

Posted by pvillard31 <gi...@git.apache.org>.
GitHub user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1401#discussion_r98363608
  
    --- Diff: nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java ---
    @@ -0,0 +1,235 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.reporting;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.TimeZone;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import javax.json.Json;
    +import javax.json.JsonArray;
    +import javax.json.JsonArrayBuilder;
    +import javax.json.JsonBuilderFactory;
    +import javax.json.JsonObject;
    +import javax.json.JsonObjectBuilder;
    +
    +import org.apache.nifi.annotation.behavior.Restricted;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.configuration.DefaultSchedule;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.scheduling.SchedulingStrategy;
    +
    +@Tags({"bulletin", "site", "site to site", "restricted"})
    +@CapabilityDescription("Publishes Bulletin events using the Site To Site protocol. Note: only up to 5 bulletins are stored per component and up to "
    +        + "10 bulletins at controller level for a duration of up to 5 minutes. If this reporting task is not scheduled frequently enough some bulletins "
    +        + "may not be sent.")
    +@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last bulletin ID so that on restart the task knows where it left off.")
    +@Restricted("Provides operator the ability to send sensitive details contained in bulletin events to any external system.")
    +@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
    +public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReportingTask {
    +
    +    static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
    +    static final String LAST_EVENT_ID_KEY = "last_event_id";
    +
    +    static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
    +        .name("Platform")
    +        .description("The value to use for the platform field in each provenance event.")
    +        .required(true)
    +        .expressionLanguageSupported(true)
    +        .defaultValue("nifi")
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    private volatile long lastSentBulletinId = -1L;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(DESTINATION_URL);
    +        properties.add(PORT_NAME);
    +        properties.add(SSL_CONTEXT);
    +        properties.add(COMPRESS);
    +        properties.add(TIMEOUT);
    +        properties.add(PLATFORM);
    +        return properties;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ReportingContext context) {
    +
    +        final boolean isClustered = context.isClustered();
    +        final String nodeId = context.getClusterNodeIdentifier();
    +        if (nodeId == null && isClustered) {
    +            getLogger().debug("This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. "
    +                + "Will wait for Node Identifier to be established.");
    +            return;
    +        }
    +
    +        if (lastSentBulletinId < 0) {
    +            Map<String, String> state;
    +            try {
    +                state = context.getStateManager().getState(Scope.LOCAL).toMap();
    +            } catch (IOException e) {
    +                getLogger().error("Failed to get state at start up due to:" + e.getMessage(), e);
    +                return;
    +            }
    +            if (state.containsKey(LAST_EVENT_ID_KEY)) {
    +                lastSentBulletinId = Long.parseLong(state.get(LAST_EVENT_ID_KEY));
    +            }
    +        }
    +
    +        final BulletinQuery bulletinQuery = new BulletinQuery.Builder().after(lastSentBulletinId).build();
    +        final List<Bulletin> bulletins = context.getBulletinRepository().findBulletins(bulletinQuery);
    +
    +        if(bulletins == null || bulletins.isEmpty()) {
    +            getLogger().debug("No events to send because no events are stored in the repository.");
    +            return;
    +        }
    +
    +        final Long currMaxId = getMaxBulletinId(bulletins);
    +
    +        if(currMaxId < lastSentBulletinId){
    +            getLogger().warn("Current bulletin max id is {} which is less than what was stored in state as the last queried event, which was {}. "
    +                    + "This means the bulletins repository restarted its ids. Restarting querying from the beginning.", new Object[]{currMaxId, lastSentBulletinId});
    +            lastSentBulletinId = -1;
    +        }
    +
    +        if (currMaxId == lastSentBulletinId) {
    +            getLogger().debug("No events to send due to the current max id being equal to the last id that was sent.");
    +            return;
    +        }
    +
    +        final String platform = context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue();
    +
    +        final Map<String, ?> config = Collections.emptyMap();
    +        final JsonBuilderFactory factory = Json.createBuilderFactory(config);
    +        final JsonObjectBuilder builder = factory.createObjectBuilder();
    +
    +        final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
    +        df.setTimeZone(TimeZone.getTimeZone("Z"));
    +
    +        final long start = System.nanoTime();
    +
    +        // Create a JSON array of all the events in the current batch
    +        final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
    +        for (final Bulletin bulletin : bulletins) {
    +            if(bulletin.getId() > lastSentBulletinId) {
    +                arrayBuilder.add(serialize(factory, builder, bulletin, df, platform, nodeId));
    +            }
    +        }
    +        final JsonArray jsonArray = arrayBuilder.build();
    +
    +        // Send the JSON document for the current batch
    +        try {
    +            final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
    +            if (transaction == null) {
    +                getLogger().debug("All destination nodes are penalized; will attempt to send data later");
    +                return;
    +            }
    +
    +            final Map<String, String> attributes = new HashMap<>();
    +            final String transactionId = UUID.randomUUID().toString();
    +            attributes.put("reporting.task.transaction.id", transactionId);
    +
    +            final byte[] data = jsonArray.toString().getBytes(StandardCharsets.UTF_8);
    +            transaction.send(data, attributes);
    +            transaction.confirm();
    +            transaction.complete();
    +
    +            final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    +            getLogger().info("Successfully sent {} Bulletins to destination in {} ms; Transaction ID = {}; First Event ID = {}",
    +                    new Object[]{bulletins.size(), transferMillis, transactionId, bulletins.get(0).getId()});
    +        } catch (final IOException e) {
    +            throw new ProcessException("Failed to send Bulletins to destination due to IOException:" + e.getMessage(), e);
    +        }
    +
    +        // Store the id of the last event so we know where we left off
    +        try {
    +            StateManager stateManager = context.getStateManager();
    +            Map<String, String> newMapOfState = new HashMap<>();
    +            newMapOfState.put(LAST_EVENT_ID_KEY, String.valueOf(currMaxId));
    --- End diff --
    
    I don't foresee other values at the moment, so I changed it to a singleton map, which is more concise.
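
    For readers following the thread, a minimal sketch of what a singleton-map version of the state update might look like. The updated PR code is not quoted here, so the use of `Collections.singletonMap` and the helper name `buildState` are assumptions for illustration; only `LAST_EVENT_ID_KEY` and its value come from the diff above.

```java
import java.util.Collections;
import java.util.Map;

public class SingletonStateSketch {

    // Same key as in the quoted diff.
    static final String LAST_EVENT_ID_KEY = "last_event_id";

    // Hypothetical helper: builds the one-entry state map that would be
    // passed to stateManager.setState(..., Scope.LOCAL) in the reporting task.
    static Map<String, String> buildState(final long currMaxId) {
        // singletonMap returns an immutable map holding exactly one entry,
        // replacing the HashMap + put pair from the original code.
        return Collections.singletonMap(LAST_EVENT_ID_KEY, String.valueOf(currMaxId));
    }

    public static void main(final String[] args) {
        System.out.println(buildState(42L));
    }
}
```

    The returned map is immutable, which is fine as long as the state only ever holds this one key; adding further keys later would mean going back to a mutable map.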


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1401: NIFI-3290 Reporting task to send bulletins with S2S

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/1401



[GitHub] nifi pull request #1401: NIFI-3290 Reporting task to send bulletins with S2S

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1401#discussion_r98356724
  
    --- Diff: nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java ---
    @@ -0,0 +1,235 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.reporting;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.TimeZone;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import javax.json.Json;
    +import javax.json.JsonArray;
    +import javax.json.JsonArrayBuilder;
    +import javax.json.JsonBuilderFactory;
    +import javax.json.JsonObject;
    +import javax.json.JsonObjectBuilder;
    +
    +import org.apache.nifi.annotation.behavior.Restricted;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.configuration.DefaultSchedule;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.scheduling.SchedulingStrategy;
    +
    +@Tags({"bulletin", "site", "site to site", "restricted"})
    +@CapabilityDescription("Publishes Bulletin events using the Site To Site protocol. Note: only up to 5 bulletins are stored per component and up to "
    +        + "10 bulletins at controller level for a duration of up to 5 minutes. If this reporting task is not scheduled frequently enough some bulletins "
    +        + "may not be sent.")
    +@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last bulletin ID so that on restart the task knows where it left off.")
    +@Restricted("Provides operator the ability to send sensitive details contained in bulletin events to any external system.")
    +@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
    +public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReportingTask {
    +
    +    static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
    +    static final String LAST_EVENT_ID_KEY = "last_event_id";
    +
    +    static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
    +        .name("Platform")
    +        .description("The value to use for the platform field in each provenance event.")
    +        .required(true)
    +        .expressionLanguageSupported(true)
    +        .defaultValue("nifi")
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    private volatile long lastSentBulletinId = -1L;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(DESTINATION_URL);
    +        properties.add(PORT_NAME);
    +        properties.add(SSL_CONTEXT);
    +        properties.add(COMPRESS);
    +        properties.add(TIMEOUT);
    +        properties.add(PLATFORM);
    +        return properties;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ReportingContext context) {
    +
    +        final boolean isClustered = context.isClustered();
    +        final String nodeId = context.getClusterNodeIdentifier();
    +        if (nodeId == null && isClustered) {
    +            getLogger().debug("This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. "
    +                + "Will wait for Node Identifier to be established.");
    +            return;
    +        }
    +
    +        if (lastSentBulletinId < 0) {
    +            Map<String, String> state;
    +            try {
    +                state = context.getStateManager().getState(Scope.LOCAL).toMap();
    +            } catch (IOException e) {
    +                getLogger().error("Failed to get state at start up due to:" + e.getMessage(), e);
    +                return;
    +            }
    +            if (state.containsKey(LAST_EVENT_ID_KEY)) {
    +                lastSentBulletinId = Long.parseLong(state.get(LAST_EVENT_ID_KEY));
    +            }
    +        }
    +
    +        final BulletinQuery bulletinQuery = new BulletinQuery.Builder().after(lastSentBulletinId).build();
    +        final List<Bulletin> bulletins = context.getBulletinRepository().findBulletins(bulletinQuery);
    +
    +        if(bulletins == null || bulletins.isEmpty()) {
    +            getLogger().debug("No events to send because no events are stored in the repository.");
    +            return;
    +        }
    +
    +        final Long currMaxId = getMaxBulletinId(bulletins);
    +
    +        if(currMaxId < lastSentBulletinId){
    +            getLogger().warn("Current bulletin max id is {} which is less than what was stored in state as the last queried event, which was {}. "
    +                    + "This means the bulletins repository restarted its ids. Restarting querying from the beginning.", new Object[]{currMaxId, lastSentBulletinId});
    +            lastSentBulletinId = -1;
    +        }
    +
    +        if (currMaxId == lastSentBulletinId) {
    +            getLogger().debug("No events to send due to the current max id being equal to the last id that was sent.");
    +            return;
    +        }
    +
    +        final String platform = context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue();
    +
    +        final Map<String, ?> config = Collections.emptyMap();
    +        final JsonBuilderFactory factory = Json.createBuilderFactory(config);
    +        final JsonObjectBuilder builder = factory.createObjectBuilder();
    +
    +        final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
    +        df.setTimeZone(TimeZone.getTimeZone("Z"));
    +
    +        final long start = System.nanoTime();
    +
    +        // Create a JSON array of all the events in the current batch
    +        final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
    +        for (final Bulletin bulletin : bulletins) {
    +            if(bulletin.getId() > lastSentBulletinId) {
    +                arrayBuilder.add(serialize(factory, builder, bulletin, df, platform, nodeId));
    +            }
    +        }
    +        final JsonArray jsonArray = arrayBuilder.build();
    +
    +        // Send the JSON document for the current batch
    +        try {
    +            final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
    +            if (transaction == null) {
    +                getLogger().debug("All destination nodes are penalized; will attempt to send data later");
    --- End diff --
    
    I'm wondering whether this should be logged at INFO level.



[GitHub] nifi pull request #1401: NIFI-3290 Reporting task to send bulletins with S2S

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1401#discussion_r98363650
  
    --- Diff: nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java ---
    @@ -0,0 +1,235 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.reporting;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.TimeZone;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import javax.json.Json;
    +import javax.json.JsonArray;
    +import javax.json.JsonArrayBuilder;
    +import javax.json.JsonBuilderFactory;
    +import javax.json.JsonObject;
    +import javax.json.JsonObjectBuilder;
    +
    +import org.apache.nifi.annotation.behavior.Restricted;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.configuration.DefaultSchedule;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.scheduling.SchedulingStrategy;
    +
    +@Tags({"bulletin", "site", "site to site", "restricted"})
    +@CapabilityDescription("Publishes Bulletin events using the Site To Site protocol. Note: only up to 5 bulletins are stored per component and up to "
    +        + "10 bulletins at controller level for a duration of up to 5 minutes. If this reporting task is not scheduled frequently enough some bulletins "
    +        + "may not be sent.")
    +@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last bulletin ID so that on restart the task knows where it left off.")
    +@Restricted("Provides operator the ability to send sensitive details contained in bulletin events to any external system.")
    +@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
    +public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReportingTask {
    +
    +    static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
    +    static final String LAST_EVENT_ID_KEY = "last_event_id";
    +
    +    static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
    +        .name("Platform")
    +        .description("The value to use for the platform field in each provenance event.")
    +        .required(true)
    +        .expressionLanguageSupported(true)
    +        .defaultValue("nifi")
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    private volatile long lastSentBulletinId = -1L;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(DESTINATION_URL);
    +        properties.add(PORT_NAME);
    +        properties.add(SSL_CONTEXT);
    +        properties.add(COMPRESS);
    +        properties.add(TIMEOUT);
    +        properties.add(PLATFORM);
    +        return properties;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ReportingContext context) {
    +
    +        final boolean isClustered = context.isClustered();
    +        final String nodeId = context.getClusterNodeIdentifier();
    +        if (nodeId == null && isClustered) {
    +            getLogger().debug("This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. "
    +                + "Will wait for Node Identifier to be established.");
    +            return;
    +        }
    +
    +        if (lastSentBulletinId < 0) {
    +            Map<String, String> state;
    +            try {
    +                state = context.getStateManager().getState(Scope.LOCAL).toMap();
    +            } catch (IOException e) {
    +                getLogger().error("Failed to get state at start up due to:" + e.getMessage(), e);
    +                return;
    +            }
    +            if (state.containsKey(LAST_EVENT_ID_KEY)) {
    +                lastSentBulletinId = Long.parseLong(state.get(LAST_EVENT_ID_KEY));
    +            }
    +        }
    +
    +        final BulletinQuery bulletinQuery = new BulletinQuery.Builder().after(lastSentBulletinId).build();
    +        final List<Bulletin> bulletins = context.getBulletinRepository().findBulletins(bulletinQuery);
    +
    +        if(bulletins == null || bulletins.isEmpty()) {
    +            getLogger().debug("No events to send because no events are stored in the repository.");
    +            return;
    +        }
    +
    +        final Long currMaxId = getMaxBulletinId(bulletins);
    +
    +        if(currMaxId < lastSentBulletinId){
    +            getLogger().warn("Current bulletin max id is {} which is less than what was stored in state as the last queried event, which was {}. "
    +                    + "This means the bulletins repository restarted its ids. Restarting querying from the beginning.", new Object[]{currMaxId, lastSentBulletinId});
    +            lastSentBulletinId = -1;
    +        }
    +
    +        if (currMaxId == lastSentBulletinId) {
    +            getLogger().debug("No events to send due to the current max id being equal to the last id that was sent.");
    +            return;
    +        }
    +
    +        final String platform = context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue();
    +
    +        final Map<String, ?> config = Collections.emptyMap();
    +        final JsonBuilderFactory factory = Json.createBuilderFactory(config);
    +        final JsonObjectBuilder builder = factory.createObjectBuilder();
    +
    +        final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
    +        df.setTimeZone(TimeZone.getTimeZone("Z"));
    +
    +        final long start = System.nanoTime();
    +
    +        // Create a JSON array of all the events in the current batch
    +        final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
    +        for (final Bulletin bulletin : bulletins) {
    +            if(bulletin.getId() > lastSentBulletinId) {
    +                arrayBuilder.add(serialize(factory, builder, bulletin, df, platform, nodeId));
    +            }
    +        }
    +        final JsonArray jsonArray = arrayBuilder.build();
    +
    +        // Send the JSON document for the current batch
    +        try {
    +            final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
    +            if (transaction == null) {
    +                getLogger().debug("All destination nodes are penalized; will attempt to send data later");
    +                return;
    +            }
    +
    +            final Map<String, String> attributes = new HashMap<>();
    +            final String transactionId = UUID.randomUUID().toString();
    +            attributes.put("reporting.task.transaction.id", transactionId);
    +
    +            final byte[] data = jsonArray.toString().getBytes(StandardCharsets.UTF_8);
    +            transaction.send(data, attributes);
    +            transaction.confirm();
    +            transaction.complete();
    +
    +            final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    +            getLogger().info("Successfully sent {} Bulletins to destination in {} ms; Transaction ID = {}; First Event ID = {}",
    +                    new Object[]{bulletins.size(), transferMillis, transactionId, bulletins.get(0).getId()});
    +        } catch (final IOException e) {
    +            throw new ProcessException("Failed to send Bulletins to destination due to IOException:" + e.getMessage(), e);
    +        }
    +
    +        // Store the id of the last event so we know where we left off
    +        try {
    +            StateManager stateManager = context.getStateManager();
    +            Map<String, String> newMapOfState = new HashMap<>();
    +            newMapOfState.put(LAST_EVENT_ID_KEY, String.valueOf(currMaxId));
    +            stateManager.setState(newMapOfState, Scope.LOCAL);
    +        } catch (final IOException ioe) {
    +            getLogger().error("Failed to update state to {} due to {}; this could result in events being re-sent after a restart.",
    +                    new Object[]{currMaxId, ioe.getMessage()}, ioe);
    +        }
    +
    +        lastSentBulletinId = currMaxId;
    +    }
    +
    +    private Long getMaxBulletinId(List<Bulletin> bulletins) {
    +        long result = -1L;
    +        for (Bulletin bulletin : bulletins) {
    +            if(bulletin.getId() > result) {
    +                result = bulletin.getId();
    --- End diff --
    
    Hmmm, I'm just trying to get the maximum ID from a list of bulletins. I do need to scan the whole list to find it (unless I assume the list is ordered). I updated the PR with a more concise approach. Let me know if I missed something.
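
    The "more concise approach" is not quoted in this thread. One possible shape, sketched with Java 8 streams, is shown below; the nested `Bulletin` class is a hypothetical stand-in for `org.apache.nifi.reporting.Bulletin` so that the example compiles on its own, and the `-1L` fallback mirrors the initial value used in the quoted loop.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class MaxBulletinIdSketch {

    // Hypothetical stand-in for NiFi's Bulletin; only getId() is needed here.
    static final class Bulletin {
        private final long id;

        Bulletin(final long id) {
            this.id = id;
        }

        long getId() {
            return id;
        }
    }

    // Scans the whole list once, without assuming any ordering,
    // and falls back to -1 when the list is empty.
    static long getMaxBulletinId(final List<Bulletin> bulletins) {
        return bulletins.stream()
                .mapToLong(Bulletin::getId)
                .max()
                .orElse(-1L);
    }

    public static void main(final String[] args) {
        final List<Bulletin> bulletins = Arrays.asList(new Bulletin(3L), new Bulletin(7L), new Bulletin(5L));
        System.out.println(getMaxBulletinId(bulletins));
        System.out.println(getMaxBulletinId(Collections.<Bulletin>emptyList()));
    }
}
```

    Returning a primitive `long` instead of `Long` also avoids boxing in the later comparisons against `lastSentBulletinId`.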



[GitHub] nifi issue #1401: NIFI-3290 Reporting task to send bulletins with S2S

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on the issue:

    https://github.com/apache/nifi/pull/1401
  
    Reviewing...



[GitHub] nifi pull request #1401: NIFI-3290 Reporting task to send bulletins with S2S

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1401#discussion_r98363588
  
    --- Diff: nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java ---
    @@ -0,0 +1,235 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.reporting;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.TimeZone;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import javax.json.Json;
    +import javax.json.JsonArray;
    +import javax.json.JsonArrayBuilder;
    +import javax.json.JsonBuilderFactory;
    +import javax.json.JsonObject;
    +import javax.json.JsonObjectBuilder;
    +
    +import org.apache.nifi.annotation.behavior.Restricted;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.configuration.DefaultSchedule;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.scheduling.SchedulingStrategy;
    +
    +@Tags({"bulletin", "site", "site to site", "restricted"})
    +@CapabilityDescription("Publishes Bulletin events using the Site To Site protocol. Note: only up to 5 bulletins are stored per component and up to "
    +        + "10 bulletins at controller level for a duration of up to 5 minutes. If this reporting task is not scheduled frequently enough some bulletins "
    +        + "may not be sent.")
    +@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last bulletin ID so that on restart the task knows where it left off.")
    +@Restricted("Provides operator the ability to send sensitive details contained in bulletin events to any external system.")
    +@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
    +public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReportingTask {
    +
    +    static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
    +    static final String LAST_EVENT_ID_KEY = "last_event_id";
    +
    +    static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
    +        .name("Platform")
    +        .description("The value to use for the platform field in each provenance event.")
    +        .required(true)
    +        .expressionLanguageSupported(true)
    +        .defaultValue("nifi")
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    private volatile long lastSentBulletinId = -1L;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(DESTINATION_URL);
    +        properties.add(PORT_NAME);
    +        properties.add(SSL_CONTEXT);
    +        properties.add(COMPRESS);
    +        properties.add(TIMEOUT);
    +        properties.add(PLATFORM);
    +        return properties;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ReportingContext context) {
    +
    +        final boolean isClustered = context.isClustered();
    +        final String nodeId = context.getClusterNodeIdentifier();
    +        if (nodeId == null && isClustered) {
    +            getLogger().debug("This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. "
    +                + "Will wait for Node Identifier to be established.");
    +            return;
    +        }
    +
    +        if (lastSentBulletinId < 0) {
    +            Map<String, String> state;
    +            try {
    +                state = context.getStateManager().getState(Scope.LOCAL).toMap();
    +            } catch (IOException e) {
    +                getLogger().error("Failed to get state at start up due to:" + e.getMessage(), e);
    +                return;
    +            }
    +            if (state.containsKey(LAST_EVENT_ID_KEY)) {
    +                lastSentBulletinId = Long.parseLong(state.get(LAST_EVENT_ID_KEY));
    +            }
    +        }
    +
    +        final BulletinQuery bulletinQuery = new BulletinQuery.Builder().after(lastSentBulletinId).build();
    +        final List<Bulletin> bulletins = context.getBulletinRepository().findBulletins(bulletinQuery);
    +
    +        if(bulletins == null || bulletins.isEmpty()) {
    +            getLogger().debug("No events to send because no events are stored in the repository.");
    +            return;
    +        }
    +
    +        final Long currMaxId = getMaxBulletinId(bulletins);
    +
    +        if(currMaxId < lastSentBulletinId){
    +            getLogger().warn("Current bulletin max id is {} which is less than what was stored in state as the last queried event, which was {}. "
    +                    + "This means the bulletins repository restarted its ids. Restarting querying from the beginning.", new Object[]{currMaxId, lastSentBulletinId});
    +            lastSentBulletinId = -1;
    +        }
    +
    +        if (currMaxId == lastSentBulletinId) {
    +            getLogger().debug("No events to send due to the current max id being equal to the last id that was sent.");
    +            return;
    +        }
    +
    +        final String platform = context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue();
    +
    +        final Map<String, ?> config = Collections.emptyMap();
    +        final JsonBuilderFactory factory = Json.createBuilderFactory(config);
    +        final JsonObjectBuilder builder = factory.createObjectBuilder();
    +
    +        final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
    +        df.setTimeZone(TimeZone.getTimeZone("Z"));
    +
    +        final long start = System.nanoTime();
    +
    +        // Create a JSON array of all the events in the current batch
    +        final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
    +        for (final Bulletin bulletin : bulletins) {
    +            if(bulletin.getId() > lastSentBulletinId) {
    +                arrayBuilder.add(serialize(factory, builder, bulletin, df, platform, nodeId));
    +            }
    +        }
    +        final JsonArray jsonArray = arrayBuilder.build();
    +
    +        // Send the JSON document for the current batch
    +        try {
    +            final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
    +            if (transaction == null) {
    +                getLogger().debug("All destination nodes are penalized; will attempt to send data later");
    --- End diff --
    
    That's fair, I changed the level.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1401: NIFI-3290 Reporting task to send bulletins with S2S

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1401#discussion_r98356790
  
    --- Diff: nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java ---
    @@ -0,0 +1,235 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.reporting;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.TimeZone;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import javax.json.Json;
    +import javax.json.JsonArray;
    +import javax.json.JsonArrayBuilder;
    +import javax.json.JsonBuilderFactory;
    +import javax.json.JsonObject;
    +import javax.json.JsonObjectBuilder;
    +
    +import org.apache.nifi.annotation.behavior.Restricted;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.configuration.DefaultSchedule;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.scheduling.SchedulingStrategy;
    +
    +@Tags({"bulletin", "site", "site to site", "restricted"})
    +@CapabilityDescription("Publishes Bulletin events using the Site To Site protocol. Note: only up to 5 bulletins are stored per component and up to "
    +        + "10 bulletins at controller level for a duration of up to 5 minutes. If this reporting task is not scheduled frequently enough some bulletins "
    +        + "may not be sent.")
    +@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last bulletin ID so that on restart the task knows where it left off.")
    +@Restricted("Provides operator the ability to send sensitive details contained in bulletin events to any external system.")
    +@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
    +public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReportingTask {
    +
    +    static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
    +    static final String LAST_EVENT_ID_KEY = "last_event_id";
    +
    +    static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
    +        .name("Platform")
    +        .description("The value to use for the platform field in each provenance event.")
    +        .required(true)
    +        .expressionLanguageSupported(true)
    +        .defaultValue("nifi")
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    private volatile long lastSentBulletinId = -1L;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(DESTINATION_URL);
    +        properties.add(PORT_NAME);
    +        properties.add(SSL_CONTEXT);
    +        properties.add(COMPRESS);
    +        properties.add(TIMEOUT);
    +        properties.add(PLATFORM);
    +        return properties;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ReportingContext context) {
    +
    +        final boolean isClustered = context.isClustered();
    +        final String nodeId = context.getClusterNodeIdentifier();
    +        if (nodeId == null && isClustered) {
    +            getLogger().debug("This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. "
    +                + "Will wait for Node Identifier to be established.");
    +            return;
    +        }
    +
    +        if (lastSentBulletinId < 0) {
    +            Map<String, String> state;
    +            try {
    +                state = context.getStateManager().getState(Scope.LOCAL).toMap();
    +            } catch (IOException e) {
    +                getLogger().error("Failed to get state at start up due to:" + e.getMessage(), e);
    +                return;
    +            }
    +            if (state.containsKey(LAST_EVENT_ID_KEY)) {
    +                lastSentBulletinId = Long.parseLong(state.get(LAST_EVENT_ID_KEY));
    +            }
    +        }
    +
    +        final BulletinQuery bulletinQuery = new BulletinQuery.Builder().after(lastSentBulletinId).build();
    +        final List<Bulletin> bulletins = context.getBulletinRepository().findBulletins(bulletinQuery);
    +
    +        if(bulletins == null || bulletins.isEmpty()) {
    +            getLogger().debug("No events to send because no events are stored in the repository.");
    +            return;
    +        }
    +
    +        final Long currMaxId = getMaxBulletinId(bulletins);
    +
    +        if(currMaxId < lastSentBulletinId){
    +            getLogger().warn("Current bulletin max id is {} which is less than what was stored in state as the last queried event, which was {}. "
    +                    + "This means the bulletins repository restarted its ids. Restarting querying from the beginning.", new Object[]{currMaxId, lastSentBulletinId});
    +            lastSentBulletinId = -1;
    +        }
    +
    +        if (currMaxId == lastSentBulletinId) {
    +            getLogger().debug("No events to send due to the current max id being equal to the last id that was sent.");
    +            return;
    +        }
    +
    +        final String platform = context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue();
    +
    +        final Map<String, ?> config = Collections.emptyMap();
    +        final JsonBuilderFactory factory = Json.createBuilderFactory(config);
    +        final JsonObjectBuilder builder = factory.createObjectBuilder();
    +
    +        final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
    +        df.setTimeZone(TimeZone.getTimeZone("Z"));
    +
    +        final long start = System.nanoTime();
    +
    +        // Create a JSON array of all the events in the current batch
    +        final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
    +        for (final Bulletin bulletin : bulletins) {
    +            if(bulletin.getId() > lastSentBulletinId) {
    +                arrayBuilder.add(serialize(factory, builder, bulletin, df, platform, nodeId));
    +            }
    +        }
    +        final JsonArray jsonArray = arrayBuilder.build();
    +
    +        // Send the JSON document for the current batch
    +        try {
    +            final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
    +            if (transaction == null) {
    +                getLogger().debug("All destination nodes are penalized; will attempt to send data later");
    +                return;
    +            }
    +
    +            final Map<String, String> attributes = new HashMap<>();
    +            final String transactionId = UUID.randomUUID().toString();
    +            attributes.put("reporting.task.transaction.id", transactionId);
    +
    +            final byte[] data = jsonArray.toString().getBytes(StandardCharsets.UTF_8);
    +            transaction.send(data, attributes);
    +            transaction.confirm();
    +            transaction.complete();
    +
    +            final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    +            getLogger().info("Successfully sent {} Bulletins to destination in {} ms; Transaction ID = {}; First Event ID = {}",
    +                    new Object[]{bulletins.size(), transferMillis, transactionId, bulletins.get(0).getId()});
    +        } catch (final IOException e) {
    +            throw new ProcessException("Failed to send Bulletins to destination due to IOException:" + e.getMessage(), e);
    +        }
    +
    +        // Store the id of the last event so we know where we left off
    +        try {
    +            StateManager stateManager = context.getStateManager();
    +            Map<String, String> newMapOfState = new HashMap<>();
    +            newMapOfState.put(LAST_EVENT_ID_KEY, String.valueOf(currMaxId));
    --- End diff --
    
    More of a style issue, but I wonder if we should use ```Collections.singletonMap(key, value)``` here, or do you already expect/know that there may be more values?
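
To make the suggestion concrete, here is a minimal sketch of the two ways to build the single-entry state map. The class and method names (`StateMapSketch`, `mutableState`, `singletonState`) are hypothetical and exist only for illustration; only `LAST_EVENT_ID_KEY` and the `String.valueOf(currMaxId)` value come from the diff. The call that persists the map is outside this sketch.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration class; not part of the pull request.
public class StateMapSketch {

    // Same key as in the diff under review.
    static final String LAST_EVENT_ID_KEY = "last_event_id";

    // Approach currently in the diff: a mutable HashMap with one entry.
    static Map<String, String> mutableState(final long currMaxId) {
        final Map<String, String> newMapOfState = new HashMap<>();
        newMapOfState.put(LAST_EVENT_ID_KEY, String.valueOf(currMaxId));
        return newMapOfState;
    }

    // Suggested alternative: an immutable single-entry map.
    static Map<String, String> singletonState(final long currMaxId) {
        return Collections.singletonMap(LAST_EVENT_ID_KEY, String.valueOf(currMaxId));
    }

    public static void main(final String[] args) {
        final Map<String, String> a = mutableState(42L);
        final Map<String, String> b = singletonState(42L);

        // Both maps hold the same single entry, so they compare equal.
        System.out.println("equal contents: " + a.equals(b));

        // The singleton map rejects further entries, which only matters
        // if more state keys are expected later.
        try {
            b.put("another_key", "value");
        } catch (final UnsupportedOperationException e) {
            System.out.println("singleton map is immutable");
        }
    }
}
```

The trade-off is the one raised in the question: the singleton map is shorter and signals that exactly one value is stored, while the `HashMap` leaves room to add more keys without changing the construction code.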

