You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by oz...@apache.org on 2017/01/30 14:04:29 UTC
nifi git commit: NIFI-3290 Reporting task to send bulletins with S2S
Repository: nifi
Updated Branches:
refs/heads/master 89f1bd318 -> 78a0e1e18
NIFI-3290 Reporting task to send bulletins with S2S
This closes #1401
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/78a0e1e1
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/78a0e1e1
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/78a0e1e1
Branch: refs/heads/master
Commit: 78a0e1e18b9d40a10bac4612a9a0ebdc4b7d13bb
Parents: 89f1bd3
Author: Pierre Villard <pi...@gmail.com>
Authored: Thu Jan 5 23:08:55 2017 +0100
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Mon Jan 30 09:03:35 2017 -0500
----------------------------------------------------------------------
.../SiteToSiteBulletinReportingTask.java | 223 +++++++++++++++++++
.../SiteToSiteProvenanceReportingTask.java | 2 +-
.../org.apache.nifi.reporting.ReportingTask | 3 +-
.../TestSiteToSiteBulletinReportingTask.java | 199 +++++++++++++++++
4 files changed, 425 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/78a0e1e1/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
new file mode 100644
index 0000000..9d9b1b7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.reporting;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.TimeZone;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import javax.json.Json;
+import javax.json.JsonArray;
+import javax.json.JsonArrayBuilder;
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+import javax.json.JsonObjectBuilder;
+
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+
+@Tags({"bulletin", "site", "site to site", "restricted"})
+@CapabilityDescription("Publishes Bulletin events using the Site To Site protocol. Note: only up to 5 bulletins are stored per component and up to "
+ + "10 bulletins at controller level for a duration of up to 5 minutes. If this reporting task is not scheduled frequently enough some bulletins "
+ + "may not be sent.")
+@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last bulletin ID so that on restart the task knows where it left off.")
+@Restricted("Provides operator the ability to send sensitive details contained in bulletin events to any external system.")
+@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
+public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReportingTask {
+
+ static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+ static final String LAST_EVENT_ID_KEY = "last_event_id";
+
+ static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
+ .name("Platform")
+ .description("The value to use for the platform field in each provenance event.")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .defaultValue("nifi")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ private volatile long lastSentBulletinId = -1L;
+
+ static List<PropertyDescriptor> descriptors = new ArrayList<>();
+
+ static {
+ descriptors.add(DESTINATION_URL);
+ descriptors.add(PORT_NAME);
+ descriptors.add(SSL_CONTEXT);
+ descriptors.add(COMPRESS);
+ descriptors.add(TIMEOUT);
+ descriptors.add(PLATFORM);
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return descriptors;
+ }
+
+ @Override
+ public void onTrigger(final ReportingContext context) {
+
+ final boolean isClustered = context.isClustered();
+ final String nodeId = context.getClusterNodeIdentifier();
+ if (nodeId == null && isClustered) {
+ getLogger().debug("This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. "
+ + "Will wait for Node Identifier to be established.");
+ return;
+ }
+
+ if (lastSentBulletinId < 0) {
+ Map<String, String> state;
+ try {
+ state = context.getStateManager().getState(Scope.LOCAL).toMap();
+ } catch (IOException e) {
+ getLogger().error("Failed to get state at start up due to:" + e.getMessage(), e);
+ return;
+ }
+ if (state.containsKey(LAST_EVENT_ID_KEY)) {
+ lastSentBulletinId = Long.parseLong(state.get(LAST_EVENT_ID_KEY));
+ }
+ }
+
+ final BulletinQuery bulletinQuery = new BulletinQuery.Builder().after(lastSentBulletinId).build();
+ final List<Bulletin> bulletins = context.getBulletinRepository().findBulletins(bulletinQuery);
+
+ if(bulletins == null || bulletins.isEmpty()) {
+ getLogger().debug("No events to send because no events are stored in the repository.");
+ return;
+ }
+
+ final OptionalLong opMaxId = bulletins.stream().mapToLong(t -> t.getId()).max();
+ final Long currMaxId = opMaxId.isPresent() ? opMaxId.getAsLong() : -1;
+
+ if(currMaxId < lastSentBulletinId){
+ getLogger().warn("Current bulletin max id is {} which is less than what was stored in state as the last queried event, which was {}. "
+ + "This means the bulletins repository restarted its ids. Restarting querying from the beginning.", new Object[]{currMaxId, lastSentBulletinId});
+ lastSentBulletinId = -1;
+ }
+
+ if (currMaxId == lastSentBulletinId) {
+ getLogger().debug("No events to send due to the current max id being equal to the last id that was sent.");
+ return;
+ }
+
+ final String platform = context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue();
+
+ final Map<String, ?> config = Collections.emptyMap();
+ final JsonBuilderFactory factory = Json.createBuilderFactory(config);
+ final JsonObjectBuilder builder = factory.createObjectBuilder();
+
+ final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
+ df.setTimeZone(TimeZone.getTimeZone("Z"));
+
+ final long start = System.nanoTime();
+
+ // Create a JSON array of all the events in the current batch
+ final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
+ for (final Bulletin bulletin : bulletins) {
+ if(bulletin.getId() > lastSentBulletinId) {
+ arrayBuilder.add(serialize(factory, builder, bulletin, df, platform, nodeId));
+ }
+ }
+ final JsonArray jsonArray = arrayBuilder.build();
+
+ // Send the JSON document for the current batch
+ try {
+ final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
+ if (transaction == null) {
+ getLogger().info("All destination nodes are penalized; will attempt to send data later");
+ return;
+ }
+
+ final String transactionId = UUID.randomUUID().toString();
+ final byte[] data = jsonArray.toString().getBytes(StandardCharsets.UTF_8);
+ transaction.send(data, Collections.singletonMap("reporting.task.transaction.id", transactionId));
+ transaction.confirm();
+ transaction.complete();
+
+ final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ getLogger().info("Successfully sent {} Bulletins to destination in {} ms; Transaction ID = {}; First Event ID = {}",
+ new Object[]{bulletins.size(), transferMillis, transactionId, bulletins.get(0).getId()});
+ } catch (final IOException e) {
+ throw new ProcessException("Failed to send Bulletins to destination due to IOException:" + e.getMessage(), e);
+ }
+
+ // Store the id of the last event so we know where we left off
+ try {
+ context.getStateManager().setState(Collections.singletonMap(LAST_EVENT_ID_KEY, String.valueOf(currMaxId)), Scope.LOCAL);
+ } catch (final IOException ioe) {
+ getLogger().error("Failed to update state to {} due to {}; this could result in events being re-sent after a restart.",
+ new Object[]{currMaxId, ioe.getMessage()}, ioe);
+ }
+
+ lastSentBulletinId = currMaxId;
+ }
+
+ static JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final Bulletin bulletin, final DateFormat df,
+ final String platform, final String nodeIdentifier) {
+
+ addField(builder, "objectId", UUID.randomUUID().toString());
+ addField(builder, "platform", platform);
+ addField(builder, "bulletinId", bulletin.getId());
+ addField(builder, "bulletinCategory", bulletin.getCategory());
+ addField(builder, "bulletinGroupId", bulletin.getGroupId());
+ addField(builder, "bulletinLevel", bulletin.getLevel());
+ addField(builder, "bulletinMessage", bulletin.getMessage());
+ addField(builder, "bulletinNodeAddress", bulletin.getNodeAddress());
+ addField(builder, "bulletinNodeId", nodeIdentifier);
+ addField(builder, "bulletinSourceId", bulletin.getSourceId());
+ addField(builder, "bulletinSourceName", bulletin.getSourceName());
+ addField(builder, "bulletinSourceType", bulletin.getSourceType() == null ? null : bulletin.getSourceType().name());
+ addField(builder, "bulletinTimestamp", df.format(bulletin.getTimestamp()));
+
+ return builder.build();
+ }
+
+ private static void addField(final JsonObjectBuilder builder, final String key, final Long value) {
+ if (value != null) {
+ builder.add(key, value.longValue());
+ }
+ }
+
+ private static void addField(final JsonObjectBuilder builder, final String key, final String value) {
+ if (value == null) {
+ return;
+ }
+ builder.add(key, value);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/78a0e1e1/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
index 81a4ea4..2123e31 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
@@ -139,7 +139,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
try {
state = context.getStateManager().getState(Scope.LOCAL).toMap();
} catch (IOException e) {
- getLogger().error("Failed to get state at start up due to {}:"+e.getMessage(), e);
+ getLogger().error("Failed to get state at start up due to:" + e.getMessage(), e);
return;
}
if (state.containsKey(LAST_EVENT_ID_KEY)) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/78a0e1e1/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
index be9f654..bdf61cc 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
@@ -13,4 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask
\ No newline at end of file
+org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask
+org.apache.nifi.reporting.SiteToSiteBulletinReportingTask
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/78a0e1e1/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java
new file mode 100644
index 0000000..d5bce1b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.reporting;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import javax.json.Json;
+import javax.json.JsonObject;
+import javax.json.JsonReader;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.state.MockStateManager;
+import org.apache.nifi.util.MockPropertyValue;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestSiteToSiteBulletinReportingTask {
+
+ @Test
+ public void testSerializedForm() throws IOException, InitializationException {
+ // creating the list of bulletins
+ final List<Bulletin> bulletins = new ArrayList<Bulletin>();
+ bulletins.add(BulletinFactory.createBulletin("category", "severity", "message"));
+
+ // mock the access to the list of bulletins
+ final ReportingContext context = Mockito.mock(ReportingContext.class);
+ final BulletinRepository repository = Mockito.mock(BulletinRepository.class);
+ Mockito.when(context.getBulletinRepository()).thenReturn(repository);
+ Mockito.when(repository.findBulletins(Mockito.any(BulletinQuery.class))).thenReturn(bulletins);
+
+ // creating reporting task
+ final MockSiteToSiteBulletinReportingTask task = new MockSiteToSiteBulletinReportingTask();
+ Mockito.when(context.getStateManager()).thenReturn(new MockStateManager(task));
+
+ // settings properties and mocking access to properties
+ final Map<PropertyDescriptor, String> properties = new HashMap<>();
+ for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) {
+ properties.put(descriptor, descriptor.getDefaultValue());
+ }
+ properties.put(SiteToSiteBulletinReportingTask.BATCH_SIZE, "1000");
+ properties.put(SiteToSiteBulletinReportingTask.PLATFORM, "nifi");
+
+ Mockito.doAnswer(new Answer<PropertyValue>() {
+ @Override
+ public PropertyValue answer(final InvocationOnMock invocation) throws Throwable {
+ final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class);
+ return new MockPropertyValue(properties.get(descriptor));
+ }
+ }).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
+
+ // setup the mock initialization context
+ final ComponentLog logger = Mockito.mock(ComponentLog.class);
+ final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
+ Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
+ Mockito.when(initContext.getLogger()).thenReturn(logger);
+
+ task.initialize(initContext);
+ task.onTrigger(context);
+
+ // test checking
+ assertEquals(1, task.dataSent.size());
+ final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
+ JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
+ JsonObject bulletinJson = jsonReader.readArray().getJsonObject(0);
+ assertEquals("message", bulletinJson.getString("bulletinMessage"));
+ }
+
+ @Test
+ public void testWhenProvenanceMaxIdEqualToLastEventIdInStateManager() throws IOException, InitializationException {
+ // creating the list of bulletins
+ final List<Bulletin> bulletins = new ArrayList<Bulletin>();
+ bulletins.add(BulletinFactory.createBulletin("category", "severity", "message"));
+ bulletins.add(BulletinFactory.createBulletin("category", "severity", "message"));
+ bulletins.add(BulletinFactory.createBulletin("category", "severity", "message"));
+ bulletins.add(BulletinFactory.createBulletin("category", "severity", "message"));
+
+ // mock the access to the list of bulletins
+ final ReportingContext context = Mockito.mock(ReportingContext.class);
+ final BulletinRepository repository = Mockito.mock(BulletinRepository.class);
+ Mockito.when(context.getBulletinRepository()).thenReturn(repository);
+ Mockito.when(repository.findBulletins(Mockito.any(BulletinQuery.class))).thenReturn(bulletins);
+
+ final long maxEventId = getMaxBulletinId(bulletins);;
+
+ // create the mock reporting task and mock state manager
+ final MockSiteToSiteBulletinReportingTask task = new MockSiteToSiteBulletinReportingTask();
+ final MockStateManager stateManager = new MockStateManager(task);
+
+ // settings properties and mocking access to properties
+ final Map<PropertyDescriptor, String> properties = new HashMap<>();
+ for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) {
+ properties.put(descriptor, descriptor.getDefaultValue());
+ }
+ properties.put(SiteToSiteBulletinReportingTask.BATCH_SIZE, "1000");
+ properties.put(SiteToSiteBulletinReportingTask.PLATFORM, "nifi");
+
+ Mockito.doAnswer(new Answer<PropertyValue>() {
+ @Override
+ public PropertyValue answer(final InvocationOnMock invocation) throws Throwable {
+ final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class);
+ return new MockPropertyValue(properties.get(descriptor));
+ }
+ }).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
+
+ // create the state map and set the last id to the same value as maxEventId
+ final Map<String,String> state = new HashMap<>();
+ state.put(SiteToSiteProvenanceReportingTask.LAST_EVENT_ID_KEY, String.valueOf(maxEventId));
+ stateManager.setState(state, Scope.LOCAL);
+
+ // setup the mock reporting context to return the mock state manager
+ Mockito.when(context.getStateManager()).thenReturn(stateManager);
+
+ // setup the mock initialization context
+ final ComponentLog logger = Mockito.mock(ComponentLog.class);
+ final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
+ Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
+ Mockito.when(initContext.getLogger()).thenReturn(logger);
+
+ task.initialize(initContext);
+
+ // execute the reporting task and should not produce any data b/c max id same as previous id
+ task.onTrigger(context);
+ assertEquals(0, task.dataSent.size());
+ }
+
+ private static final class MockSiteToSiteBulletinReportingTask extends SiteToSiteBulletinReportingTask {
+
+ final List<byte[]> dataSent = new ArrayList<>();
+
+ @Override
+ protected SiteToSiteClient getClient() {
+ final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
+ final Transaction transaction = Mockito.mock(Transaction.class);
+
+ try {
+ Mockito.doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable {
+ final byte[] data = invocation.getArgumentAt(0, byte[].class);
+ dataSent.add(data);
+ return null;
+ }
+ }).when(transaction).send(Mockito.any(byte[].class), Mockito.anyMapOf(String.class, String.class));
+
+ Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.toString());
+ }
+
+ return client;
+ }
+
+ }
+
+ private Long getMaxBulletinId(List<Bulletin> bulletins) {
+ long result = -1L;
+ for (Bulletin bulletin : bulletins) {
+ if(bulletin.getId() > result) {
+ result = bulletin.getId();
+ }
+ }
+ return result;
+ }
+
+}