You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by pvillard31 <gi...@git.apache.org> on 2018/01/23 22:19:03 UTC

[GitHub] nifi pull request #2430: NIFI-4809 - Implement a SiteToSiteMetricsReportingT...

GitHub user pvillard31 opened a pull request:

    https://github.com/apache/nifi/pull/2430

    NIFI-4809 - Implement a SiteToSiteMetricsReportingTask

    To avoid some code duplication, I moved few utilitary classes into the reporting-utils package. And I also added two metrics (available cores and load average) as we have in System Diagnostic but that we don't send with the current Ambari reporting task.
    
    Thank you for submitting a contribution to Apache NiFi.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
         in the commit message?
    
    - [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    - [ ] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [ ] Have you written or updated unit tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
    - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
    
    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pvillard31/nifi NIFI-4809

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/2430.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2430
    
----
commit c49dd0e9ef08b34f30939b2771cd43bf580dc5d4
Author: Pierre Villard <pi...@...>
Date:   2018-01-23T22:15:18Z

    NIFI-4809 - Implement a SiteToSiteMetricsReportingTask

----


---

[GitHub] nifi pull request #2430: NIFI-4809 - Implement a SiteToSiteMetricsReportingT...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/2430


---

[GitHub] nifi issue #2430: NIFI-4809 - Implement a SiteToSiteMetricsReportingTask

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on the issue:

    https://github.com/apache/nifi/pull/2430
  
    Mind rebasing this? Please and thanks!


---

[GitHub] nifi issue #2430: NIFI-4809 - Implement a SiteToSiteMetricsReportingTask

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/2430
  
    @mattyb149 - pushed another commit with unit tests and doc


---

[GitHub] nifi pull request #2430: NIFI-4809 - Implement a SiteToSiteMetricsReportingT...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2430#discussion_r174293625
  
    --- Diff: nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.reporting;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.lang.management.ManagementFactory;
    +import java.lang.management.OperatingSystemMXBean;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import javax.json.Json;
    +import javax.json.JsonBuilderFactory;
    +import javax.json.JsonObject;
    +
    +import org.apache.avro.Schema;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.avro.AvroTypeUtil;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.status.ProcessGroupStatus;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.reporting.util.metrics.MetricNames;
    +import org.apache.nifi.reporting.util.metrics.MetricsService;
    +import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder;
    +
    +import com.yammer.metrics.core.VirtualMachineMetrics;
    +
    +@Tags({"status", "metrics", "site", "site to site"})
    +@CapabilityDescription("Publishes same metrics as the Ambari Reporting task using the Site To Site protocol.")
    +public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingTask {
    +
    +    static final AllowableValue AMBARI_FORMAT = new AllowableValue("ambari-format", "Ambari Format", "Metrics will be formatted"
    +            + " according to the Ambari Metrics API. See Additional Details in Usage documentation.");
    +    static final AllowableValue RECORD_FORMAT = new AllowableValue("record-format", "Record Format", "Metrics will be formatted"
    +            + " using the Record Writer property of this reporting task. See Additional Details in Usage documentation to"
    +            + " have the description of the default schema.");
    +
    +    static final PropertyDescriptor APPLICATION_ID = new PropertyDescriptor.Builder()
    +            .name("s2s-metrics-application-id")
    +            .displayName("Application ID")
    +            .description("The Application ID to be included in the metrics")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("nifi")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
    +            .name("s2s-metrics-hostname")
    +            .displayName("Hostname")
    +            .description("The Hostname of this NiFi instance to be included in the metrics")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("${hostname(true)}")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor FORMAT = new PropertyDescriptor.Builder()
    +            .name("s2s-metrics-format")
    +            .displayName("Output format")
    +            .description("The output format that will be used for the metrics")
    --- End diff --
    
    Perhaps add to the description that if Record Format is selected, a Record Writer must be provided. Also for consistency, the display name should have both words capitalized.


---

[GitHub] nifi pull request #2430: NIFI-4809 - Implement a SiteToSiteMetricsReportingT...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2430#discussion_r174294363
  
    --- Diff: nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteMetricsReportingTask/additionalDetails.html ---
    @@ -0,0 +1,178 @@
    +<!DOCTYPE html>
    +<html lang="en">
    +    <!--
    +      Licensed to the Apache Software Foundation (ASF) under one or more
    +      contributor license agreements.  See the NOTICE file distributed with
    +      this work for additional information regarding copyright ownership.
    +      The ASF licenses this file to You under the Apache License, Version 2.0
    +      (the "License"); you may not use this file except in compliance with
    +      the License.  You may obtain a copy of the License at
    +          http://www.apache.org/licenses/LICENSE-2.0
    +      Unless required by applicable law or agreed to in writing, software
    +      distributed under the License is distributed on an "AS IS" BASIS,
    +      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +      See the License for the specific language governing permissions and
    +      limitations under the License.
    +    -->
    +    <head>
    +        <meta charset="utf-8" />
    +        <title>SiteToSiteMetricsReportingTask</title>
    +
    +        <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
    +    </head>
    +
    +    <body>
    --- End diff --
    
    This is excellent documentation, thank you!


---

[GitHub] nifi issue #2430: NIFI-4809 - Implement a SiteToSiteMetricsReportingTask

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/2430
  
    I'll need to update this PR if #2431 is merged first.


---

[GitHub] nifi issue #2430: NIFI-4809 - Implement a SiteToSiteMetricsReportingTask

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/2430
  
    What about adding a property to let the user decide the output format:
    - "Output format" with two options: "Ambari Metrics Collector format" or "Record based format"
    - "Record writer": this property being used if and only if the "Record based format" is selected
    
    My primary objective was to have this reporting task usable in MiNiFi java agents to send the metrics to a NiFi cluster that would publish the metrics to AMS. But we could definitely use this reporting task to get the metrics and store the data into another store such as Elasticsearch.
    
    In any case, I'd add an "Additional details" page to provide information about the schemas.


---

[GitHub] nifi issue #2430: NIFI-4809 - Implement a SiteToSiteMetricsReportingTask

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on the issue:

    https://github.com/apache/nifi/pull/2430
  
    I like all those ideas!


---

[GitHub] nifi pull request #2430: NIFI-4809 - Implement a SiteToSiteMetricsReportingT...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2430#discussion_r170728915
  
    --- Diff: nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java ---
    @@ -0,0 +1,151 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.reporting;
    +
    +import java.lang.management.ManagementFactory;
    +import java.lang.management.OperatingSystemMXBean;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import javax.json.Json;
    +import javax.json.JsonBuilderFactory;
    +import javax.json.JsonObject;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.status.ProcessGroupStatus;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.reporting.util.metrics.MetricsService;
    +import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder;
    +
    +import com.yammer.metrics.core.VirtualMachineMetrics;
    +
    +@Tags({"status", "metrics", "site", "site to site"})
    +@CapabilityDescription("Publishes same metrics as the Ambari Reporting task using the Site To Site protocol. "
    +        + "Metrics are formatted according to the Ambari Metrics API.")
    +public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingTask {
    +
    +    static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
    +
    +    static final PropertyDescriptor APPLICATION_ID = new PropertyDescriptor.Builder()
    +            .name("Application ID")
    --- End diff --
    
    Please set this(and the other properties) to a machine-friendly name, you can use the current value for .displayName()


---

[GitHub] nifi pull request #2430: NIFI-4809 - Implement a SiteToSiteMetricsReportingT...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2430#discussion_r174316057
  
    --- Diff: nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java ---
    @@ -140,8 +164,16 @@
                 .sensitive(true)
                 .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
                 .build();
    +    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
    +            .name("record-writer")
    +            .displayName("Record Writer")
    +            .description("Specifies the Controller Service to use for writing out the records.")
    --- End diff --
    
    Left it as-is since I added the property Record Writer only in the task reporting task for now.


---

[GitHub] nifi issue #2430: NIFI-4809 - Implement a SiteToSiteMetricsReportingTask

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/2430
  
    Done @mattyb149, thanks!


---

[GitHub] nifi pull request #2430: NIFI-4809 - Implement a SiteToSiteMetricsReportingT...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2430#discussion_r174315953
  
    --- Diff: nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.reporting;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.lang.management.ManagementFactory;
    +import java.lang.management.OperatingSystemMXBean;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import javax.json.Json;
    +import javax.json.JsonBuilderFactory;
    +import javax.json.JsonObject;
    +
    +import org.apache.avro.Schema;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.avro.AvroTypeUtil;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.status.ProcessGroupStatus;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.reporting.util.metrics.MetricNames;
    +import org.apache.nifi.reporting.util.metrics.MetricsService;
    +import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder;
    +
    +import com.yammer.metrics.core.VirtualMachineMetrics;
    +
    +@Tags({"status", "metrics", "site", "site to site"})
    +@CapabilityDescription("Publishes same metrics as the Ambari Reporting task using the Site To Site protocol.")
    +public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingTask {
    +
    +    static final AllowableValue AMBARI_FORMAT = new AllowableValue("ambari-format", "Ambari Format", "Metrics will be formatted"
    +            + " according to the Ambari Metrics API. See Additional Details in Usage documentation.");
    +    static final AllowableValue RECORD_FORMAT = new AllowableValue("record-format", "Record Format", "Metrics will be formatted"
    +            + " using the Record Writer property of this reporting task. See Additional Details in Usage documentation to"
    +            + " have the description of the default schema.");
    +
    +    static final PropertyDescriptor APPLICATION_ID = new PropertyDescriptor.Builder()
    +            .name("s2s-metrics-application-id")
    +            .displayName("Application ID")
    +            .description("The Application ID to be included in the metrics")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("nifi")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
    +            .name("s2s-metrics-hostname")
    +            .displayName("Hostname")
    +            .description("The Hostname of this NiFi instance to be included in the metrics")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("${hostname(true)}")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor FORMAT = new PropertyDescriptor.Builder()
    +            .name("s2s-metrics-format")
    +            .displayName("Output format")
    +            .description("The output format that will be used for the metrics")
    +            .required(true)
    +            .allowableValues(AMBARI_FORMAT, RECORD_FORMAT)
    +            .defaultValue(AMBARI_FORMAT.getValue())
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    private final MetricsService metricsService = new MetricsService();
    +
    +    public SiteToSiteMetricsReportingTask() throws IOException {
    +        final InputStream schema = getClass().getClassLoader().getResourceAsStream("schema-metrics.avsc");
    +        recordSchema = AvroTypeUtil.createSchema(new Schema.Parser().parse(schema));
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
    +        properties.add(HOSTNAME);
    +        properties.add(APPLICATION_ID);
    +        properties.add(FORMAT);
    +        properties.add(RECORD_WRITER);
    +        properties.remove(BATCH_SIZE);
    +        return properties;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    +        final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
    +
    +        final boolean isWriterSet = validationContext.getProperty(RECORD_WRITER).isSet();
    +        if (validationContext.getProperty(FORMAT).getValue().equals(RECORD_FORMAT.getValue()) && !isWriterSet) {
    --- End diff --
    
    I added the check and added unit tests for the customValidate


---

[GitHub] nifi issue #2430: NIFI-4809 - Implement a SiteToSiteMetricsReportingTask

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on the issue:

    https://github.com/apache/nifi/pull/2430
  
    Reusing the Ambari format certainly makes this one easier to implement, and as you said the user can convert later with a record processor, but I'm thinking it might be best to be able to specify the RecordWriter in the reporting task. That way the conversion, filtering, etc. can be done while the records are already in object format (i.e. parsed), which would save the step of re-parsing and re-writing. Using a JsonRecordSetWriter with "Inherit Record Schema" would result in the same behavior as you propose here, or you could provide a different format and/or filter out the fields, etc. by using a configured RecordSetWriter.
    
    @markap14 suggested to encode the "input" schema as a file in src/main/resources and read it in when the class/instance is created, using the available util method to convert it to a RecordSchema. To be honest I would like to have the S2S Provenance reporting task do this as well, but that's a bit more invasive since it already exists. Since the one in this PR is new, would be nice to have it be the exemplar for creating reporting services in the future. What do you think?


---

[GitHub] nifi pull request #2430: NIFI-4809 - Implement a SiteToSiteMetricsReportingT...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2430#discussion_r170730176
  
    --- Diff: nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java ---
    @@ -0,0 +1,151 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.reporting;
    +
    +import java.lang.management.ManagementFactory;
    +import java.lang.management.OperatingSystemMXBean;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import javax.json.Json;
    +import javax.json.JsonBuilderFactory;
    +import javax.json.JsonObject;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.status.ProcessGroupStatus;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.reporting.util.metrics.MetricsService;
    +import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder;
    +
    +import com.yammer.metrics.core.VirtualMachineMetrics;
    +
    +@Tags({"status", "metrics", "site", "site to site"})
    +@CapabilityDescription("Publishes same metrics as the Ambari Reporting task using the Site To Site protocol. "
    +        + "Metrics are formatted according to the Ambari Metrics API.")
    +public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingTask {
    +
    +    static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
    +
    +    static final PropertyDescriptor APPLICATION_ID = new PropertyDescriptor.Builder()
    +            .name("Application ID")
    +            .description("The Application ID to be included in the metrics sent to Ambari")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("nifi")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
    +            .name("Hostname")
    +            .description("The Hostname of this NiFi instance to be included in the metrics sent to Ambari")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("${hostname(true)}")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    private final MetricsService metricsService = new MetricsService();
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
    +        properties.add(HOSTNAME);
    +        properties.add(APPLICATION_ID);
    +        properties.remove(BATCH_SIZE);
    +        return properties;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ReportingContext context) {
    +        final boolean isClustered = context.isClustered();
    +        final String nodeId = context.getClusterNodeIdentifier();
    +        if (nodeId == null && isClustered) {
    +            getLogger().debug("This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. "
    +                    + "Will wait for Node Identifier to be established.");
    +            return;
    +        }
    +
    +        final VirtualMachineMetrics virtualMachineMetrics = VirtualMachineMetrics.getInstance();
    +        final Map<String, ?> config = Collections.emptyMap();
    +        final JsonBuilderFactory factory = Json.createBuilderFactory(config);
    +
    +        final String applicationId = context.getProperty(APPLICATION_ID).evaluateAttributeExpressions().getValue();
    +        final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
    +        final ProcessGroupStatus status = context.getEventAccess().getControllerStatus();
    +
    +        if(status != null) {
    +            final Map<String,String> statusMetrics = metricsService.getMetrics(status, false);
    +            final Map<String,String> jvmMetrics = metricsService.getMetrics(virtualMachineMetrics);
    +
    +            final MetricsBuilder metricsBuilder = new MetricsBuilder(factory);
    +            final OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
    +            final double systemLoad = os.getSystemLoadAverage();
    +
    +            final JsonObject metricsObject = metricsBuilder
    +                    .applicationId(applicationId)
    +                    .instanceId(status.getId())
    +                    .hostname(hostname)
    +                    .timestamp(System.currentTimeMillis())
    +                    .addAllMetrics(statusMetrics)
    +                    .addAllMetrics(jvmMetrics)
    +                    .metric("available.cores", String.valueOf(os.getAvailableProcessors()))
    +                    .metric("load.average.1min", String.valueOf(systemLoad >= 0 ? systemLoad : -1))
    +                    .build();
    +
    +            try {
    +                long start = System.nanoTime();
    +                final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
    +                if (transaction == null) {
    +                    getLogger().debug("All destination nodes are penalized; will attempt to send data later");
    +                    return;
    +                }
    +
    +                final Map<String, String> attributes = new HashMap<>();
    +                final String transactionId = UUID.randomUUID().toString();
    +                attributes.put("reporting.task.transaction.id", transactionId);
    --- End diff --
    
    Now that #2431 is in, we should add those attributes here too :)


---

[GitHub] nifi issue #2430: NIFI-4809 - Implement a SiteToSiteMetricsReportingTask

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/2430
  
    That's a great idea! I'll have a look.


---

[GitHub] nifi pull request #2430: NIFI-4809 - Implement a SiteToSiteMetricsReportingT...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2430#discussion_r174293350
  
    --- Diff: nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java ---
    @@ -140,8 +164,16 @@
                 .sensitive(true)
                 .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
                 .build();
    +    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
    +            .name("record-writer")
    +            .displayName("Record Writer")
    +            .description("Specifies the Controller Service to use for writing out the records.")
    --- End diff --
    
    The description here should mention that it is only used when the reporting task is configured to use a Record Writer. For the one you added, it has two modes, but future reporting tasks need not offer an alternative.


---

[GitHub] nifi issue #2430: NIFI-4809 - Implement a SiteToSiteMetricsReportingTask

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/2430
  
    @mattyb149 - this is not the final version, I still have unit tests and "additional details" doc to add, but wanted to give you an update in case you want to have a look and if you already have feedbacks.
    
    I've reworked the new reporting task as I suggested with two additional properties. I also reworked the way metrics are constructed to have the ambari format and the "record" format with the minimal code duplication. I also chose this implementation (using the JsonTreeReader) so that we can easily reuse the code for the other S2S reporting task. If it sounds OK to you, adding an optional Record Writer property to the other S2S reporting tasks should be really easy (I guess best is to take care of it into a follow-up JIRA).


---

[GitHub] nifi issue #2430: NIFI-4809 - Implement a SiteToSiteMetricsReportingTask

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on the issue:

    https://github.com/apache/nifi/pull/2430
  
    Ugh that's true, I'm not a fan of dynamic keys. Since the schema would be generated by the reporting task, then we could create a schema for the example above, but then each flow file would have its own schema, and even worse, each flow file would have only one metric, so at that point it's not really conducive to record processing. As an alternative we could convert the output (before or after JSON conversion) to an altered spec with a consistent schema definition. 
    For the Ambari reporting task, since Ambari is expecting this format, then fine; if we keep the same spec in this new reporting task for consistency, then I'd hope to see a template on the Wiki using the new reporting task with a JoltTransformJSON processor to do the aforementioned transformation, along with including an AvroSchemaRegistry that contains the schema definition for the files coming out of the JoltTransformJSON processor. This would allow us to keep a consistent (standard) defined format (albeit non-schema-friendly), but offer a well-known solution to prepare the data for record-aware processors. Thoughts?


---

[GitHub] nifi issue #2430: NIFI-4809 - Implement a SiteToSiteMetricsReportingTask

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/2430
  
    Thanks for your comments @mattyb149 - I just pushed a commit that should address everything.
    
    Regarding the record approach you suggested. Even though I really like the idea, I'm not sure to see how to define a valid avro schema for the specification used by the Ambari collector API (https://cwiki.apache.org/confluence/display/AMBARI/Metrics+Collector+API+Specification):
    
    ````json
    {
      "metrics": [
        {
          "metricname": "AMBARI_METRICS.SmokeTest.FakeMetric",
          "appid": "amssmoketestfake",
          "hostname": "ambari20-5.c.pramod-thangali.internal",
          "timestamp": 1432075898000,
          "starttime": 1432075898000,
          "metrics": {
            "1432075898000": 0.963781711428,
            "1432075899000": 1432075898000
          }
        }
      ]
    }
    ````
    
    How would we manage the 'metrics' part where field names are timestamps?


---

[GitHub] nifi pull request #2430: NIFI-4809 - Implement a SiteToSiteMetricsReportingT...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2430#discussion_r170729017
  
    --- Diff: nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java ---
    @@ -0,0 +1,151 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.reporting;
    +
    +import java.lang.management.ManagementFactory;
    +import java.lang.management.OperatingSystemMXBean;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import javax.json.Json;
    +import javax.json.JsonBuilderFactory;
    +import javax.json.JsonObject;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.status.ProcessGroupStatus;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.reporting.util.metrics.MetricsService;
    +import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder;
    +
    +import com.yammer.metrics.core.VirtualMachineMetrics;
    +
    +@Tags({"status", "metrics", "site", "site to site"})
    +@CapabilityDescription("Publishes same metrics as the Ambari Reporting task using the Site To Site protocol. "
    +        + "Metrics are formatted according to the Ambari Metrics API.")
    +public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingTask {
    +
    +    static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
    +
    +    static final PropertyDescriptor APPLICATION_ID = new PropertyDescriptor.Builder()
    +            .name("Application ID")
    +            .description("The Application ID to be included in the metrics sent to Ambari")
    --- End diff --
    
    The phrase "sent to Ambari" shows up a couple times, but should be replaced/removed


---

[GitHub] nifi issue #2430: NIFI-4809 - Implement a SiteToSiteMetricsReportingTask

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on the issue:

    https://github.com/apache/nifi/pull/2430
  
    +1 LGTM, ran the unit tests and tried on a live NiFi instance with both Ambari and Record modes. The only thing I noticed (that should be its own Jira) is that the RecordWriter used by the reporting task does not report any referencing components.
    Thanks for this great addition! Merging to master


---

[GitHub] nifi pull request #2430: NIFI-4809 - Implement a SiteToSiteMetricsReportingT...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2430#discussion_r174315907
  
    --- Diff: nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.reporting;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.lang.management.ManagementFactory;
    +import java.lang.management.OperatingSystemMXBean;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import javax.json.Json;
    +import javax.json.JsonBuilderFactory;
    +import javax.json.JsonObject;
    +
    +import org.apache.avro.Schema;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.avro.AvroTypeUtil;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.status.ProcessGroupStatus;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.reporting.util.metrics.MetricNames;
    +import org.apache.nifi.reporting.util.metrics.MetricsService;
    +import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder;
    +
    +import com.yammer.metrics.core.VirtualMachineMetrics;
    +
    +@Tags({"status", "metrics", "site", "site to site"})
    +@CapabilityDescription("Publishes same metrics as the Ambari Reporting task using the Site To Site protocol.")
    +public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingTask {
    +
    +    static final AllowableValue AMBARI_FORMAT = new AllowableValue("ambari-format", "Ambari Format", "Metrics will be formatted"
    +            + " according to the Ambari Metrics API. See Additional Details in Usage documentation.");
    +    static final AllowableValue RECORD_FORMAT = new AllowableValue("record-format", "Record Format", "Metrics will be formatted"
    +            + " using the Record Writer property of this reporting task. See Additional Details in Usage documentation to"
    +            + " have the description of the default schema.");
    +
    +    static final PropertyDescriptor APPLICATION_ID = new PropertyDescriptor.Builder()
    +            .name("s2s-metrics-application-id")
    +            .displayName("Application ID")
    +            .description("The Application ID to be included in the metrics")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("nifi")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
    +            .name("s2s-metrics-hostname")
    +            .displayName("Hostname")
    +            .description("The Hostname of this NiFi instance to be included in the metrics")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("${hostname(true)}")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor FORMAT = new PropertyDescriptor.Builder()
    +            .name("s2s-metrics-format")
    +            .displayName("Output format")
    +            .description("The output format that will be used for the metrics")
    --- End diff --
    
    done


---

[GitHub] nifi pull request #2430: NIFI-4809 - Implement a SiteToSiteMetricsReportingT...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2430#discussion_r174294045
  
    --- Diff: nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.reporting;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.lang.management.ManagementFactory;
    +import java.lang.management.OperatingSystemMXBean;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import javax.json.Json;
    +import javax.json.JsonBuilderFactory;
    +import javax.json.JsonObject;
    +
    +import org.apache.avro.Schema;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.avro.AvroTypeUtil;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.status.ProcessGroupStatus;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.reporting.util.metrics.MetricNames;
    +import org.apache.nifi.reporting.util.metrics.MetricsService;
    +import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder;
    +
    +import com.yammer.metrics.core.VirtualMachineMetrics;
    +
    +@Tags({"status", "metrics", "site", "site to site"})
    +@CapabilityDescription("Publishes same metrics as the Ambari Reporting task using the Site To Site protocol.")
    +public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingTask {
    +
    +    static final AllowableValue AMBARI_FORMAT = new AllowableValue("ambari-format", "Ambari Format", "Metrics will be formatted"
    +            + " according to the Ambari Metrics API. See Additional Details in Usage documentation.");
    +    static final AllowableValue RECORD_FORMAT = new AllowableValue("record-format", "Record Format", "Metrics will be formatted"
    +            + " using the Record Writer property of this reporting task. See Additional Details in Usage documentation to"
    +            + " have the description of the default schema.");
    +
    +    static final PropertyDescriptor APPLICATION_ID = new PropertyDescriptor.Builder()
    +            .name("s2s-metrics-application-id")
    +            .displayName("Application ID")
    +            .description("The Application ID to be included in the metrics")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("nifi")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
    +            .name("s2s-metrics-hostname")
    +            .displayName("Hostname")
    +            .description("The Hostname of this NiFi instance to be included in the metrics")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("${hostname(true)}")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor FORMAT = new PropertyDescriptor.Builder()
    +            .name("s2s-metrics-format")
    +            .displayName("Output format")
    +            .description("The output format that will be used for the metrics")
    +            .required(true)
    +            .allowableValues(AMBARI_FORMAT, RECORD_FORMAT)
    +            .defaultValue(AMBARI_FORMAT.getValue())
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    private final MetricsService metricsService = new MetricsService();
    +
    +    public SiteToSiteMetricsReportingTask() throws IOException {
    +        final InputStream schema = getClass().getClassLoader().getResourceAsStream("schema-metrics.avsc");
    +        recordSchema = AvroTypeUtil.createSchema(new Schema.Parser().parse(schema));
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
    +        properties.add(HOSTNAME);
    +        properties.add(APPLICATION_ID);
    +        properties.add(FORMAT);
    +        properties.add(RECORD_WRITER);
    +        properties.remove(BATCH_SIZE);
    +        return properties;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    +        final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
    +
    +        final boolean isWriterSet = validationContext.getProperty(RECORD_WRITER).isSet();
    +        if (validationContext.getProperty(FORMAT).getValue().equals(RECORD_FORMAT.getValue()) && !isWriterSet) {
    --- End diff --
    
    Can we also check that a Record Writer is NOT set when Ambari Format is selected? This isn't a requirement, just a suggestion. Since it requires two operations to switch back and forth between formats, that's not an ideal user experience, so I'm fine with whichever way you'd like to do it :)


---