You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2017/05/25 09:20:55 UTC
nifi git commit: NIFI-3791 - added back pressure data into S2SStatusReportingTask
Repository: nifi
Updated Branches:
refs/heads/master c07850aec -> dc5e03236
NIFI-3791 - added back pressure data into S2SStatusReportingTask
This closes #1745.
Signed-off-by: Koji Kawamura <ij...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/dc5e0323
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/dc5e0323
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/dc5e0323
Branch: refs/heads/master
Commit: dc5e032368f09681bde680fc467512ea3f28dfa3
Parents: c07850a
Author: Pierre Villard <pi...@gmail.com>
Authored: Wed May 3 22:56:22 2017 +0200
Committer: Koji Kawamura <ij...@apache.org>
Committed: Thu May 25 18:20:11 2017 +0900
----------------------------------------------------------------------
.../reporting/SiteToSiteStatusReportingTask.java | 8 ++++++--
.../TestSiteToSiteStatusReportingTask.java | 18 ++++++++++++++++++
2 files changed, 24 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/dc5e0323/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
index d5dace2..c3d3da8 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
@@ -64,7 +64,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
.name("Platform")
- .description("The value to use for the platform field in each provenance event.")
+ .description("The value to use for the platform field in each status record.")
.required(true)
.expressionLanguageSupported(true)
.defaultValue("nifi")
@@ -179,7 +179,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
toIndex = Math.min(fromIndex + batchSize, jsonArray.size());
jsonBatch = jsonArray.subList(fromIndex, toIndex);
} catch (final IOException e) {
- throw new ProcessException("Failed to send Provenance Events to destination due to IOException:" + e.getMessage(), e);
+ throw new ProcessException("Failed to send Status Records to destination due to IOException:" + e.getMessage(), e);
}
}
}
@@ -343,6 +343,10 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
addField(builder, "inputCount", status.getInputCount());
addField(builder, "outputBytes", status.getOutputBytes());
addField(builder, "outputCount", status.getOutputCount());
+ addField(builder, "backPressureBytesThreshold", status.getBackPressureBytesThreshold());
+ addField(builder, "backPressureObjectThreshold", status.getBackPressureObjectThreshold());
+ addField(builder, "isBackPressureEnabled", Boolean.toString((status.getBackPressureObjectThreshold() > 0 && status.getBackPressureObjectThreshold() <= status.getQueuedCount())
+ || (status.getBackPressureBytesThreshold() > 0 && status.getBackPressureBytesThreshold() <= status.getMaxQueuedBytes())));
arrayBuilder.add(builder.build());
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/dc5e0323/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java
index 3c737d1..443981c 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java
@@ -131,6 +131,24 @@ public class TestSiteToSiteStatusReportingTask {
}
@Test
+ public void testConnectionStatus() throws IOException, InitializationException {
+ final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
+
+ final Map<PropertyDescriptor, String> properties = new HashMap<>();
+ properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4");
+ properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*");
+ properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(Connection)");
+
+ MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
+ task.onTrigger(context);
+
+ final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
+ JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
+ JsonString backpressure = jsonReader.readArray().getJsonObject(0).getJsonString("isBackPressureEnabled");
+ assertEquals("true", backpressure.getString());
+ }
+
+ @Test
public void testComponentNameFilter() throws IOException, InitializationException {
final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);