You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/04/11 19:01:18 UTC
[2/2] nifi git commit: NIFI-4809 - Implement a SiteToSiteMetricsReportingTask
NIFI-4809 - Implement a SiteToSiteMetricsReportingTask
Fixed dependency issue by providing a local JSON reader
Rebased + fixed conflict + updated versions in pom + EL scope
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #2575
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6fbe1515
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6fbe1515
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6fbe1515
Branch: refs/heads/master
Commit: 6fbe1515eefd2071dc75a1de2c1fc15cc282da76
Parents: ce0855e
Author: Pierre Villard <pi...@gmail.com>
Authored: Tue Jan 23 23:15:18 2018 +0100
Committer: Matthew Burgess <ma...@apache.org>
Committed: Wed Apr 11 14:44:30 2018 -0400
----------------------------------------------------------------------
.../nifi-ambari-reporting-task/pom.xml | 15 +-
.../reporting/ambari/AmbariReportingTask.java | 4 +-
.../reporting/ambari/api/MetricBuilder.java | 84 ----
.../nifi/reporting/ambari/api/MetricFields.java | 29 --
.../reporting/ambari/api/MetricsBuilder.java | 93 ----
.../reporting/ambari/metrics/MetricNames.java | 55 ---
.../ambari/metrics/MetricsService.java | 131 ------
.../ambari/api/TestMetricsBuilder.java | 2 +
.../ambari/metrics/TestMetricsService.java | 2 +
.../nifi-reporting-utils/pom.xml | 10 +
.../reporting/util/metrics/MetricNames.java | 59 +++
.../reporting/util/metrics/MetricsService.java | 230 ++++++++++
.../util/metrics/api/MetricBuilder.java | 84 ++++
.../util/metrics/api/MetricFields.java | 29 ++
.../util/metrics/api/MetricsBuilder.java | 93 ++++
.../nifi-site-to-site-reporting-task/pom.xml | 39 +-
.../AbstractSiteToSiteReportingTask.java | 420 ++++++++++++++++++-
.../SiteToSiteBulletinReportingTask.java | 18 +-
.../SiteToSiteMetricsReportingTask.java | 222 ++++++++++
.../SiteToSiteProvenanceReportingTask.java | 28 +-
.../SiteToSiteStatusReportingTask.java | 37 +-
.../org.apache.nifi.reporting.ReportingTask | 3 +-
.../additionalDetails.html | 178 ++++++++
.../additionalDetails.html | 2 +-
.../src/main/resources/schema-metrics.avsc | 37 ++
.../TestSiteToSiteMetricsReportingTask.java | 296 +++++++++++++
26 files changed, 1715 insertions(+), 485 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml
index dafe829..de024e2 100644
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml
@@ -30,21 +30,11 @@
<artifactId>jersey-client</artifactId>
</dependency>
<dependency>
- <groupId>org.glassfish</groupId>
- <artifactId>javax.json</artifactId>
- <version>1.0.4</version>
- </dependency>
- <dependency>
<groupId>javax.json</groupId>
<artifactId>javax.json-api</artifactId>
<version>1.0</version>
</dependency>
<dependency>
- <groupId>com.yammer.metrics</groupId>
- <artifactId>metrics-core</artifactId>
- <version>2.2.0</version>
- </dependency>
- <dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
@@ -53,6 +43,11 @@
<artifactId>nifi-utils</artifactId>
<version>1.7.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-reporting-utils</artifactId>
+ <version>1.7.0-SNAPSHOT</version>
+ </dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.apache.nifi</groupId>
http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
index 5bbdecb..0568b3e 100644
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
@@ -29,8 +29,8 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.ReportingContext;
-import org.apache.nifi.reporting.ambari.api.MetricsBuilder;
-import org.apache.nifi.reporting.ambari.metrics.MetricsService;
+import org.apache.nifi.reporting.util.metrics.MetricsService;
+import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder;
import org.apache.nifi.scheduling.SchedulingStrategy;
import javax.json.Json;
http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java
deleted file mode 100644
index 8e234ce..0000000
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.reporting.ambari.api;
-
-import javax.json.JsonBuilderFactory;
-import javax.json.JsonObject;
-
-/**
- * Builds the JsonObject for an individual metric.
- */
-public class MetricBuilder {
-
- private final JsonBuilderFactory factory;
-
- private String applicationId;
- private String instanceId;
- private String hostname;
- private String timestamp;
- private String metricName;
- private String metricValue;
-
- public MetricBuilder(final JsonBuilderFactory factory) {
- this.factory = factory;
- }
-
- public MetricBuilder applicationId(final String applicationId) {
- this.applicationId = applicationId;
- return this;
- }
-
- public MetricBuilder instanceId(final String instanceId) {
- this.instanceId = instanceId;
- return this;
- }
-
- public MetricBuilder hostname(final String hostname) {
- this.hostname = hostname;
- return this;
- }
-
- public MetricBuilder timestamp(final long timestamp) {
- this.timestamp = String.valueOf(timestamp);
- return this;
- }
-
- public MetricBuilder metricName(final String metricName) {
- this.metricName = metricName;
- return this;
- }
-
- public MetricBuilder metricValue(final String metricValue) {
- this.metricValue = metricValue;
- return this;
- }
-
- public JsonObject build() {
- return factory.createObjectBuilder()
- .add(MetricFields.METRIC_NAME, metricName)
- .add(MetricFields.APP_ID, applicationId)
- .add(MetricFields.INSTANCE_ID, instanceId)
- .add(MetricFields.HOSTNAME, hostname)
- .add(MetricFields.TIMESTAMP, timestamp)
- .add(MetricFields.START_TIME, timestamp)
- .add(MetricFields.METRICS,
- factory.createObjectBuilder()
- .add(String.valueOf(timestamp), metricValue)
- ).build();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java
deleted file mode 100644
index 1c1629c..0000000
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.reporting.ambari.api;
-
-public interface MetricFields {
-
- String METRIC_NAME = "metricname";
- String APP_ID = "appid";
- String INSTANCE_ID = "instanceid";
- String HOSTNAME = "hostname";
- String TIMESTAMP = "timestamp";
- String START_TIME = "starttime";
- String METRICS = "metrics";
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java
deleted file mode 100644
index 11b4db5..0000000
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.reporting.ambari.api;
-
-import javax.json.JsonArrayBuilder;
-import javax.json.JsonBuilderFactory;
-import javax.json.JsonObject;
-import javax.json.JsonObjectBuilder;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Builds the overall JsonObject for the Metrics.
- */
-public class MetricsBuilder {
-
- static final String ROOT_JSON_ELEMENT = "metrics";
-
- private final JsonBuilderFactory factory;
-
- private long timestamp;
- private String applicationId;
- private String instanceId;
- private String hostname;
- private Map<String,String> metrics = new HashMap<>();
-
- public MetricsBuilder(final JsonBuilderFactory factory) {
- this.factory = factory;
- }
-
- public MetricsBuilder applicationId(final String applicationId) {
- this.applicationId = applicationId;
- return this;
- }
-
- public MetricsBuilder instanceId(final String instanceId) {
- this.instanceId = instanceId;
- return this;
- }
-
- public MetricsBuilder hostname(final String hostname) {
- this.hostname = hostname;
- return this;
- }
-
- public MetricsBuilder timestamp(final long timestamp) {
- this.timestamp = timestamp;
- return this;
- }
-
- public MetricsBuilder metric(final String name, String value) {
- this.metrics.put(name, value);
- return this;
- }
-
- public MetricsBuilder addAllMetrics(final Map<String,String> metrics) {
- this.metrics.putAll(metrics);
- return this;
- }
-
- public JsonObject build() {
- // builds JsonObject for individual metrics
- final MetricBuilder metricBuilder = new MetricBuilder(factory);
- metricBuilder.instanceId(instanceId).applicationId(applicationId).timestamp(timestamp).hostname(hostname);
-
- final JsonArrayBuilder metricArrayBuilder = factory.createArrayBuilder();
-
- for (Map.Entry<String,String> entry : metrics.entrySet()) {
- metricBuilder.metricName(entry.getKey()).metricValue(entry.getValue());
- metricArrayBuilder.add(metricBuilder.build());
- }
-
- // add the array of metrics to a top-level json object
- final JsonObjectBuilder metricsBuilder = factory.createObjectBuilder();
- metricsBuilder.add(ROOT_JSON_ELEMENT, metricArrayBuilder);
- return metricsBuilder.build();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java
deleted file mode 100644
index 20cfa4e..0000000
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.reporting.ambari.metrics;
-
-/**
- * The Metric names to send to Ambari.
- */
-public interface MetricNames {
-
- // Metric Name separator
- String METRIC_NAME_SEPARATOR = ".";
-
- // NiFi Metrics
- String FLOW_FILES_RECEIVED = "FlowFilesReceivedLast5Minutes";
- String BYTES_RECEIVED = "BytesReceivedLast5Minutes";
- String FLOW_FILES_SENT = "FlowFilesSentLast5Minutes";
- String BYTES_SENT = "BytesSentLast5Minutes";
- String FLOW_FILES_QUEUED = "FlowFilesQueued";
- String BYTES_QUEUED = "BytesQueued";
- String BYTES_READ = "BytesReadLast5Minutes";
- String BYTES_WRITTEN = "BytesWrittenLast5Minutes";
- String ACTIVE_THREADS = "ActiveThreads";
- String TOTAL_TASK_DURATION_SECONDS = "TotalTaskDurationSeconds";
- String TOTAL_TASK_DURATION_NANOS = "TotalTaskDurationNanoSeconds";
-
- // JVM Metrics
- String JVM_UPTIME = "jvm.uptime";
- String JVM_HEAP_USED = "jvm.heap_used";
- String JVM_HEAP_USAGE = "jvm.heap_usage";
- String JVM_NON_HEAP_USAGE = "jvm.non_heap_usage";
- String JVM_THREAD_STATES_RUNNABLE = "jvm.thread_states.runnable";
- String JVM_THREAD_STATES_BLOCKED = "jvm.thread_states.blocked";
- String JVM_THREAD_STATES_TIMED_WAITING = "jvm.thread_states.timed_waiting";
- String JVM_THREAD_STATES_TERMINATED = "jvm.thread_states.terminated";
- String JVM_THREAD_COUNT = "jvm.thread_count";
- String JVM_DAEMON_THREAD_COUNT = "jvm.daemon_thread_count";
- String JVM_FILE_DESCRIPTOR_USAGE = "jvm.file_descriptor_usage";
- String JVM_GC_RUNS = "jvm.gc.runs";
- String JVM_GC_TIME = "jvm.gc.time";
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java
deleted file mode 100644
index cef257d..0000000
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.reporting.ambari.metrics;
-
-import com.yammer.metrics.core.VirtualMachineMetrics;
-import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.controller.status.ProcessorStatus;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A service used to produce key/value metrics based on a given input.
- */
-public class MetricsService {
-
- /**
- * Generates a Map of metrics for a ProcessGroupStatus instance.
- *
- * @param status a ProcessGroupStatus to get metrics from
- * @param appendPgId if true, the process group ID will be appended at the end of the metric name
- * @return a map of metrics for the given status
- */
- public Map<String,String> getMetrics(ProcessGroupStatus status, boolean appendPgId) {
- final Map<String,String> metrics = new HashMap<>();
- metrics.put(appendPgId(MetricNames.FLOW_FILES_RECEIVED, status, appendPgId), String.valueOf(status.getFlowFilesReceived()));
- metrics.put(appendPgId(MetricNames.BYTES_RECEIVED, status, appendPgId), String.valueOf(status.getBytesReceived()));
- metrics.put(appendPgId(MetricNames.FLOW_FILES_SENT, status, appendPgId), String.valueOf(status.getFlowFilesSent()));
- metrics.put(appendPgId(MetricNames.BYTES_SENT, status, appendPgId), String.valueOf(status.getBytesSent()));
- metrics.put(appendPgId(MetricNames.FLOW_FILES_QUEUED, status, appendPgId), String.valueOf(status.getQueuedCount()));
- metrics.put(appendPgId(MetricNames.BYTES_QUEUED, status, appendPgId), String.valueOf(status.getQueuedContentSize()));
- metrics.put(appendPgId(MetricNames.BYTES_READ, status, appendPgId), String.valueOf(status.getBytesRead()));
- metrics.put(appendPgId(MetricNames.BYTES_WRITTEN, status, appendPgId), String.valueOf(status.getBytesWritten()));
- metrics.put(appendPgId(MetricNames.ACTIVE_THREADS, status, appendPgId), String.valueOf(status.getActiveThreadCount()));
-
- final long durationNanos = calculateProcessingNanos(status);
- metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_NANOS, status, appendPgId), String.valueOf(durationNanos));
-
- final long durationSeconds = TimeUnit.SECONDS.convert(durationNanos, TimeUnit.NANOSECONDS);
- metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_SECONDS, status, appendPgId), String.valueOf(durationSeconds));
-
- return metrics;
- }
-
- /**
- * Generates a Map of metrics for VirtualMachineMetrics.
- *
- * @param virtualMachineMetrics a VirtualMachineMetrics instance to get metrics from
- * @return a map of metrics from the given VirtualMachineStatus
- */
- public Map<String,String> getMetrics(VirtualMachineMetrics virtualMachineMetrics) {
- final Map<String,String> metrics = new HashMap<>();
- metrics.put(MetricNames.JVM_UPTIME, String.valueOf(virtualMachineMetrics.uptime()));
- metrics.put(MetricNames.JVM_HEAP_USED, String.valueOf(virtualMachineMetrics.heapUsed()));
- metrics.put(MetricNames.JVM_HEAP_USAGE, String.valueOf(virtualMachineMetrics.heapUsage()));
- metrics.put(MetricNames.JVM_NON_HEAP_USAGE, String.valueOf(virtualMachineMetrics.nonHeapUsage()));
- metrics.put(MetricNames.JVM_THREAD_COUNT, String.valueOf(virtualMachineMetrics.threadCount()));
- metrics.put(MetricNames.JVM_DAEMON_THREAD_COUNT, String.valueOf(virtualMachineMetrics.daemonThreadCount()));
- metrics.put(MetricNames.JVM_FILE_DESCRIPTOR_USAGE, String.valueOf(virtualMachineMetrics.fileDescriptorUsage()));
-
- for (Map.Entry<Thread.State,Double> entry : virtualMachineMetrics.threadStatePercentages().entrySet()) {
- final int normalizedValue = (int) (100 * (entry.getValue() == null ? 0 : entry.getValue()));
- switch(entry.getKey()) {
- case BLOCKED:
- metrics.put(MetricNames.JVM_THREAD_STATES_BLOCKED, String.valueOf(normalizedValue));
- break;
- case RUNNABLE:
- metrics.put(MetricNames.JVM_THREAD_STATES_RUNNABLE, String.valueOf(normalizedValue));
- break;
- case TERMINATED:
- metrics.put(MetricNames.JVM_THREAD_STATES_TERMINATED, String.valueOf(normalizedValue));
- break;
- case TIMED_WAITING:
- metrics.put(MetricNames.JVM_THREAD_STATES_TIMED_WAITING, String.valueOf(normalizedValue));
- break;
- default:
- break;
- }
- }
-
- for (Map.Entry<String,VirtualMachineMetrics.GarbageCollectorStats> entry : virtualMachineMetrics.garbageCollectors().entrySet()) {
- final String gcName = entry.getKey().replace(" ", "");
- final long runs = entry.getValue().getRuns();
- final long timeMS = entry.getValue().getTime(TimeUnit.MILLISECONDS);
- metrics.put(MetricNames.JVM_GC_RUNS + "." + gcName, String.valueOf(runs));
- metrics.put(MetricNames.JVM_GC_TIME + "." + gcName, String.valueOf(timeMS));
- }
-
- return metrics;
- }
-
- // calculates the total processing time of all processors in nanos
- protected long calculateProcessingNanos(final ProcessGroupStatus status) {
- long nanos = 0L;
-
- for (final ProcessorStatus procStats : status.getProcessorStatus()) {
- nanos += procStats.getProcessingNanos();
- }
-
- for (final ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) {
- nanos += calculateProcessingNanos(childGroupStatus);
- }
-
- return nanos;
- }
-
- // append the process group ID if necessary
- private String appendPgId(String name, ProcessGroupStatus status, boolean appendPgId) {
- if(appendPgId) {
- return name + MetricNames.METRIC_NAME_SEPARATOR + status.getId();
- } else {
- return name;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java
index cdaa453..9b96eb9 100644
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java
@@ -16,6 +16,8 @@
*/
package org.apache.nifi.reporting.ambari.api;
+import org.apache.nifi.reporting.util.metrics.api.MetricFields;
+import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder;
import org.junit.Assert;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java
index 93224eb..ec0cf6e 100644
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java
@@ -19,6 +19,8 @@ package org.apache.nifi.reporting.ambari.metrics;
import com.yammer.metrics.core.VirtualMachineMetrics;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.reporting.util.metrics.MetricNames;
+import org.apache.nifi.reporting.util.metrics.MetricsService;
import org.junit.Assert;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml
index ba10afb..3e2d158 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml
@@ -40,6 +40,16 @@
<artifactId>commons-lang3</artifactId>
<version>3.7</version>
</dependency>
+ <dependency>
+ <groupId>com.yammer.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>2.2.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish</groupId>
+ <artifactId>javax.json</artifactId>
+ <version>1.0.4</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java
new file mode 100644
index 0000000..19bb90d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.util.metrics;
+
+/**
+ * The Metric names to send to Ambari.
+ */
+public interface MetricNames {
+
+ // Metric Name separator
+ String METRIC_NAME_SEPARATOR = ".";
+
+ // NiFi Metrics
+ String FLOW_FILES_RECEIVED = "FlowFilesReceivedLast5Minutes";
+ String BYTES_RECEIVED = "BytesReceivedLast5Minutes";
+ String FLOW_FILES_SENT = "FlowFilesSentLast5Minutes";
+ String BYTES_SENT = "BytesSentLast5Minutes";
+ String FLOW_FILES_QUEUED = "FlowFilesQueued";
+ String BYTES_QUEUED = "BytesQueued";
+ String BYTES_READ = "BytesReadLast5Minutes";
+ String BYTES_WRITTEN = "BytesWrittenLast5Minutes";
+ String ACTIVE_THREADS = "ActiveThreads";
+ String TOTAL_TASK_DURATION_SECONDS = "TotalTaskDurationSeconds";
+ String TOTAL_TASK_DURATION_NANOS = "TotalTaskDurationNanoSeconds";
+
+ // JVM Metrics
+ String JVM_UPTIME = "jvm.uptime";
+ String JVM_HEAP_USED = "jvm.heap_used";
+ String JVM_HEAP_USAGE = "jvm.heap_usage";
+ String JVM_NON_HEAP_USAGE = "jvm.non_heap_usage";
+ String JVM_THREAD_STATES_RUNNABLE = "jvm.thread_states.runnable";
+ String JVM_THREAD_STATES_BLOCKED = "jvm.thread_states.blocked";
+ String JVM_THREAD_STATES_TIMED_WAITING = "jvm.thread_states.timed_waiting";
+ String JVM_THREAD_STATES_TERMINATED = "jvm.thread_states.terminated";
+ String JVM_THREAD_COUNT = "jvm.thread_count";
+ String JVM_DAEMON_THREAD_COUNT = "jvm.daemon_thread_count";
+ String JVM_FILE_DESCRIPTOR_USAGE = "jvm.file_descriptor_usage";
+ String JVM_GC_RUNS = "jvm.gc.runs";
+ String JVM_GC_TIME = "jvm.gc.time";
+
+ // OS Metrics
+ String LOAD1MN = "loadAverage1min";
+ String CORES = "availableCores";
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java
new file mode 100644
index 0000000..ed3922a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.util.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+import javax.json.JsonObjectBuilder;
+
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.reporting.util.metrics.api.MetricFields;
+
+import com.yammer.metrics.core.VirtualMachineMetrics;
+
+/**
+ * A service used to produce key/value metrics based on a given input.
+ */
+public class MetricsService {
+
+ /**
+ * Generates a Map of metrics for a ProcessGroupStatus instance.
+ *
+ * @param status a ProcessGroupStatus to get metrics from
+ * @param appendPgId if true, the process group ID will be appended at the end of the metric name
+ * @return a map of metrics for the given status
+ */
+ public Map<String,String> getMetrics(ProcessGroupStatus status, boolean appendPgId) {
+ final Map<String,String> metrics = new HashMap<>();
+
+ Map<String,Long> longMetrics = getLongMetrics(status, appendPgId);
+ for (String key : longMetrics.keySet()) {
+ metrics.put(key, String.valueOf(longMetrics.get(key)));
+ }
+
+ Map<String,Integer> integerMetrics = getIntegerMetrics(status, appendPgId);
+ for (String key : integerMetrics.keySet()) {
+ metrics.put(key, String.valueOf(integerMetrics.get(key)));
+ }
+
+ return metrics;
+ }
+
+ private Map<String,Integer> getIntegerMetrics(ProcessGroupStatus status, boolean appendPgId) {
+ final Map<String,Integer> metrics = new HashMap<>();
+ metrics.put(appendPgId(MetricNames.FLOW_FILES_RECEIVED, status, appendPgId), status.getFlowFilesReceived());
+ metrics.put(appendPgId(MetricNames.FLOW_FILES_SENT, status, appendPgId), status.getFlowFilesSent());
+ metrics.put(appendPgId(MetricNames.FLOW_FILES_QUEUED, status, appendPgId), status.getQueuedCount());
+ metrics.put(appendPgId(MetricNames.ACTIVE_THREADS, status, appendPgId), status.getActiveThreadCount());
+ return metrics;
+ }
+
+ private Map<String,Long> getLongMetrics(ProcessGroupStatus status, boolean appendPgId) {
+ final Map<String,Long> metrics = new HashMap<>();
+ metrics.put(appendPgId(MetricNames.BYTES_RECEIVED, status, appendPgId), status.getBytesReceived());
+ metrics.put(appendPgId(MetricNames.BYTES_SENT, status, appendPgId), status.getBytesSent());
+ metrics.put(appendPgId(MetricNames.BYTES_QUEUED, status, appendPgId), status.getQueuedContentSize());
+ metrics.put(appendPgId(MetricNames.BYTES_READ, status, appendPgId), status.getBytesRead());
+ metrics.put(appendPgId(MetricNames.BYTES_WRITTEN, status, appendPgId), status.getBytesWritten());
+
+ final long durationNanos = calculateProcessingNanos(status);
+ metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_NANOS, status, appendPgId), durationNanos);
+
+ final long durationSeconds = TimeUnit.SECONDS.convert(durationNanos, TimeUnit.NANOSECONDS);
+ metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_SECONDS, status, appendPgId), durationSeconds);
+
+ return metrics;
+ }
+
+ /**
+ * Generates a Map of metrics for VirtualMachineMetrics.
+ *
+ * @param virtualMachineMetrics a VirtualMachineMetrics instance to get metrics from
+ * @return a map of metrics from the given VirtualMachineStatus
+ */
+ public Map<String,String> getMetrics(VirtualMachineMetrics virtualMachineMetrics) {
+ final Map<String,String> metrics = new HashMap<>();
+
+ Map<String,Integer> integerMetrics = getIntegerMetrics(virtualMachineMetrics);
+ for (String key : integerMetrics.keySet()) {
+ metrics.put(key, String.valueOf(integerMetrics.get(key)));
+ }
+
+ Map<String,Long> longMetrics = getLongMetrics(virtualMachineMetrics);
+ for (String key : longMetrics.keySet()) {
+ metrics.put(key, String.valueOf(longMetrics.get(key)));
+ }
+
+ Map<String,Double> doubleMetrics = getDoubleMetrics(virtualMachineMetrics);
+ for (String key : doubleMetrics.keySet()) {
+ metrics.put(key, String.valueOf(doubleMetrics.get(key)));
+ }
+
+ return metrics;
+ }
+
+ // calculates the total processing time of all processors in nanos
+ protected long calculateProcessingNanos(final ProcessGroupStatus status) {
+ long nanos = 0L;
+
+ for (final ProcessorStatus procStats : status.getProcessorStatus()) {
+ nanos += procStats.getProcessingNanos();
+ }
+
+ for (final ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) {
+ nanos += calculateProcessingNanos(childGroupStatus);
+ }
+
+ return nanos;
+ }
+
+ // append the process group ID if necessary
+ private String appendPgId(String name, ProcessGroupStatus status, boolean appendPgId) {
+ if(appendPgId) {
+ return name + MetricNames.METRIC_NAME_SEPARATOR + status.getId();
+ } else {
+ return name;
+ }
+ }
+
+ private Map<String,Double> getDoubleMetrics(VirtualMachineMetrics virtualMachineMetrics) {
+ final Map<String,Double> metrics = new HashMap<>();
+ metrics.put(MetricNames.JVM_HEAP_USED, virtualMachineMetrics.heapUsed());
+ metrics.put(MetricNames.JVM_HEAP_USAGE, virtualMachineMetrics.heapUsage());
+ metrics.put(MetricNames.JVM_NON_HEAP_USAGE, virtualMachineMetrics.nonHeapUsage());
+ metrics.put(MetricNames.JVM_FILE_DESCRIPTOR_USAGE, virtualMachineMetrics.fileDescriptorUsage());
+ return metrics;
+ }
+
+ private Map<String,Long> getLongMetrics(VirtualMachineMetrics virtualMachineMetrics) {
+ final Map<String,Long> metrics = new HashMap<>();
+ metrics.put(MetricNames.JVM_UPTIME, virtualMachineMetrics.uptime());
+
+ for (Map.Entry<String,VirtualMachineMetrics.GarbageCollectorStats> entry : virtualMachineMetrics.garbageCollectors().entrySet()) {
+ final String gcName = entry.getKey().replace(" ", "");
+ final long runs = entry.getValue().getRuns();
+ final long timeMS = entry.getValue().getTime(TimeUnit.MILLISECONDS);
+ metrics.put(MetricNames.JVM_GC_RUNS + "." + gcName, runs);
+ metrics.put(MetricNames.JVM_GC_TIME + "." + gcName, timeMS);
+ }
+
+ return metrics;
+ }
+
+ private Map<String,Integer> getIntegerMetrics(VirtualMachineMetrics virtualMachineMetrics) {
+ final Map<String,Integer> metrics = new HashMap<>();
+ metrics.put(MetricNames.JVM_DAEMON_THREAD_COUNT, virtualMachineMetrics.daemonThreadCount());
+ metrics.put(MetricNames.JVM_THREAD_COUNT, virtualMachineMetrics.threadCount());
+
+ for (Map.Entry<Thread.State,Double> entry : virtualMachineMetrics.threadStatePercentages().entrySet()) {
+ final int normalizedValue = (int) (100 * (entry.getValue() == null ? 0 : entry.getValue()));
+ switch(entry.getKey()) {
+ case BLOCKED:
+ metrics.put(MetricNames.JVM_THREAD_STATES_BLOCKED, normalizedValue);
+ break;
+ case RUNNABLE:
+ metrics.put(MetricNames.JVM_THREAD_STATES_RUNNABLE, normalizedValue);
+ break;
+ case TERMINATED:
+ metrics.put(MetricNames.JVM_THREAD_STATES_TERMINATED, normalizedValue);
+ break;
+ case TIMED_WAITING:
+ metrics.put(MetricNames.JVM_THREAD_STATES_TIMED_WAITING, normalizedValue);
+ break;
+ default:
+ break;
+ }
+ }
+
+ return metrics;
+ }
+
+ public JsonObject getMetrics(JsonBuilderFactory factory, ProcessGroupStatus status, VirtualMachineMetrics virtualMachineMetrics,
+ String applicationId, String id, String hostname, long currentTimeMillis, int availableProcessors, double systemLoad) {
+ JsonObjectBuilder objectBuilder = factory.createObjectBuilder()
+ .add(MetricFields.APP_ID, applicationId)
+ .add(MetricFields.HOSTNAME, hostname)
+ .add(MetricFields.INSTANCE_ID, status.getId())
+ .add(MetricFields.TIMESTAMP, currentTimeMillis);
+
+ objectBuilder
+ .add(MetricNames.CORES, availableProcessors)
+ .add(MetricNames.LOAD1MN, systemLoad);
+
+ Map<String,Integer> integerMetrics = getIntegerMetrics(virtualMachineMetrics);
+ for (String key : integerMetrics.keySet()) {
+ objectBuilder.add(key.replaceAll("\\.", ""), integerMetrics.get(key));
+ }
+
+ Map<String,Long> longMetrics = getLongMetrics(virtualMachineMetrics);
+ for (String key : longMetrics.keySet()) {
+ objectBuilder.add(key.replaceAll("\\.", ""), longMetrics.get(key));
+ }
+
+ Map<String,Double> doubleMetrics = getDoubleMetrics(virtualMachineMetrics);
+ for (String key : doubleMetrics.keySet()) {
+ objectBuilder.add(key.replaceAll("\\.", ""), doubleMetrics.get(key));
+ }
+
+ Map<String,Long> longPgMetrics = getLongMetrics(status, false);
+ for (String key : longPgMetrics.keySet()) {
+ objectBuilder.add(key, longPgMetrics.get(key));
+ }
+
+ Map<String,Integer> integerPgMetrics = getIntegerMetrics(status, false);
+ for (String key : integerPgMetrics.keySet()) {
+ objectBuilder.add(key, integerPgMetrics.get(key));
+ }
+
+ return objectBuilder.build();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java
new file mode 100644
index 0000000..81fb021
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.util.metrics.api;
+
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+
+/**
+ * Builds the JsonObject for an individual metric.
+ */
+public class MetricBuilder {
+
+    private final JsonBuilderFactory factory;
+
+    private String applicationId;
+    private String instanceId;
+    private String hostname;
+    private String timestamp;
+    private String metricName;
+    private String metricValue;
+
+    /**
+     * @param factory the factory used to create the JSON object builders
+     */
+    public MetricBuilder(final JsonBuilderFactory factory) {
+        this.factory = factory;
+    }
+
+    /** Sets the application ID field and returns this builder. */
+    public MetricBuilder applicationId(final String applicationId) {
+        this.applicationId = applicationId;
+        return this;
+    }
+
+    /** Sets the instance ID field and returns this builder. */
+    public MetricBuilder instanceId(final String instanceId) {
+        this.instanceId = instanceId;
+        return this;
+    }
+
+    /** Sets the hostname field and returns this builder. */
+    public MetricBuilder hostname(final String hostname) {
+        this.hostname = hostname;
+        return this;
+    }
+
+    /** Sets the timestamp, which is stored in its string form, and returns this builder. */
+    public MetricBuilder timestamp(final long timestamp) {
+        this.timestamp = String.valueOf(timestamp);
+        return this;
+    }
+
+    /** Sets the metric name and returns this builder. */
+    public MetricBuilder metricName(final String metricName) {
+        this.metricName = metricName;
+        return this;
+    }
+
+    /** Sets the metric value and returns this builder. */
+    public MetricBuilder metricValue(final String metricValue) {
+        this.metricValue = metricValue;
+        return this;
+    }
+
+    /**
+     * Creates the JSON object for the metric configured so far. The timestamp is written
+     * both as the timestamp and as the start time, and the metric value is stored in a
+     * nested object under the timestamp as its key.
+     *
+     * @return the JSON object describing this metric
+     */
+    public JsonObject build() {
+        // identification fields first, in the same order they have always been emitted
+        final javax.json.JsonObjectBuilder metric = factory.createObjectBuilder()
+                .add(MetricFields.METRIC_NAME, metricName)
+                .add(MetricFields.APP_ID, applicationId)
+                .add(MetricFields.INSTANCE_ID, instanceId)
+                .add(MetricFields.HOSTNAME, hostname)
+                .add(MetricFields.TIMESTAMP, timestamp)
+                .add(MetricFields.START_TIME, timestamp);
+
+        // single data point: timestamp -> value
+        final javax.json.JsonObjectBuilder dataPoints = factory.createObjectBuilder()
+                .add(String.valueOf(timestamp), metricValue);
+
+        return metric.add(MetricFields.METRICS, dataPoints).build();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricFields.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricFields.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricFields.java
new file mode 100644
index 0000000..4c451ea
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricFields.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.util.metrics.api;
+
+/**
+ * Names of the JSON fields used when metrics are serialized by the builders
+ * and services of the reporting utilities.
+ */
+public interface MetricFields {
+
+ // name of the individual metric
+ String METRIC_NAME = "metricname";
+ // identifier of the reporting application
+ String APP_ID = "appid";
+ // identifier of the reporting instance
+ String INSTANCE_ID = "instanceid";
+ // host the metrics were collected on
+ String HOSTNAME = "hostname";
+ // time the metrics were collected
+ String TIMESTAMP = "timestamp";
+ // start time of the metric; MetricBuilder writes the same value as the timestamp
+ String START_TIME = "starttime";
+ // container of the metric values; also used as the root element by MetricsBuilder
+ String METRICS = "metrics";
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java
new file mode 100644
index 0000000..3694720
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.util.metrics.api;
+
+import javax.json.JsonArrayBuilder;
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+import javax.json.JsonObjectBuilder;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Builds the overall JsonObject for the Metrics.
+ */
+public class MetricsBuilder {
+
+    static final String ROOT_JSON_ELEMENT = "metrics";
+
+    private final JsonBuilderFactory factory;
+    private final Map<String,String> metrics = new HashMap<>();
+
+    private long timestamp;
+    private String applicationId;
+    private String instanceId;
+    private String hostname;
+
+    /**
+     * @param factory the factory used to create the JSON builders
+     */
+    public MetricsBuilder(final JsonBuilderFactory factory) {
+        this.factory = factory;
+    }
+
+    /** Sets the application ID shared by all metrics and returns this builder. */
+    public MetricsBuilder applicationId(final String applicationId) {
+        this.applicationId = applicationId;
+        return this;
+    }
+
+    /** Sets the instance ID shared by all metrics and returns this builder. */
+    public MetricsBuilder instanceId(final String instanceId) {
+        this.instanceId = instanceId;
+        return this;
+    }
+
+    /** Sets the hostname shared by all metrics and returns this builder. */
+    public MetricsBuilder hostname(final String hostname) {
+        this.hostname = hostname;
+        return this;
+    }
+
+    /** Sets the timestamp shared by all metrics and returns this builder. */
+    public MetricsBuilder timestamp(final long timestamp) {
+        this.timestamp = timestamp;
+        return this;
+    }
+
+    /** Registers one metric, replacing any previous value with the same name. */
+    public MetricsBuilder metric(final String name, String value) {
+        this.metrics.put(name, value);
+        return this;
+    }
+
+    /** Registers every entry of the given map as a metric. */
+    public MetricsBuilder addAllMetrics(final Map<String,String> metrics) {
+        this.metrics.putAll(metrics);
+        return this;
+    }
+
+    /**
+     * Creates the top-level JSON object: an array with one entry per registered metric,
+     * stored under the root element.
+     *
+     * @return the JSON object containing all registered metrics
+     */
+    public JsonObject build() {
+        // one MetricBuilder is shared; only name and value change from metric to metric
+        final MetricBuilder perMetric = new MetricBuilder(factory)
+                .instanceId(instanceId)
+                .applicationId(applicationId)
+                .timestamp(timestamp)
+                .hostname(hostname);
+
+        final JsonArrayBuilder metricArray = factory.createArrayBuilder();
+        metrics.forEach((name, value) ->
+                metricArray.add(perMetric.metricName(name).metricValue(value).build()));
+
+        // wrap the array of metrics in a top-level json object
+        final JsonObjectBuilder root = factory.createObjectBuilder();
+        root.add(ROOT_JSON_ELEMENT, metricArray);
+        return root.build();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml
index c320ae2..93a3196 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml
@@ -55,6 +55,23 @@
<version>1.7.0-SNAPSHOT</version>
</dependency>
<dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-schema-registry-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-avro-record-utils</artifactId>
+ <version>1.7.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
<groupId>org.glassfish</groupId>
<artifactId>javax.json</artifactId>
<version>1.0.4</version>
@@ -83,10 +100,30 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock-record-utils</artifactId>
+ <version>1.7.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ <excludes combine.children="append">
+ <exclude>src/main/resources/schema-metrics.avsc</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
</project>
http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
index 341a6d8..e755354 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
@@ -16,6 +16,23 @@
*/
package org.apache.nifi.reporting;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.DateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import javax.json.JsonObjectBuilder;
+import javax.json.JsonValue;
+import javax.net.ssl.SSLContext;
+
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
@@ -25,27 +42,51 @@ import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.protocol.http.HttpProxy;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SerializedForm;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.StringUtils;
-
-import javax.net.ssl.SSLContext;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonToken;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ArrayNode;
/**
* Base class for ReportingTasks that send data over site-to-site.
*/
public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingTask {
+
+ protected static final String LAST_EVENT_ID_KEY = "last_event_id";
protected static final String DESTINATION_URL_PATH = "/nifi";
+ protected static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
static final PropertyDescriptor DESTINATION_URL = new PropertyDescriptor.Builder()
.name("Destination URL")
@@ -141,8 +182,16 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
.sensitive(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
+ /**
+ * Optional property identifying the RecordSetWriterFactory controller service
+ * used to write out the records.
+ */
+ static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+ .name("record-writer")
+ .displayName("Record Writer")
+ .description("Specifies the Controller Service to use for writing out the records.")
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .required(false)
+ .build();
protected volatile SiteToSiteClient siteToSiteClient;
+ protected volatile RecordSchema recordSchema;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -188,7 +237,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
final SiteToSiteTransportProtocol mode = SiteToSiteTransportProtocol.valueOf(context.getProperty(TRANSPORT_PROTOCOL).getValue());
final HttpProxy httpProxy = mode.equals(SiteToSiteTransportProtocol.RAW) || StringUtils.isEmpty(context.getProperty(HTTP_PROXY_HOSTNAME).getValue()) ? null
: new HttpProxy(context.getProperty(HTTP_PROXY_HOSTNAME).getValue(), context.getProperty(HTTP_PROXY_PORT).asInteger(),
- context.getProperty(HTTP_PROXY_USERNAME).getValue(), context.getProperty(HTTP_PROXY_PASSWORD).getValue());
+ context.getProperty(HTTP_PROXY_USERNAME).getValue(), context.getProperty(HTTP_PROXY_PASSWORD).getValue());
siteToSiteClient = new SiteToSiteClient.Builder()
.urls(SiteToSiteRestApiClient.parseClusterUrls(destinationUrl))
@@ -215,6 +264,33 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
return this.siteToSiteClient;
}
+ /**
+  * Converts the JSON read from the given stream into the format of the configured record writer.
+  * The JSON is parsed with the record schema of this task, every record is handed to the writer,
+  * and the MIME type of the writer plus the attributes of the write result are added to the
+  * given attribute map.
+  *
+  * NOTE(review): the Record Writer property is optional but the factory is used without a
+  * null check, so callers are presumably expected to invoke this only when a writer is set.
+  *
+  * @param context the reporting context the record writer is looked up from
+  * @param in the stream providing the JSON to convert
+  * @param attributes the map receiving the MIME type and write result attributes
+  * @return the bytes produced by the record writer
+  * @throws ProcessException if reading the JSON or writing the records fails
+  */
+ protected byte[] getData(final ReportingContext context, InputStream in, Map<String, String> attributes) {
+     try (final JsonRecordReader jsonReader = new JsonRecordReader(in, recordSchema)) {
+
+         final RecordSetWriterFactory factory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+         final RecordSchema schemaToWrite = factory.getSchema(null, recordSchema);
+         final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+
+         try (final RecordSetWriter recordWriter = factory.createWriter(getLogger(), schemaToWrite, buffer)) {
+             recordWriter.beginRecordSet();
+
+             // copy records until the reader signals the end of its input
+             for (Record next = jsonReader.nextRecord(); next != null; next = jsonReader.nextRecord()) {
+                 recordWriter.write(next);
+             }
+
+             final WriteResult result = recordWriter.finishRecordSet();
+
+             attributes.put(CoreAttributes.MIME_TYPE.key(), recordWriter.getMimeType());
+             attributes.putAll(result.getAttributes());
+         }
+
+         // the writer is closed at this point, so the buffer holds the complete output
+         return buffer.toByteArray();
+     } catch (IOException | SchemaNotFoundException | MalformedRecordException e) {
+         throw new ProcessException("Failed to write metrics using record writer: " + e.getMessage(), e);
+     }
+ }
+
static class NiFiUrlValidator implements Validator {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
@@ -236,4 +312,334 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
}
}
}
+
+ /**
+  * Adds a numeric field to the JSON object being built; a {@code null} value is skipped.
+  *
+  * @param builder JSON object builder to add to
+  * @param key name of the field
+  * @param value value to add, may be {@code null}
+  */
+ protected void addField(final JsonObjectBuilder builder, final String key, final Long value) {
+     if (value == null) {
+         return;
+     }
+
+     builder.add(key, value.longValue());
+ }
+
+ /**
+  * Adds an integer field to the JSON object being built; a {@code null} value is skipped.
+  *
+  * @param builder JSON object builder to add to
+  * @param key name of the field
+  * @param value value to add, may be {@code null}
+  */
+ protected void addField(final JsonObjectBuilder builder, final String key, final Integer value) {
+     if (value == null) {
+         return;
+     }
+
+     builder.add(key, value.intValue());
+ }
+
+ /**
+  * Adds a string field to the JSON object being built; a {@code null} value is skipped.
+  *
+  * @param builder JSON object builder to add to
+  * @param key name of the field
+  * @param value value to add, may be {@code null}
+  */
+ protected void addField(final JsonObjectBuilder builder, final String key, final String value) {
+     if (value != null) {
+         builder.add(key, value);
+     }
+ }
+
+ /**
+  * Adds a string field to the JSON object being built, optionally emitting an explicit JSON null.
+  *
+  * @param builder JSON object builder to add to
+  * @param key name of the field
+  * @param value value to add, may be {@code null}
+  * @param allowNullValues when {@code true} a {@code null} value is written as JSON null, otherwise it is skipped
+  */
+ protected void addField(final JsonObjectBuilder builder, final String key, final String value, final boolean allowNullValues) {
+     if (value != null) {
+         builder.add(key, value);
+         return;
+     }
+
+     if (allowNullValues) {
+         builder.add(key, JsonValue.NULL);
+     }
+ }
+
+ private class JsonRecordReader implements RecordReader {
+
+ // Schema handed to the constructor; returned by getSchema() and used when converting nodes to records.
+ private RecordSchema recordSchema;
+ // Parser over the input stream; closed by close().
+ private final JsonParser jsonParser;
+ // True when the first token of the input was START_ARRAY.
+ private final boolean array;
+ // First JSON object, read eagerly by the constructor; null when the input did not start with an object.
+ private final JsonNode firstJsonNode;
+ // Set once getNextJsonNode() has handed out firstJsonNode.
+ private boolean firstObjectConsumed = false;
+
+ // Default date/time/timestamp formats, passed to DataTypeUtils.convertType when coercing field values.
+ private final Supplier<DateFormat> dateFormat = () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat());
+ private final Supplier<DateFormat> timeFormat = () -> DataTypeUtils.getDateFormat(RecordFieldType.TIME.getDefaultFormat());
+ private final Supplier<DateFormat> timestampFormat = () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat());
+
+ /**
+  * Creates a reader over {@code in} and eagerly reads the first JSON object, if there is one.
+  * The input may be a single JSON object or an array of objects.
+  *
+  * @param in stream containing the JSON to read
+  * @param recordSchema schema applied to the records produced by this reader
+  * @throws IOException if the stream cannot be read
+  * @throws MalformedRecordException if the content cannot be parsed as JSON
+  */
+ public JsonRecordReader(final InputStream in, RecordSchema recordSchema) throws IOException, MalformedRecordException {
+     this.recordSchema = recordSchema;
+     try {
+         jsonParser = new JsonFactory().createJsonParser(in);
+         jsonParser.setCodec(new ObjectMapper());
+
+         final JsonToken openingToken = jsonParser.nextToken();
+         array = openingToken == JsonToken.START_ARRAY;
+
+         // For an array, step past the opening bracket so the next token is the first element.
+         final JsonToken firstElementToken = array ? jsonParser.nextToken() : openingToken;
+         if (firstElementToken == JsonToken.START_OBJECT) {
+             firstJsonNode = jsonParser.readValueAsTree();
+         } else {
+             firstJsonNode = null;
+         }
+     } catch (final JsonParseException e) {
+         throw new MalformedRecordException("Could not parse data as JSON", e);
+     }
+ }
+
+ /**
+  * Closes the underlying JSON parser.
+  *
+  * @throws IOException if closing the parser fails
+  */
+ @Override
+ public void close() throws IOException {
+ jsonParser.close();
+ }
+
+ /**
+  * Reads the next JSON object from the input and converts it to a {@link Record} using this reader's schema.
+  *
+  * @param coerceTypes whether field values are converted to the data types declared in the schema
+  * @param dropUnknownFields whether only fields declared in the schema are kept
+  * @return the next record, or {@code null} when there is no further JSON object to read
+  * @throws IOException if the underlying parser fails to read the input
+  * @throws MalformedRecordException if the input is not a JSON object or cannot be converted with the schema
+  */
+ @Override
+ public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException {
+     // A non-array input holds exactly one object; once it has been returned there is nothing left.
+     if (firstObjectConsumed && !array) {
+         return null;
+     }
+     try {
+         final JsonNode nextNode = getNextJsonNode();
+         if (nextNode == null) {
+             // End of the array (or input without a leading object): report end-of-data rather than passing
+             // a null node to the converter, which would fail and surface as a MalformedRecordException.
+             return null;
+         }
+         return convertJsonNodeToRecord(nextNode, getSchema(), null, coerceTypes, dropUnknownFields);
+     } catch (final MalformedRecordException mre) {
+         throw mre;
+     } catch (final IOException ioe) {
+         throw ioe;
+     } catch (final Exception e) {
+         throw new MalformedRecordException("Failed to convert data into a Record object with the given schema", e);
+     }
+ }
+
+ /**
+  * Returns the schema that was passed to this reader's constructor.
+  */
+ @Override
+ public RecordSchema getSchema() throws MalformedRecordException {
+ return recordSchema;
+ }
+
+ /**
+  * Returns the next JSON object of the input: first the object read by the constructor, then any further
+  * objects found by advancing the parser.
+  *
+  * @return the next object node, or {@code null} at end of input or when an array boundary is reached
+  * @throws MalformedRecordException if a token other than an object or array boundary is encountered
+  */
+ private JsonNode getNextJsonNode() throws JsonParseException, IOException, MalformedRecordException {
+     if (!firstObjectConsumed) {
+         firstObjectConsumed = true;
+         return firstJsonNode;
+     }
+
+     for (JsonToken token = jsonParser.nextToken(); token != null; token = jsonParser.nextToken()) {
+         if (token == JsonToken.END_OBJECT) {
+             // Closing brace of the previously read object; keep scanning.
+             continue;
+         }
+         if (token == JsonToken.START_OBJECT) {
+             return jsonParser.readValueAsTree();
+         }
+         if (token == JsonToken.END_ARRAY || token == JsonToken.START_ARRAY) {
+             return null;
+         }
+         throw new MalformedRecordException("Expected to get a JSON Object but got a token of type " + token.name());
+     }
+
+     // Parser exhausted.
+     return null;
+ }
+
+ /**
+  * Converts a JSON object node into a {@link Record} with the given schema.
+  *
+  * @param jsonNode object node to convert; {@code null} yields {@code null}
+  * @param schema schema of the record to build
+  * @param fieldNamePrefix prefix prepended to field names passed to type conversion, or {@code null} for none
+  * @param coerceTypes whether values are converted to the data types declared in the schema
+  * @param dropUnknown when {@code true} only schema fields (looked up by name or alias) are read;
+  *                    otherwise every field present in the JSON is kept
+  * @return the converted record, or {@code null} if {@code jsonNode} is {@code null}
+  */
+ private Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema, final String fieldNamePrefix,
+         final boolean coerceTypes, final boolean dropUnknown) throws IOException, MalformedRecordException {
+
+     // No node means no record; without this guard the field lookups below throw a NullPointerException.
+     if (jsonNode == null) {
+         return null;
+     }
+
+     final Map<String, Object> values = new HashMap<>(schema.getFieldCount() * 2);
+
+     if (dropUnknown) {
+         // Driven by the schema: anything in the JSON that the schema does not declare is ignored.
+         for (final RecordField recordField : schema.getFields()) {
+             final JsonNode childNode = getChildNode(jsonNode, recordField);
+             if (childNode == null) {
+                 continue;
+             }
+
+             final String fieldName = recordField.getFieldName();
+             final DataType fieldType = recordField.getDataType();
+             final Object value;
+
+             if (coerceTypes) {
+                 final String fullFieldName = fieldNamePrefix == null ? fieldName : fieldNamePrefix + fieldName;
+                 value = convertField(childNode, fullFieldName, fieldType, dropUnknown);
+             } else {
+                 value = getRawNodeValue(childNode, fieldType);
+             }
+
+             values.put(fieldName, value);
+         }
+     } else {
+         // Driven by the JSON: fields missing from the schema are kept with their raw value.
+         final Iterator<String> fieldNames = jsonNode.getFieldNames();
+         while (fieldNames.hasNext()) {
+             final String fieldName = fieldNames.next();
+             final JsonNode childNode = jsonNode.get(fieldName);
+             final RecordField recordField = schema.getField(fieldName).orElse(null);
+             final Object value;
+
+             if (coerceTypes && recordField != null) {
+                 final DataType desiredType = recordField.getDataType();
+                 final String fullFieldName = fieldNamePrefix == null ? fieldName : fieldNamePrefix + fieldName;
+                 value = convertField(childNode, fullFieldName, desiredType, dropUnknown);
+             } else {
+                 value = getRawNodeValue(childNode, recordField == null ? null : recordField.getDataType());
+             }
+
+             values.put(fieldName, value);
+         }
+     }
+
+     // The serialized form is produced lazily from the original node.
+     final Supplier<String> supplier = () -> jsonNode.toString();
+     return new MapRecord(schema, values, SerializedForm.of(supplier, "application/json"), false, dropUnknown);
+ }
+
+ /**
+  * Looks up the child of {@code jsonNode} that corresponds to {@code field}, trying the field name first
+  * and then each of its aliases.
+  *
+  * @return the first matching child node, or {@code null} if neither the name nor any alias is present
+  */
+ private JsonNode getChildNode(final JsonNode jsonNode, final RecordField field) {
+     // Candidate names in lookup order: the field's own name, followed by its aliases.
+     final List<String> candidateNames = new ArrayList<>();
+     candidateNames.add(field.getFieldName());
+     candidateNames.addAll(field.getAliases());
+
+     for (final String candidate : candidateNames) {
+         if (jsonNode.has(candidate)) {
+             return jsonNode.get(candidate);
+         }
+     }
+
+     return null;
+ }
+
+ /**
+  * Converts a JSON node to a Java value of the desired record data type.
+  *
+  * @param fieldNode node to convert; {@code null} or a JSON null yields {@code null}
+  * @param fieldName name of the field, passed on to the type conversion
+  * @param desiredType data type the value should be converted to
+  * @param dropUnknown passed through to nested record conversion
+  * @return the converted value, or {@code null} when the node is null, the field type is not handled,
+  *         or a RECORD type is requested for a node that is not an object
+  */
+ protected Object convertField(final JsonNode fieldNode, final String fieldName, final DataType desiredType, final boolean dropUnknown) throws IOException, MalformedRecordException {
+     if (fieldNode == null || fieldNode.isNull()) {
+         return null;
+     }
+
+     switch (desiredType.getFieldType()) {
+         case BOOLEAN:
+         case BYTE:
+         case CHAR:
+         case DOUBLE:
+         case FLOAT:
+         case INT:
+         case BIGINT:
+         case LONG:
+         case SHORT:
+         case STRING:
+         case DATE:
+         case TIME:
+         case TIMESTAMP:
+             // Scalars: read the raw JSON value and let DataTypeUtils coerce it.
+             return DataTypeUtils.convertType(getRawNodeValue(fieldNode, null), desiredType, dateFormat, timeFormat, timestampFormat, fieldName);
+         case MAP: {
+             final DataType mapValueType = ((MapDataType) desiredType).getValueType();
+             final Map<String, Object> convertedEntries = new HashMap<>();
+
+             for (final Iterator<String> keys = fieldNode.getFieldNames(); keys.hasNext();) {
+                 final String key = keys.next();
+                 convertedEntries.put(key, convertField(fieldNode.get(key), fieldName, mapValueType, dropUnknown));
+             }
+
+             return convertedEntries;
+         }
+         case ARRAY: {
+             final ArrayNode elements = (ArrayNode) fieldNode;
+             final Object[] convertedElements = new Object[elements.size()];
+             int index = 0;
+
+             for (final JsonNode element : elements) {
+                 final DataType elementType = ((ArrayDataType) desiredType).getElementType();
+                 convertedElements[index++] = convertField(element, fieldName, elementType, dropUnknown);
+             }
+
+             return convertedElements;
+         }
+         case RECORD: {
+             if (!fieldNode.isObject() || !(desiredType instanceof RecordDataType)) {
+                 return null;
+             }
+
+             RecordSchema childSchema = ((RecordDataType) desiredType).getChildSchema();
+             if (childSchema == null) {
+                 // No schema declared for the nested record: treat every field of the node as a STRING.
+                 final List<RecordField> inferredFields = new ArrayList<>();
+                 for (final Iterator<String> names = fieldNode.getFieldNames(); names.hasNext();) {
+                     inferredFields.add(new RecordField(names.next(), RecordFieldType.STRING.getDataType()));
+                 }
+                 childSchema = new SimpleRecordSchema(inferredFields);
+             }
+
+             return convertJsonNodeToRecord(fieldNode, childSchema, fieldName + ".", true, dropUnknown);
+         }
+         case CHOICE:
+             return DataTypeUtils.convertType(getRawNodeValue(fieldNode, null), desiredType, fieldName);
+         default:
+             return null;
+     }
+ }
+
+ /**
+  * Extracts the value of a JSON node without coercing it to a schema type: numbers, binary data, booleans
+  * and text are returned as-is, arrays become {@code Object[]}, and objects become a {@link MapRecord}.
+  *
+  * @param fieldNode node to read; {@code null} or a JSON null yields {@code null}
+  * @param dataType declared data type of the node, or {@code null} if unknown; used only to find the
+  *                 element type of arrays and the child schema of nested records
+  * @return the raw value, or {@code null} if the node is null or of a kind not handled here
+  */
+ protected Object getRawNodeValue(final JsonNode fieldNode, final DataType dataType) throws IOException {
+     if (fieldNode == null || fieldNode.isNull()) {
+         return null;
+     }
+
+     if (fieldNode.isNumber()) {
+         return fieldNode.getNumberValue();
+     }
+
+     if (fieldNode.isBinary()) {
+         return fieldNode.getBinaryValue();
+     }
+
+     if (fieldNode.isBoolean()) {
+         return fieldNode.getBooleanValue();
+     }
+
+     if (fieldNode.isTextual()) {
+         return fieldNode.getTextValue();
+     }
+
+     if (fieldNode.isArray()) {
+         final ArrayNode arrayNode = (ArrayNode) fieldNode;
+         final Object[] arrayElements = new Object[arrayNode.size()];
+         int count = 0;
+
+         // Elements inherit the declared element type when the schema says this field is an array.
+         final DataType elementDataType;
+         if (dataType != null && dataType.getFieldType() == RecordFieldType.ARRAY) {
+             elementDataType = ((ArrayDataType) dataType).getElementType();
+         } else {
+             elementDataType = null;
+         }
+
+         for (final JsonNode node : arrayNode) {
+             arrayElements[count++] = getRawNodeValue(node, elementDataType);
+         }
+
+         return arrayElements;
+     }
+
+     if (fieldNode.isObject()) {
+         RecordSchema childSchema = null;
+         if (dataType != null && RecordFieldType.RECORD == dataType.getFieldType()) {
+             childSchema = ((RecordDataType) dataType).getChildSchema();
+         }
+
+         if (childSchema == null) {
+             childSchema = new SimpleRecordSchema(Collections.emptyList());
+         }
+
+         final Iterator<String> fieldNames = fieldNode.getFieldNames();
+         final Map<String, Object> childValues = new HashMap<>();
+         while (fieldNames.hasNext()) {
+             final String childFieldName = fieldNames.next();
+             // Each child is read with its own declared type from the child schema (null when undeclared),
+             // not with the enclosing record's type, so nested records and arrays get the right schema.
+             final DataType childDataType = childSchema.getDataType(childFieldName).orElse(null);
+             final Object childValue = getRawNodeValue(fieldNode.get(childFieldName), childDataType);
+             childValues.put(childFieldName, childValue);
+         }
+
+         return new MapRecord(childSchema, childValues);
+     }
+
+     return null;
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
index fac7696..20ed96a 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
@@ -68,9 +68,6 @@ import java.util.concurrent.TimeUnit;
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReportingTask {
- static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
- static final String LAST_EVENT_ID_KEY = "last_event_id";
-
static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
.name("Platform")
.description("The value to use for the platform field in each provenance event.")
@@ -195,7 +192,7 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting
lastSentBulletinId = currMaxId;
}
- static JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final Bulletin bulletin, final DateFormat df,
+ private JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final Bulletin bulletin, final DateFormat df,
final String platform, final String nodeIdentifier) {
addField(builder, "objectId", UUID.randomUUID().toString());
@@ -216,17 +213,4 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting
return builder.build();
}
- private static void addField(final JsonObjectBuilder builder, final String key, final Long value) {
- if (value != null) {
- builder.add(key, value.longValue());
- }
- }
-
- private static void addField(final JsonObjectBuilder builder, final String key, final String value) {
- if (value == null) {
- return;
- }
- builder.add(key, value);
- }
-
}