Posted to issues@nifi.apache.org by ottobackwards <gi...@git.apache.org> on 2018/06/26 15:05:09 UTC

[GitHub] nifi pull request #2816: NIFI-5337 Syslog 5424 Record Reader and nifi-syslog...

GitHub user ottobackwards opened a pull request:

    https://github.com/apache/nifi/pull/2816

    NIFI-5337 Syslog 5424 Record Reader and nifi-syslog-utils

    - Create nifi-syslog-utils to move syslog parsing functionality to a central location shared by the processors and serialization/record system.
    - Refactor Processors to use these utils
    - Update 5424 syslog classes using simple-syslog-5424 to pick up new changes to support this work, as well as keep dependencies/types from bleeding out to the
    processors or readers
    - Refactor Syslog5424Event and Parser
    - Create Syslog5424RecordReader
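For readers less familiar with RFC 5424, here is a minimal sketch of the header fields that a 5424 parser and record reader work with. It is not the simple-syslog-5424 or nifi-syslog-utils API: the class name, the simplified regex, and the map keys are hypothetical, and STRUCTURED-DATA plus MSG are left unparsed in a single "rest" field. The facility/severity split follows RFC 5424 section 6.2.1 (PRI = facility * 8 + severity), and the sample line is the RFC's own example with the BOM removed.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class Rfc5424HeaderSketch {
    // Simplified RFC 5424 header (hypothetical sketch, not a full grammar):
    // <PRI>VERSION SP TIMESTAMP SP HOSTNAME SP APP-NAME SP PROCID SP MSGID SP REST
    // REST holds STRUCTURED-DATA and the optional MSG; they are not split or unescaped here.
    private static final Pattern HEADER = Pattern.compile(
            "^<(\\d{1,3})>(\\d{1,2}) (\\S+) (\\S+) (\\S+) (\\S+) (\\S+) ?(.*)$");

    /** Returns header fields keyed by hypothetical names, or null if the line does not match. */
    public static Map<String, String> parse(String line) {
        Matcher m = HEADER.matcher(line);
        if (!m.matches()) {
            return null;
        }
        int pri = Integer.parseInt(m.group(1));
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("priority", m.group(1));
        // RFC 5424 section 6.2.1: PRIVAL = Facility * 8 + Severity
        fields.put("facility", String.valueOf(pri / 8));
        fields.put("severity", String.valueOf(pri % 8));
        fields.put("version", m.group(2));
        fields.put("timestamp", m.group(3));
        fields.put("hostname", m.group(4));
        fields.put("appName", m.group(5));
        fields.put("procid", m.group(6));
        fields.put("messageid", m.group(7));
        fields.put("rest", m.group(8));
        return fields;
    }

    public static void main(String[] args) {
        String line = "<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 - "
                + "'su root' failed for lonvick on /dev/pts/8";
        System.out.println(parse(line));
    }
}
```

With the sample line, PRI 34 splits into facility 4 and severity 2, the "-" values are NILVALUEs for PROCID and STRUCTURED-DATA, and everything after MSGID lands in "rest".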
    
    
    
    ### For all changes:
    - [x] Is there a JIRA ticket associated with this PR? Is it referenced 
         in the commit message?
    
    - [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    - [x] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [x] Have you written or updated unit tests to verify your changes?
    - [-] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
    - [x] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [x] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [-] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
    
    ### For documentation related changes:
    - [x] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ottobackwards/nifi syslog-readers

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/2816.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2816
    
----
commit ebdd59908aee5449b8fd2599a1391cb091e10c04
Author: Otto Fowler <ot...@...>
Date:   2018-06-22T19:44:38Z

    NIFI-5337 Syslog 5424 Record Reader and nifi-syslog-utils
    
    - Create nifi-syslog-utils to move syslog parsing functionality to a central location shared by the processors and serialization/record system.
    - Refactor Processors to use these utils
    - Update 5424 syslog classes using simple-syslog-5424 to pick up new changes to support this work, as well as keep dependencies/types from bleeding out to the
    processors or readers
    - Refactor Syslog5424Event and Parser
    - Create Syslog5424RecordReader

----


---

[GitHub] nifi pull request #2816: NIFI-5337 Syslog 5424 Record Reader and nifi-syslog...

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2816#discussion_r202103018
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424RecordReader.java ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.syslog;
    +
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.syslog.attributes.SyslogAttributes;
    +import org.apache.nifi.syslog.events.Syslog5424Event;
    +import org.apache.nifi.syslog.parsers.StrictSyslog5424Parser;
    +import org.apache.nifi.util.StringUtils;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.nio.ByteBuffer;
    +import java.sql.Timestamp;
    +import java.time.Instant;
    +import java.time.format.DateTimeFormatter;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +public class Syslog5424RecordReader implements RecordReader {
    +    private final BufferedReader reader;
    +    private RecordSchema schema;
    +    private final StrictSyslog5424Parser parser;
    +
    +    public Syslog5424RecordReader(StrictSyslog5424Parser parser, InputStream in, RecordSchema schema){
    +        this.reader = new BufferedReader(new InputStreamReader(in));
    +        this.schema = schema;
    +        this.parser = parser;
    +    }
    +
    +    @Override
    +    public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException {
    +        String line = reader.readLine();
    +
    +        if ( line == null || StringUtils.isBlank(line)) {
    --- End diff --
    
    OK, that makes sense


---

[GitHub] nifi pull request #2816: NIFI-5337 Syslog 5424 Record Reader and nifi-syslog...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/2816


---

[GitHub] nifi pull request #2816: NIFI-5337 Syslog 5424 Record Reader and nifi-syslog...

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2816#discussion_r202165722
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424Reader.java ---
    @@ -0,0 +1,151 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.syslog;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaAccessStrategy;
    +import org.apache.nifi.schema.access.SchemaField;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.schemaregistry.services.SchemaRegistry;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.SchemaRegistryService;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.syslog.attributes.Syslog5424Attributes;
    +import org.apache.nifi.syslog.attributes.SyslogAttributes;
    +import org.apache.nifi.syslog.keyproviders.SimpleKeyProvider;
    +import org.apache.nifi.syslog.parsers.StrictSyslog5424Parser;
    +import org.apache.nifi.syslog.utils.NifiStructuredDataPolicy;
    +import org.apache.nifi.syslog.utils.NilHandlingPolicy;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.EnumSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@Tags({"syslog 5424", "syslog", "logs", "logfiles", "parse", "text", "record", "reader"})
    +@CapabilityDescription("Provides a mechanism for reading RFC 5424 compliant Syslog data, such as log files, and structuring the data" +
    +        " so that it can be processed.")
    +public class Syslog5424Reader extends SchemaRegistryService implements RecordReaderFactory {
    +    static final AllowableValue RFC_5424_SCHEMA = new AllowableValue("default-5424-schema", "Use RFC 5424 Schema",
    +            "The schema will be the default schema per RFC 5424.");
    +    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
    +            .name("Character Set")
    +            .description("Specifies which character set of the Syslog messages")
    +            .required(true)
    +            .defaultValue("UTF-8")
    +            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
    +            .build();
    +
    +    private volatile StrictSyslog5424Parser parser;
    +    private volatile RecordSchema recordSchema;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> properties = new ArrayList<>(1);
    +        properties.add(CHARSET);
    +        return properties;
    +    }
    +
    +
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) {
    +        final String charsetName = context.getProperty(CHARSET).getValue();
    +        parser = new StrictSyslog5424Parser(Charset.forName(charsetName), NilHandlingPolicy.NULL, NifiStructuredDataPolicy.MAP_OF_MAPS, new SimpleKeyProvider());
    +        recordSchema = createRecordSchema();
    +    }
    +
    +    @Override
    +    protected List<AllowableValue> getSchemaAccessStrategyValues() {
    +        final List<AllowableValue> allowableValues = new ArrayList<>();
    +        allowableValues.add(RFC_5424_SCHEMA);
    +        return allowableValues;
    +    }
    +
    +    @Override
    +    protected AllowableValue getDefaultSchemaAccessStrategy() {
    +        return RFC_5424_SCHEMA;
    +    }
    +
    +    @Override
    +    protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
    +        return createAccessStrategy();
    +    }
    +
    +    @Override
    +    protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final ValidationContext context) {
    +        return createAccessStrategy();
    +    }
    +
    +    static RecordSchema createRecordSchema() {
    +        final List<RecordField> fields = new ArrayList<>();
    +        fields.add(new RecordField(SyslogAttributes.PRIORITY.key(), RecordFieldType.STRING.getDataType(), true));
    +        fields.add(new RecordField(SyslogAttributes.SEVERITY.key(), RecordFieldType.STRING.getDataType(), true));
    +        fields.add(new RecordField(SyslogAttributes.FACILITY.key(), RecordFieldType.STRING.getDataType(), true));
    +        fields.add(new RecordField(SyslogAttributes.VERSION.key(), RecordFieldType.STRING.getDataType(), true));
    +        fields.add(new RecordField(SyslogAttributes.TIMESTAMP.key(), RecordFieldType.TIMESTAMP.getDataType(), true));
    +        fields.add(new RecordField(SyslogAttributes.HOSTNAME.key(), RecordFieldType.STRING.getDataType(), true));
    +        fields.add(new RecordField(SyslogAttributes.BODY.key(), RecordFieldType.STRING.getDataType(), true));
    +        fields.add(new RecordField(Syslog5424Attributes.APP_NAME.key(), RecordFieldType.STRING.getDataType(), true));
    +        fields.add(new RecordField(Syslog5424Attributes.PROCID.key(), RecordFieldType.STRING.getDataType(), true));
    +        fields.add(new RecordField(Syslog5424Attributes.MESSAGEID.key(), RecordFieldType.STRING.getDataType(), true));
    +        fields.add(new RecordField(Syslog5424Attributes.STRUCTURED_BASE.key(),
    +                RecordFieldType.MAP.getMapDataType(RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()))));
    +
    +        final RecordSchema schema = new SimpleRecordSchema(fields);
    --- End diff --
    
    I know the Grok reader also doesn't set a name, but since most writers default their "Schema Write Strategy" to "Set schema.name attribute", I think setting a default name here might be helpful, maybe something like "nifi-syslog-5424".
    
    On my first attempt at testing this, I created a ConvertRecord processor with a syslog reader and a JSON writer, using mostly defaults, and got an error saying the writer was unable to write the schema.name attribute because the schema had no name. Once I changed the writer to "Do Not Write Schema", everything was fine, but I suspect some users may not realize what needs to be done.
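To make that failure mode concrete, here is a minimal sketch assuming a writer whose strategy is to emit a schema.name attribute: it can only succeed when the schema carries a name. The Schema class and schemaNameAttribute method are hypothetical stand-ins, not NiFi's RecordSchema or record writer API; the default name "nifi-syslog-5424" is the one suggested in the comment above.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class SchemaNameSketch {
    // Hypothetical stand-in for a record schema; NOT NiFi's RecordSchema API.
    static final class Schema {
        private final List<String> fieldNames;
        private final String name; // null when no name was set

        Schema(List<String> fieldNames, String name) {
            this.fieldNames = fieldNames;
            this.name = name;
        }

        List<String> getFieldNames() {
            return fieldNames;
        }

        Optional<String> getName() {
            return Optional.ofNullable(name);
        }
    }

    // Mimics a "set schema.name attribute" write strategy:
    // it fails when the schema has no name to write.
    static Map<String, String> schemaNameAttribute(Schema schema) {
        String name = schema.getName().orElseThrow(() ->
                new IllegalStateException("Cannot write schema.name attribute: schema has no name"));
        return Collections.singletonMap("schema.name", name);
    }

    public static void main(String[] args) {
        List<String> fields = List.of("priority", "severity", "facility");
        // A named schema lets the attribute be written.
        System.out.println(schemaNameAttribute(new Schema(fields, "nifi-syslog-5424")));
        // An unnamed schema reproduces the error seen with default writer settings.
        try {
            schemaNameAttribute(new Schema(fields, null));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Giving the reader's schema a default name removes the need for users to switch the writer to "Do Not Write Schema" just to get started.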


---

[GitHub] nifi pull request #2816: NIFI-5337 Syslog 5424 Record Reader and nifi-syslog...

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2816#discussion_r202105880
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424RecordReader.java ---
    @@ -0,0 +1,114 @@
    +    @Override
    +    public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException {
    +        String line = reader.readLine();
    +
    +        if ( line == null || StringUtils.isBlank(line)) {
    --- End diff --
    
    Ok, I looked into this more. The return-null behavior was modeled on the Grok reader's handling, which does the same. I think this is the right thing to do, but I *could* see looping on readLine() until EOF or a non-blank line.


---

[GitHub] nifi issue #2816: NIFI-5337 Syslog 5424 Record Reader and nifi-syslog-utils

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on the issue:

    https://github.com/apache/nifi/pull/2816
  
    Everything looks good, going to merge shortly, thanks!


---

[GitHub] nifi pull request #2816: NIFI-5337 Syslog 5424 Record Reader and nifi-syslog...

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2816#discussion_r202107396
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424RecordReader.java ---
    @@ -0,0 +1,114 @@
    +    @Override
    +    public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException {
    +        String line = reader.readLine();
    +
    +        if ( line == null || StringUtils.isBlank(line)) {
    --- End diff --
    
    OK, OK, @bbende, sorry, I now understand what you actually meant, and have made changes accordingly.


---

[GitHub] nifi issue #2816: NIFI-5337 Syslog 5424 Record Reader and nifi-syslog-utils

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on the issue:

    https://github.com/apache/nifi/pull/2816
  
    Thanks @bbende. I haven't done any Record work before, so I expect some of it needs refinement.


---

[GitHub] nifi pull request #2816: NIFI-5337 Syslog 5424 Record Reader and nifi-syslog...

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2816#discussion_r202731968
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424Reader.java ---
    @@ -0,0 +1,151 @@
    +    static RecordSchema createRecordSchema() {
    +        final List<RecordField> fields = new ArrayList<>();
    +        fields.add(new RecordField(SyslogAttributes.PRIORITY.key(), RecordFieldType.STRING.getDataType(), true));
    +        fields.add(new RecordField(SyslogAttributes.SEVERITY.key(), RecordFieldType.STRING.getDataType(), true));
    +        fields.add(new RecordField(SyslogAttributes.FACILITY.key(), RecordFieldType.STRING.getDataType(), true));
    +        fields.add(new RecordField(SyslogAttributes.VERSION.key(), RecordFieldType.STRING.getDataType(), true));
    +        fields.add(new RecordField(SyslogAttributes.TIMESTAMP.key(), RecordFieldType.TIMESTAMP.getDataType(), true));
    +        fields.add(new RecordField(SyslogAttributes.HOSTNAME.key(), RecordFieldType.STRING.getDataType(), true));
    +        fields.add(new RecordField(SyslogAttributes.BODY.key(), RecordFieldType.STRING.getDataType(), true));
    +        fields.add(new RecordField(Syslog5424Attributes.APP_NAME.key(), RecordFieldType.STRING.getDataType(), true));
    +        fields.add(new RecordField(Syslog5424Attributes.PROCID.key(), RecordFieldType.STRING.getDataType(), true));
    +        fields.add(new RecordField(Syslog5424Attributes.MESSAGEID.key(), RecordFieldType.STRING.getDataType(), true));
    +        fields.add(new RecordField(Syslog5424Attributes.STRUCTURED_BASE.key(),
    +                RecordFieldType.MAP.getMapDataType(RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()))));
    +
    +        final RecordSchema schema = new SimpleRecordSchema(fields);
    --- End diff --
    
    Ok, done, I think :)



---

[GitHub] nifi issue #2816: NIFI-5337 Syslog 5424 Record Reader and nifi-syslog-utils

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on the issue:

    https://github.com/apache/nifi/pull/2816
  
    Started reviewing this yesterday, but it may take me some time to finish up. The preliminary review looks good, though.


---

[GitHub] nifi pull request #2816: NIFI-5337 Syslog 5424 Record Reader and nifi-syslog...

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2816#discussion_r202103469
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424RecordReader.java ---
    @@ -0,0 +1,114 @@
    +    @Override
    +    public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException {
    +        String line = reader.readLine();
    +
    +        if ( line == null || StringUtils.isBlank(line)) {
    --- End diff --
    
    Although, I wonder if we could instead read until we get a non-blank line? In other words, skip blanks?
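
    For illustration only (this is not code from the PR): a minimal sketch of the "skip blanks" idea, written against plain java.io rather than the NiFi RecordReader API. The helper name readNextNonBlankLine is hypothetical. It keeps calling readLine() until it sees a non-blank line or reaches end of stream, and uses trim().isEmpty() as a stand-in for StringUtils.isBlank.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

public class SkipBlankLines {

    // Hypothetical helper: returns the next line containing a non-whitespace
    // character, or null once the underlying reader reaches end of stream.
    static String readNextNonBlankLine(BufferedReader reader) throws IOException {
        String line;
        while ((line = reader.readLine()) != null) {
            if (!line.trim().isEmpty()) {
                return line;
            }
            // Blank line: keep reading instead of ending the record stream.
        }
        return null;
    }

    public static void main(String[] args) throws IOException {
        String input = "<14>1 2018-07-12T10:00:00Z host app - - - first\n"
                + "\n"
                + "   \n"
                + "<14>1 2018-07-12T10:00:01Z host app - - - second\n";
        BufferedReader reader = new BufferedReader(new StringReader(input));
        String line;
        // Both syslog lines are printed; the two blank lines between them are skipped.
        while ((line = readNextNonBlankLine(reader)) != null) {
            System.out.println(line);
        }
    }
}
```

    With bounded input such as FlowFile content, the loop always ends at end of stream; over an unbounded socket stream it keeps consuming blank lines for as long as they arrive.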


---

[GitHub] nifi pull request #2816: NIFI-5337 Syslog 5424 Record Reader and nifi-syslog...

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2816#discussion_r202687489
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424Reader.java ---
    @@ -0,0 +1,151 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.syslog;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaAccessStrategy;
    +import org.apache.nifi.schema.access.SchemaField;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.schemaregistry.services.SchemaRegistry;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.SchemaRegistryService;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.syslog.attributes.Syslog5424Attributes;
    +import org.apache.nifi.syslog.attributes.SyslogAttributes;
    +import org.apache.nifi.syslog.keyproviders.SimpleKeyProvider;
    +import org.apache.nifi.syslog.parsers.StrictSyslog5424Parser;
    +import org.apache.nifi.syslog.utils.NifiStructuredDataPolicy;
    +import org.apache.nifi.syslog.utils.NilHandlingPolicy;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.EnumSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@Tags({"syslog 5424", "syslog", "logs", "logfiles", "parse", "text", "record", "reader"})
    +@CapabilityDescription("Provides a mechanism for reading RFC 5424 compliant Syslog data, such as log files, and structuring the data" +
    +        " so that it can be processed.")
    +public class Syslog5424Reader extends SchemaRegistryService implements RecordReaderFactory {
    +    static final AllowableValue RFC_5424_SCHEMA = new AllowableValue("default-5424-schema", "Use RFC 5424 Schema",
    +            "The schema will be the default schema per RFC 5424.");
    +    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
    +            .name("Character Set")
    +            .description("Specifies which character set of the Syslog messages")
    +            .required(true)
    +            .defaultValue("UTF-8")
    +            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
    +            .build();
    +
    +    private volatile StrictSyslog5424Parser parser;
    +    private volatile RecordSchema recordSchema;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> properties = new ArrayList<>(1);
    +        properties.add(CHARSET);
    +        return properties;
    +    }
    +
    +
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) {
    +        final String charsetName = context.getProperty(CHARSET).getValue();
    +        parser = new StrictSyslog5424Parser(Charset.forName(charsetName), NilHandlingPolicy.NULL, NifiStructuredDataPolicy.MAP_OF_MAPS, new SimpleKeyProvider());
    +        recordSchema = createRecordSchema();
    +    }
    +
    +    @Override
    +    protected List<AllowableValue> getSchemaAccessStrategyValues() {
    +        final List<AllowableValue> allowableValues = new ArrayList<>();
    +        allowableValues.add(RFC_5424_SCHEMA);
    +        return allowableValues;
    +    }
    +
    +    @Override
    +    protected AllowableValue getDefaultSchemaAccessStrategy() {
    +        return RFC_5424_SCHEMA;
    +    }
    +
    +    @Override
    +    protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
    +        return createAccessStrategy();
    +    }
    +
    +    @Override
    +    protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final ValidationContext context) {
    +        return createAccessStrategy();
    +    }
    +
    +    static RecordSchema createRecordSchema() {
    +        final List<RecordField> fields = new ArrayList<>();
    +        fields.add(new RecordField(SyslogAttributes.PRIORITY.key(), RecordFieldType.STRING.getDataType(), true));
    +        fields.add(new RecordField(SyslogAttributes.SEVERITY.key(), RecordFieldType.STRING.getDataType(), true));
    +        fields.add(new RecordField(SyslogAttributes.FACILITY.key(), RecordFieldType.STRING.getDataType(), true));
    +        fields.add(new RecordField(SyslogAttributes.VERSION.key(), RecordFieldType.STRING.getDataType(), true));
    +        fields.add(new RecordField(SyslogAttributes.TIMESTAMP.key(), RecordFieldType.TIMESTAMP.getDataType(), true));
    +        fields.add(new RecordField(SyslogAttributes.HOSTNAME.key(), RecordFieldType.STRING.getDataType(), true));
    +        fields.add(new RecordField(SyslogAttributes.BODY.key(), RecordFieldType.STRING.getDataType(), true));
    +        fields.add(new RecordField(Syslog5424Attributes.APP_NAME.key(), RecordFieldType.STRING.getDataType(), true));
    +        fields.add(new RecordField(Syslog5424Attributes.PROCID.key(), RecordFieldType.STRING.getDataType(), true));
    +        fields.add(new RecordField(Syslog5424Attributes.MESSAGEID.key(), RecordFieldType.STRING.getDataType(), true));
    +        fields.add(new RecordField(Syslog5424Attributes.STRUCTURED_BASE.key(),
    +                RecordFieldType.MAP.getMapDataType(RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()))));
    +
    +        final RecordSchema schema = new SimpleRecordSchema(fields);
    --- End diff --
    
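    Sorry for the delay; something like:
    
    import org.apache.nifi.serialization.record.SchemaIdentifier;
    import org.apache.nifi.serialization.record.StandardSchemaIdentifier;
    
    // Build an identifier carrying the schema name, then pass it to SimpleRecordSchema
    SchemaIdentifier schemaIdentifier = new StandardSchemaIdentifier.Builder().name("my-name").build();
    RecordSchema schema = new SimpleRecordSchema(fields, schemaIdentifier);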


---

[GitHub] nifi pull request #2816: NIFI-5337 Syslog 5424 Record Reader and nifi-syslog...

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2816#discussion_r202168428
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424Reader.java ---
    @@ -0,0 +1,151 @@
    +        final RecordSchema schema = new SimpleRecordSchema(fields);
    --- End diff --
    
    What is the right way to set the name?


---

[GitHub] nifi pull request #2816: NIFI-5337 Syslog 5424 Record Reader and nifi-syslog...

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2816#discussion_r202109378
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424RecordReader.java ---
    @@ -0,0 +1,114 @@
    +        if ( line == null || StringUtils.isBlank(line)) {
    --- End diff --
    
    Yeah, I also considered reading until a non-blank line or null, but then I thought about a case like ListenTCPRecord, where the InputStream could be an unbounded stream over a socket; you could potentially end up stuck inside the nextRecord method reading blank lines forever (unlikely, of course).
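
    One possible middle ground, sketched here as an assumption rather than anything in the PR: skip blank lines, but give up after a fixed number of consecutive ones and return null, as the current code does on a blank line. The class name BoundedBlankLineSkipper and the maxBlankLines cap are hypothetical. Note that readLine() itself can still block on a socket that sends nothing; the cap only bounds how many blank lines a single call will consume.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

public class BoundedBlankLineSkipper {

    // Hypothetical cap on consecutive blank lines skipped within one call.
    private final int maxBlankLines;

    BoundedBlankLineSkipper(int maxBlankLines) {
        this.maxBlankLines = maxBlankLines;
    }

    // Returns the next non-blank line, or null at end of stream or once more than
    // maxBlankLines consecutive blank lines have been read in this call.
    String nextLine(BufferedReader reader) throws IOException {
        int blanks = 0;
        String line;
        while ((line = reader.readLine()) != null) {
            if (!line.trim().isEmpty()) {
                return line;
            }
            blanks++;
            if (blanks > maxBlankLines) {
                // Too many blanks in a row: stop, like "blank line ends the records".
                return null;
            }
        }
        return null;
    }

    public static void main(String[] args) throws IOException {
        BoundedBlankLineSkipper skipper = new BoundedBlankLineSkipper(2);
        BufferedReader reader = new BufferedReader(new StringReader("\n\nfirst\n\n\n\nsecond\n"));
        System.out.println(skipper.nextLine(reader)); // first (two blanks are within the cap)
        System.out.println(skipper.nextLine(reader)); // null (three blanks exceed the cap)
    }
}
```

    The trade-off is that a caller which treats null as "no more records" would stop early on a long run of blanks, while the reader itself stays positioned so a later call could still return "second".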


---

[GitHub] nifi issue #2816: NIFI-5337 Syslog 5424 Record Reader and nifi-syslog-utils

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on the issue:

    https://github.com/apache/nifi/pull/2816
  
    @bbende any chance for a review?


---