Posted to dev@nifi.apache.org by ryanpersaud <gi...@git.apache.org> on 2016/05/20 19:35:16 UTC

[GitHub] nifi pull request: NIFI-1909 Adding ability to process schemaless ...

GitHub user ryanpersaud opened a pull request:

    https://github.com/apache/nifi/pull/459

    NIFI-1909 Adding ability to process schemaless Avro records to

    NIFI-1909 Adding ability to process schemaless Avro records to ConvertAvroToJson.  A new property named schema is added to the processor.  If a value is provided for this property, then it is assumed that incoming Avro records are schemaless and the value of the schema property is used when parsing incoming records.
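
    For readers unfamiliar with the term, here is a minimal sketch of why a schemaless record cannot be parsed without an externally supplied schema. It hand-encodes a two-field record using the Avro binary rules (zigzag variable-length integers, length-prefixed UTF-8 strings, field values written in schema order with no names or type tags). The class name, the example record layout (a string `name` followed by an int `age`) and the naive JSON rendering are assumptions made for illustration only; they are not taken from the pull request, and real code would use the Avro library rather than this hand-rolled decoder.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class; not part of NiFi or Avro.
class SchemalessAvroSketch {

    // Avro writes int and long values as zigzag-encoded variable-length integers.
    static void writeLong(ByteArrayOutputStream out, long value) {
        long zigzag = (value << 1) ^ (value >> 63);
        while ((zigzag & ~0x7FL) != 0) {
            out.write((int) ((zigzag & 0x7F) | 0x80));
            zigzag >>>= 7;
        }
        out.write((int) zigzag);
    }

    static long readLong(ByteBuffer in) {
        long result = 0;
        int shift = 0;
        int b;
        do {
            b = in.get() & 0xFF;
            result |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        // Undo the zigzag mapping.
        return (result >>> 1) ^ -(result & 1);
    }

    // A string is its UTF-8 byte length (encoded as a long) followed by the bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    static String readString(ByteBuffer in) {
        byte[] utf8 = new byte[(int) readLong(in)];
        in.get(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }

    // Assumed record layout: {name: string, age: int}. Only the values are written.
    static byte[] encode(String name, int age) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, name);
        writeLong(out, age);
        return out.toByteArray();
    }

    // The reader must already know the field order and types; the bytes do not say.
    // JSON escaping is deliberately omitted to keep the sketch short.
    static String decodeToJson(byte[] datum) {
        ByteBuffer in = ByteBuffer.wrap(datum);
        String name = readString(in);
        long age = readLong(in);
        return "{\"name\": \"" + name + "\", \"age\": " + age + "}";
    }

    public static void main(String[] args) {
        byte[] datum = encode("Ada", 36);
        System.out.println(datum.length + " bytes -> " + decodeToJson(datum));
    }
}
```

    The encoded record is only the field values back to back, which is why the processor needs the schema to come from somewhere else, here the new schema property.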

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ryanpersaud/nifi nifi-1909

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/459.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #459
    
----
commit dd03d6c573820ba6f8e356c288c0e25dc11f1dd0
Author: Ryan Persaud <ry...@gmail.com>
Date:   2016-05-20T19:27:39Z

    NIFI-1909 Adding ability to process schemaless Avro records to ConvertAvrotoJson.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #459: NIFI-1909 Adding ability to process schemaless Avro ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/459



[GitHub] nifi pull request: NIFI-1909 Adding ability to process schemaless ...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/459#discussion_r64097258
  
    --- Diff: nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java ---
    @@ -92,6 +103,7 @@
             .build();
     
         private List<PropertyDescriptor> properties;
    +    private Schema schema = null;
    --- End diff --
    
    This field could be accessed by multiple threads concurrently, so we need to make sure that it is protected. Marking it as 'volatile' should take care of this.
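
    A minimal sketch of the suggestion, assuming a lazily initialised cache field like the one in the diff. `ParsedSchema`, `parse` and `parseCount` are hypothetical stand-ins so the example compiles without the Avro library; in the processor the field type would be Avro's `Schema`.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; not the actual ConvertAvroToJSON code.
class CachedSchemaSketch {

    // Stand-in for a parsed Avro Schema (assumption for this sketch).
    static final class ParsedSchema {
        final String text;

        ParsedSchema(String text) {
            this.text = text;
        }
    }

    // Counts parses so the caching behaviour can be observed.
    final AtomicInteger parseCount = new AtomicInteger();

    // volatile: a write by one thread is visible to later reads by other threads,
    // and the object is safely published once the reference is assigned.
    private volatile ParsedSchema schema;

    private ParsedSchema parse(String schemaText) {
        parseCount.incrementAndGet();
        return new ParsedSchema(schemaText);
    }

    ParsedSchema getSchema(String schemaText) {
        ParsedSchema local = schema;      // single volatile read
        if (local == null) {
            local = parse(schemaText);
            schema = local;               // volatile write publishes the result
        }
        return local;
    }
}
```

    Note that volatile only guarantees visibility. Two threads that both see null may each parse the schema once, which is harmless here because both results are equivalent; a lock would be needed only if exactly one parse had to be guaranteed.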



[GitHub] nifi issue #459: NIFI-1909 Adding ability to process schemaless Avro records...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on the issue:

    https://github.com/apache/nifi/pull/459
  
    @ryanpersaud thanks for getting this in! I have verified the checkstyle and pushed to both the 0.x and 1.0.0 baselines. Thanks again for knocking this out!



[GitHub] nifi pull request: NIFI-1909 Adding ability to process schemaless ...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/459#discussion_r64097658
  
    --- Diff: nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java ---
    @@ -128,49 +141,77 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
             // Wrap a single record (inclusive of no records) only when a container is being used
             final boolean wrapSingleRecord = context.getProperty(WRAP_SINGLE_RECORD).asBoolean() && useContainer;
     
    +        final String stringSchema = context.getProperty(SCHEMA).getValue();
    +        final boolean schemaLess = stringSchema != null;
    +
             try {
                 flowFile = session.write(flowFile, new StreamCallback() {
                     @Override
                     public void process(final InputStream rawIn, final OutputStream rawOut) throws IOException {
    -                    try (final InputStream in = new BufferedInputStream(rawIn);
    -                         final OutputStream out = new BufferedOutputStream(rawOut);
    -                         final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
    +                    if (schemaLess) {
    +                        if (schema == null) {
    +                            schema = new Schema.Parser().parse(stringSchema);
    +                        }
    +                        try (final InputStream in = new BufferedInputStream(rawIn);
    +                             final OutputStream out = new BufferedOutputStream(rawOut)) {
    +                            final DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
    +                            final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
    +                            final GenericRecord record = reader.read(null, decoder);
    +
    +                            // Schemaless records are singletons, so both useContainer and wrapSingleRecord
    +                            // need to be true before we wrap it with an array
    +                            if (useContainer && wrapSingleRecord) {
    +                                out.write('[');
    +                            }
     
    -                        final GenericData genericData = GenericData.get();
    +                            final byte[] outputBytes = (record == null) ? EMPTY_JSON_OBJECT : record.toString().getBytes(StandardCharsets.UTF_8);
    --- End diff --
    
    I think we need to keep the use of GenericData here. While record.toString() does in fact convert the Avro object to JSON, the toString() method is not documented as doing so and could change at any time. The GenericData object is documented to convert the object into JSON.
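
    Independently of how the record is turned into JSON text, the wrapping rule from the diff above can be sketched in isolation: a schemaless payload is a single record, so it is wrapped in an array only when both flags are set. In this sketch `recordJson` stands for whatever JSON text the record conversion produced, and the class and method names, the `{}` value of `EMPTY_JSON_OBJECT` and the closing bracket are assumptions, since the quoted diff is cut off before that point.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class; not the actual ConvertAvroToJSON code.
class SingleRecordOutputSketch {

    // Assumed value; the diff references EMPTY_JSON_OBJECT without showing it.
    static final byte[] EMPTY_JSON_OBJECT = "{}".getBytes(StandardCharsets.UTF_8);

    // recordJson is the already converted JSON text, or null when no record was read.
    static byte[] render(String recordJson, boolean useContainer, boolean wrapSingleRecord) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        boolean wrap = useContainer && wrapSingleRecord;
        if (wrap) {
            out.write('[');
        }
        byte[] body = (recordJson == null)
                ? EMPTY_JSON_OBJECT
                : recordJson.getBytes(StandardCharsets.UTF_8);
        out.write(body, 0, body.length);
        if (wrap) {
            out.write(']');   // assumed counterpart of the opening bracket
        }
        return out.toByteArray();
    }
}
```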



[GitHub] nifi pull request: NIFI-1909 Adding ability to process schemaless ...

Posted by ryanpersaud <gi...@git.apache.org>.
Github user ryanpersaud commented on the pull request:

    https://github.com/apache/nifi/pull/459#issuecomment-220753130
  
    I made the changes that @markap14 suggested and removed unused imports that were identified by checkstyle.  I've attempted to run `mvn -Pcontrib-check` on 3 different machines, but I get failures in tests for the nifi-framework-core module that seem unrelated to my modifications.  For example, running the build on a Linux machine, I get the following error:
    
    Running TestSuite
    Tests run: 172, Failures: 2, Errors: 0, Skipped: 1, Time elapsed: 272.879 sec <<< FAILURE! - in TestSuite
    testStateTooLargeExceptionThrownOnReplace on testStateTooLargeExceptionThrownOnReplace(org.apache.nifi.controller.state.providers.zookeeper.TestZooKeeperStateProvider)(org.apache.nifi.controller.state.providers.zookeeper.TestZooKeeperStateProvider)  Time elapsed: 24.341 sec  <<< FAILURE!
    org.junit.runners.model.TestTimedOutException: test timed out after 20000 milliseconds
            at java.lang.Object.wait(Native Method)
            at java.lang.Object.wait(Object.java:502)
            at org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1342)
            at org.apache.zookeeper.ZooKeeper.setData(ZooKeeper.java:1268)
            at org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider.setState(ZooKeeperStateProvider.java:318)
            at org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider.setState(ZooKeeperStateProvider.java:283)
            at org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider.setState(ZooKeeperStateProvider.java:228)
            at org.apache.nifi.controller.state.providers.zookeeper.TestZooKeeperStateProvider.testStateTooLargeExceptionThrownOnReplace(TestZooKeeperStateProvider.java:176)
    
    
    


