You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by oz...@apache.org on 2017/01/30 14:04:29 UTC

nifi git commit: NIFI-3290 Reporting task to send bulletins with S2S

Repository: nifi
Updated Branches:
  refs/heads/master 89f1bd318 -> 78a0e1e18


NIFI-3290 Reporting task to send bulletins with S2S

This closes #1401


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/78a0e1e1
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/78a0e1e1
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/78a0e1e1

Branch: refs/heads/master
Commit: 78a0e1e18b9d40a10bac4612a9a0ebdc4b7d13bb
Parents: 89f1bd3
Author: Pierre Villard <pi...@gmail.com>
Authored: Thu Jan 5 23:08:55 2017 +0100
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Mon Jan 30 09:03:35 2017 -0500

----------------------------------------------------------------------
 .../SiteToSiteBulletinReportingTask.java        | 223 +++++++++++++++++++
 .../SiteToSiteProvenanceReportingTask.java      |   2 +-
 .../org.apache.nifi.reporting.ReportingTask     |   3 +-
 .../TestSiteToSiteBulletinReportingTask.java    | 199 +++++++++++++++++
 4 files changed, 425 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/78a0e1e1/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
new file mode 100644
index 0000000..9d9b1b7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.reporting;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.TimeZone;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import javax.json.Json;
+import javax.json.JsonArray;
+import javax.json.JsonArrayBuilder;
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+import javax.json.JsonObjectBuilder;
+
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+
+@Tags({"bulletin", "site", "site to site", "restricted"})
+@CapabilityDescription("Publishes Bulletin events using the Site To Site protocol. Note: only up to 5 bulletins are stored per component and up to "
+        + "10 bulletins at controller level for a duration of up to 5 minutes. If this reporting task is not scheduled frequently enough some bulletins "
+        + "may not be sent.")
+@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last bulletin ID so that on restart the task knows where it left off.")
+@Restricted("Provides operator the ability to send sensitive details contained in bulletin events to any external system.")
+@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
+public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReportingTask {
+
+    static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+    static final String LAST_EVENT_ID_KEY = "last_event_id";
+
+    static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
+        .name("Platform")
+        .description("The value to use for the platform field in each bulletin event.")
+        .required(true)
+        .expressionLanguageSupported(true)
+        .defaultValue("nifi")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+
+    private volatile long lastSentBulletinId = -1L;
+
+    static List<PropertyDescriptor> descriptors = new ArrayList<>();
+
+    static {
+        descriptors.add(DESTINATION_URL);
+        descriptors.add(PORT_NAME);
+        descriptors.add(SSL_CONTEXT);
+        descriptors.add(COMPRESS);
+        descriptors.add(TIMEOUT);
+        descriptors.add(PLATFORM);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ReportingContext context) {
+
+        final boolean isClustered = context.isClustered();
+        final String nodeId = context.getClusterNodeIdentifier();
+        if (nodeId == null && isClustered) {
+            getLogger().debug("This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. "
+                + "Will wait for Node Identifier to be established.");
+            return;
+        }
+
+        if (lastSentBulletinId < 0) {
+            Map<String, String> state;
+            try {
+                state = context.getStateManager().getState(Scope.LOCAL).toMap();
+            } catch (IOException e) {
+                getLogger().error("Failed to get state at start up due to:" + e.getMessage(), e);
+                return;
+            }
+            if (state.containsKey(LAST_EVENT_ID_KEY)) {
+                lastSentBulletinId = Long.parseLong(state.get(LAST_EVENT_ID_KEY));
+            }
+        }
+
+        final BulletinQuery bulletinQuery = new BulletinQuery.Builder().after(lastSentBulletinId).build();
+        final List<Bulletin> bulletins = context.getBulletinRepository().findBulletins(bulletinQuery);
+
+        if(bulletins == null || bulletins.isEmpty()) {
+            getLogger().debug("No events to send because no events are stored in the repository.");
+            return;
+        }
+
+        final OptionalLong opMaxId = bulletins.stream().mapToLong(t -> t.getId()).max();
+        final Long currMaxId = opMaxId.isPresent() ? opMaxId.getAsLong() : -1;
+
+        if(currMaxId < lastSentBulletinId){
+            getLogger().warn("Current bulletin max id is {} which is less than what was stored in state as the last queried event, which was {}. "
+                    + "This means the bulletins repository restarted its ids. Restarting querying from the beginning.", new Object[]{currMaxId, lastSentBulletinId});
+            lastSentBulletinId = -1;
+        }
+
+        if (currMaxId == lastSentBulletinId) {
+            getLogger().debug("No events to send due to the current max id being equal to the last id that was sent.");
+            return;
+        }
+
+        final String platform = context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue();
+
+        final Map<String, ?> config = Collections.emptyMap();
+        final JsonBuilderFactory factory = Json.createBuilderFactory(config);
+        final JsonObjectBuilder builder = factory.createObjectBuilder();
+
+        final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
+        df.setTimeZone(TimeZone.getTimeZone("Z"));
+
+        final long start = System.nanoTime();
+
+        // Create a JSON array of all the events in the current batch
+        final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
+        for (final Bulletin bulletin : bulletins) {
+            if(bulletin.getId() > lastSentBulletinId) {
+                arrayBuilder.add(serialize(factory, builder, bulletin, df, platform, nodeId));
+            }
+        }
+        final JsonArray jsonArray = arrayBuilder.build();
+
+        // Send the JSON document for the current batch
+        try {
+            final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
+            if (transaction == null) {
+                getLogger().info("All destination nodes are penalized; will attempt to send data later");
+                return;
+            }
+
+            final String transactionId = UUID.randomUUID().toString();
+            final byte[] data = jsonArray.toString().getBytes(StandardCharsets.UTF_8);
+            transaction.send(data, Collections.singletonMap("reporting.task.transaction.id", transactionId));
+            transaction.confirm();
+            transaction.complete();
+
+            final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+            getLogger().info("Successfully sent {} Bulletins to destination in {} ms; Transaction ID = {}; First Event ID = {}",
+                    new Object[]{bulletins.size(), transferMillis, transactionId, bulletins.get(0).getId()});
+        } catch (final IOException e) {
+            throw new ProcessException("Failed to send Bulletins to destination due to IOException:" + e.getMessage(), e);
+        }
+
+        // Store the id of the last event so we know where we left off
+        try {
+            context.getStateManager().setState(Collections.singletonMap(LAST_EVENT_ID_KEY, String.valueOf(currMaxId)), Scope.LOCAL);
+        } catch (final IOException ioe) {
+            getLogger().error("Failed to update state to {} due to {}; this could result in events being re-sent after a restart.",
+                    new Object[]{currMaxId, ioe.getMessage()}, ioe);
+        }
+
+        lastSentBulletinId = currMaxId;
+    }
+
+    static JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final Bulletin bulletin, final DateFormat df,
+        final String platform, final String nodeIdentifier) {
+
+        addField(builder, "objectId", UUID.randomUUID().toString());
+        addField(builder, "platform", platform);
+        addField(builder, "bulletinId", bulletin.getId());
+        addField(builder, "bulletinCategory", bulletin.getCategory());
+        addField(builder, "bulletinGroupId", bulletin.getGroupId());
+        addField(builder, "bulletinLevel", bulletin.getLevel());
+        addField(builder, "bulletinMessage", bulletin.getMessage());
+        addField(builder, "bulletinNodeAddress", bulletin.getNodeAddress());
+        addField(builder, "bulletinNodeId", nodeIdentifier);
+        addField(builder, "bulletinSourceId", bulletin.getSourceId());
+        addField(builder, "bulletinSourceName", bulletin.getSourceName());
+        addField(builder, "bulletinSourceType", bulletin.getSourceType() == null ? null : bulletin.getSourceType().name());
+        addField(builder, "bulletinTimestamp", df.format(bulletin.getTimestamp()));
+
+        return builder.build();
+    }
+
+    private static void addField(final JsonObjectBuilder builder, final String key, final Long value) {
+        if (value != null) {
+            builder.add(key, value.longValue());
+        }
+    }
+
+    private static void addField(final JsonObjectBuilder builder, final String key, final String value) {
+        if (value == null) {
+            return;
+        }
+        builder.add(key, value);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/78a0e1e1/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
index 81a4ea4..2123e31 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
@@ -139,7 +139,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
             try {
                 state = context.getStateManager().getState(Scope.LOCAL).toMap();
             } catch (IOException e) {
-                getLogger().error("Failed to get state at start up due to {}:"+e.getMessage(), e);
+                getLogger().error("Failed to get state at start up due to:" + e.getMessage(), e);
                 return;
             }
             if (state.containsKey(LAST_EVENT_ID_KEY)) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/78a0e1e1/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
index be9f654..bdf61cc 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
@@ -13,4 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask
\ No newline at end of file
+org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask
+org.apache.nifi.reporting.SiteToSiteBulletinReportingTask
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/78a0e1e1/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java
new file mode 100644
index 0000000..d5bce1b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.reporting;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import javax.json.Json;
+import javax.json.JsonObject;
+import javax.json.JsonReader;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.state.MockStateManager;
+import org.apache.nifi.util.MockPropertyValue;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestSiteToSiteBulletinReportingTask {
+
+    @Test
+    public void testSerializedForm() throws IOException, InitializationException {
+        // creating the list of bulletins
+        final List<Bulletin> bulletins = new ArrayList<Bulletin>();
+        bulletins.add(BulletinFactory.createBulletin("category", "severity", "message"));
+
+        // mock the access to the list of bulletins
+        final ReportingContext context = Mockito.mock(ReportingContext.class);
+        final BulletinRepository repository = Mockito.mock(BulletinRepository.class);
+        Mockito.when(context.getBulletinRepository()).thenReturn(repository);
+        Mockito.when(repository.findBulletins(Mockito.any(BulletinQuery.class))).thenReturn(bulletins);
+
+        // creating reporting task
+        final MockSiteToSiteBulletinReportingTask task = new MockSiteToSiteBulletinReportingTask();
+        Mockito.when(context.getStateManager()).thenReturn(new MockStateManager(task));
+
+        // settings properties and mocking access to properties
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) {
+            properties.put(descriptor, descriptor.getDefaultValue());
+        }
+        properties.put(SiteToSiteBulletinReportingTask.BATCH_SIZE, "1000");
+        properties.put(SiteToSiteBulletinReportingTask.PLATFORM, "nifi");
+
+        Mockito.doAnswer(new Answer<PropertyValue>() {
+            @Override
+            public PropertyValue answer(final InvocationOnMock invocation) throws Throwable {
+                final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class);
+                return new MockPropertyValue(properties.get(descriptor));
+            }
+        }).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
+
+        // setup the mock initialization context
+        final ComponentLog logger = Mockito.mock(ComponentLog.class);
+        final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
+        Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
+        Mockito.when(initContext.getLogger()).thenReturn(logger);
+
+        task.initialize(initContext);
+        task.onTrigger(context);
+
+        // test checking: the payload is decoded and re-encoded as UTF-8 so the result does not depend on the platform charset
+        assertEquals(1, task.dataSent.size());
+        final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
+        JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes(StandardCharsets.UTF_8)));
+        JsonObject bulletinJson = jsonReader.readArray().getJsonObject(0);
+        assertEquals("message", bulletinJson.getString("bulletinMessage"));
+    }
+
+    @Test
+    public void testWhenProvenanceMaxIdEqualToLastEventIdInStateManager() throws IOException, InitializationException {
+        // creating the list of bulletins
+        final List<Bulletin> bulletins = new ArrayList<Bulletin>();
+        bulletins.add(BulletinFactory.createBulletin("category", "severity", "message"));
+        bulletins.add(BulletinFactory.createBulletin("category", "severity", "message"));
+        bulletins.add(BulletinFactory.createBulletin("category", "severity", "message"));
+        bulletins.add(BulletinFactory.createBulletin("category", "severity", "message"));
+
+        // mock the access to the list of bulletins
+        final ReportingContext context = Mockito.mock(ReportingContext.class);
+        final BulletinRepository repository = Mockito.mock(BulletinRepository.class);
+        Mockito.when(context.getBulletinRepository()).thenReturn(repository);
+        Mockito.when(repository.findBulletins(Mockito.any(BulletinQuery.class))).thenReturn(bulletins);
+
+        final long maxEventId = getMaxBulletinId(bulletins);
+
+        // create the mock reporting task and mock state manager
+        final MockSiteToSiteBulletinReportingTask task = new MockSiteToSiteBulletinReportingTask();
+        final MockStateManager stateManager = new MockStateManager(task);
+
+        // settings properties and mocking access to properties
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) {
+            properties.put(descriptor, descriptor.getDefaultValue());
+        }
+        properties.put(SiteToSiteBulletinReportingTask.BATCH_SIZE, "1000");
+        properties.put(SiteToSiteBulletinReportingTask.PLATFORM, "nifi");
+
+        Mockito.doAnswer(new Answer<PropertyValue>() {
+            @Override
+            public PropertyValue answer(final InvocationOnMock invocation) throws Throwable {
+                final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class);
+                return new MockPropertyValue(properties.get(descriptor));
+            }
+        }).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
+
+        // create the state map and set the last id to the same value as maxEventId, under the key the bulletin task reads
+        final Map<String,String> state = new HashMap<>();
+        state.put(SiteToSiteBulletinReportingTask.LAST_EVENT_ID_KEY, String.valueOf(maxEventId));
+        stateManager.setState(state, Scope.LOCAL);
+
+        // setup the mock reporting context to return the mock state manager
+        Mockito.when(context.getStateManager()).thenReturn(stateManager);
+
+        // setup the mock initialization context
+        final ComponentLog logger = Mockito.mock(ComponentLog.class);
+        final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
+        Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
+        Mockito.when(initContext.getLogger()).thenReturn(logger);
+
+        task.initialize(initContext);
+
+        // execute the reporting task and should not produce any data b/c max id same as previous id
+        task.onTrigger(context);
+        assertEquals(0, task.dataSent.size());
+    }
+
+    private static final class MockSiteToSiteBulletinReportingTask extends SiteToSiteBulletinReportingTask {
+
+        final List<byte[]> dataSent = new ArrayList<>();
+
+        @Override
+        protected SiteToSiteClient getClient() {
+            final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
+            final Transaction transaction = Mockito.mock(Transaction.class);
+
+            try {
+                Mockito.doAnswer(new Answer<Object>() {
+                    @Override
+                    public Object answer(final InvocationOnMock invocation) throws Throwable {
+                        final byte[] data = invocation.getArgumentAt(0, byte[].class);
+                        dataSent.add(data);
+                        return null;
+                    }
+                }).when(transaction).send(Mockito.any(byte[].class), Mockito.anyMapOf(String.class, String.class));
+
+                Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
+            } catch (final Exception e) {
+                e.printStackTrace();
+                Assert.fail(e.toString());
+            }
+
+            return client;
+        }
+
+    }
+
+    private Long getMaxBulletinId(List<Bulletin> bulletins) {
+        long result = -1L;
+        for (Bulletin bulletin : bulletins) {
+            if(bulletin.getId() > result) {
+                result = bulletin.getId();
+            }
+        }
+        return result;
+    }
+
+}