Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/04/06 20:00:02 UTC

[GitHub] [nifi] Lehel44 opened a new pull request, #5937: NIFI-9862: Update JsonTreeReader to read Records from a Nested Array

Lehel44 opened a new pull request, #5937:
URL: https://github.com/apache/nifi/pull/5937

   Thank you for submitting a contribution to Apache NiFi.
   
   Please provide a short description of the PR here:
   
   #### Description of PR
   
   https://issues.apache.org/jira/browse/NIFI-9862
   
   - Added optional "Skip To Nested JSON Field" property to JsonTreeReader
   - Extended JsonRecordSource with init() method and added logic to jump to the specified field
   - Added a new constructor to AbstractJsonRowRecordReader with logic to jump to the specified field
   - In the AbstractJsonRowRecordReader constructor **I changed the default logic** from:
   (1) "check the next token -> if it's an array -> check the next token -> if it's an object -> proceed" to
   (2) "find the first token which is an array or an object -> proceed"
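
The difference between logic (1) and logic (2) above can be sketched without a JSON library. The following is a minimal, hypothetical model: `FirstRecordLocator`, its `Tok` enum, and both method names are assumptions made for illustration and are not part of NiFi or Jackson. It only shows which token each strategy would start from.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the two start-position strategies; not NiFi code.
final class FirstRecordLocator {

    // Simplified stand-in for a streaming parser's token types.
    enum Tok { START_OBJECT, END_OBJECT, START_ARRAY, END_ARRAY, FIELD_NAME, VALUE }

    // Logic (1): look only at the token at 'from'. Proceed if it is an object,
    // or an array whose next token is an object. Returns the index where
    // processing starts, or -1 if no record starts at that position.
    static int strict(List<Tok> tokens, int from) {
        if (from >= tokens.size()) {
            return -1;
        }
        Tok first = tokens.get(from);
        if (first == Tok.START_OBJECT) {
            return from;
        }
        boolean arrayOfObjects = first == Tok.START_ARRAY
                && from + 1 < tokens.size()
                && tokens.get(from + 1) == Tok.START_OBJECT;
        return arrayOfObjects ? from : -1;
    }

    // Logic (2): scan forward to the first token that opens an array or an
    // object and proceed from there. Returns -1 if none is found.
    static int lenient(List<Tok> tokens, int from) {
        for (int i = from; i < tokens.size(); i++) {
            Tok t = tokens.get(i);
            if (t == Tok.START_ARRAY || t == Tok.START_OBJECT) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Tokens that follow a scalar field such as "name": its value, the next
        // field name, and then an array of objects.
        List<Tok> afterScalarField = Arrays.asList(
                Tok.VALUE, Tok.FIELD_NAME, Tok.START_ARRAY, Tok.START_OBJECT,
                Tok.FIELD_NAME, Tok.VALUE, Tok.END_OBJECT, Tok.END_ARRAY);
        System.out.println("strict:  " + strict(afterScalarField, 0));  // prints -1
        System.out.println("lenient: " + lenient(afterScalarField, 0)); // prints 2
    }
}
```

In this model, skipping to a scalar field yields no record under (1), while (2) continues to the next array or object.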
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
        in the commit message?
   
   - [ ] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically `main`)?
   
   - [ ] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._
   
   ### For code changes:
   - [ ] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
   - [ ] Have you written or updated unit tests to verify your changes?
   - [ ] Have you verified that the full build is successful on JDK 8?
   - [ ] Have you verified that the full build is successful on JDK 11?
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
   - [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
   - [ ] If adding new Properties, have you added `.displayName` in addition to `.name` (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] Lehel44 commented on a diff in pull request #5937: NIFI-9862: Update JsonTreeReader to read Records from a Nested Array

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on code in PR #5937:
URL: https://github.com/apache/nifi/pull/5937#discussion_r846170727


##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java:
##########
@@ -56,6 +59,30 @@
 
 public class TestWriteJsonResult {
 
+    @Test
+    void test() throws IOException {
+        String json = "{\"name\":\"Tom\",\"age\":25,\"address\":[\"Poland\",\"5th avenue\"]}";
+
+        JsonFactory jfactory = new JsonFactory();
+        JsonParser jParser = jfactory.createParser(json);
+
+        Integer parsedAge = null;
+
+        while (jParser.nextToken() != JsonToken.END_OBJECT) {
+            String fieldname = jParser.getCurrentName();
+
+            if ("age".equals(fieldname)) {
+                jParser.nextToken();
+                parsedAge = jParser.getIntValue();
+                break;
+            }
+
+        }
+        jParser.close();
+
+        System.out.println(parsedAge);

Review Comment:
   This test was left in accidentally. I removed it, and also removed the public modifiers for consistency with the other test classes in the package.





[GitHub] [nifi] Lehel44 commented on pull request #5937: NIFI-9862: Update JsonTreeReader to read Records from a Nested Array

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on PR #5937:
URL: https://github.com/apache/nifi/pull/5937#issuecomment-1099230066

   Thank you for the review!




[GitHub] [nifi] Lehel44 commented on a diff in pull request #5937: NIFI-9862: Update JsonTreeReader to read Records from a Nested Array

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on code in PR #5937:
URL: https://github.com/apache/nifi/pull/5937#discussion_r846263631


##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java:
##########
@@ -66,6 +67,17 @@
     private volatile String dateFormat;
     private volatile String timeFormat;
     private volatile String timestampFormat;
+    private volatile String skipToNestedJsonField;
+
+    public static final PropertyDescriptor SKIP_TO_NESTED_JSON_FIELD = new PropertyDescriptor.Builder()
+            .name("skip-to-nested-json-field")
+            .displayName("Skip To Nested JSON Field")
+            .description("Skips forward to the given nested JSON field (array or object) and begins processing there. " +
+                    "If the field is not nested, the processing will proceed from the next available nested field.")

Review Comment:
   I removed this part because the behavior was reverted to the original, so it will return an empty JSON result in this case.





[GitHub] [nifi] exceptionfactory commented on a diff in pull request #5937: NIFI-9862: Update JsonTreeReader to read Records from a Nested Array

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on code in PR #5937:
URL: https://github.com/apache/nifi/pull/5937#discussion_r845222070


##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java:
##########
@@ -61,11 +60,11 @@
 
     private static final JsonFactory jsonFactory = new JsonFactory();
     private static final ObjectMapper codec = new ObjectMapper();
+    private JsonParser jsonParser;
+    private JsonNode firstJsonNode;

Review Comment:
   Is there a reason these are no longer marked as `final`?



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java:
##########
@@ -95,6 +100,41 @@ public AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logg
         }
     }
 
+    protected AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat,
+                                          final String skipToNestedJsonField) throws IOException, MalformedRecordException {

Review Comment:
   Minor naming suggestion: since the class context is already JSON, `Json` is not necessary in the variable name.
   ```suggestion
                                             final String nestedFieldName) throws IOException, MalformedRecordException {
   ```



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSource.java:
##########
@@ -19,37 +19,59 @@
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.io.SerializedString;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.nifi.schema.inference.RecordSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
 
 public class JsonRecordSource implements RecordSource<JsonNode> {
+    private static final Logger logger = LoggerFactory.getLogger(JsonRecordSource.class);
     private static final JsonFactory jsonFactory;
     private final JsonParser jsonParser;
+    private final String skipToNestedJsonField;
 
     static {
         jsonFactory = new JsonFactory();
         jsonFactory.setCodec(new ObjectMapper());
     }
 
-    public JsonRecordSource(final InputStream in) throws IOException {
-        jsonParser = jsonFactory.createJsonParser(in);
+    public JsonRecordSource(final InputStream in, final String skipToNestedJsonField) throws IOException {
+        jsonParser = jsonFactory.createParser(in);
+        this.skipToNestedJsonField = skipToNestedJsonField;
+    }
+
+    @Override
+    public void init() throws IOException {
+        if (skipToNestedJsonField != null) {
+            while (!jsonParser.nextFieldName(new SerializedString(skipToNestedJsonField))) {

Review Comment:
   As mentioned in the RecordReader, `SerializedString` should be declared and reused.



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java:
##########
@@ -95,6 +100,41 @@ public AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logg
         }
     }
 
+    protected AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat,
+                                          final String skipToNestedJsonField) throws IOException, MalformedRecordException {
+
+        this(logger, dateFormat, timeFormat, timestampFormat);
+
+        try {
+            jsonParser = jsonFactory.createParser(in);
+            jsonParser.setCodec(codec);
+
+            if (skipToNestedJsonField != null) {
+                while (!jsonParser.nextFieldName(new SerializedString(skipToNestedJsonField))) {
+                    // go to nested field if specified
+                    if (!jsonParser.hasCurrentToken()) {
+                        throw new IOException("The defined skipTo json field is not found when processing json as NiFi record.");
+                    }
+                }
+                logger.debug("Skipped to specified json field [{}] when processing json as NiFI record.", skipToNestedJsonField);
+            }
+
+            JsonToken token = jsonParser.nextToken();
+            if (skipToNestedJsonField != null && !jsonParser.isExpectedStartArrayToken() && token != JsonToken.START_OBJECT) {
+                logger.debug("Specified json field [{}] to skip to is not found. Schema infer will start from the next nested json object or array.", skipToNestedJsonField);
+            }

Review Comment:
   This check and debug log seem unnecessary, since the previous debug log already indicates where the parser is starting.



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/RecordSource.java:
##########
@@ -18,7 +18,10 @@
 
 import java.io.IOException;
 
-@FunctionalInterface
 public interface RecordSource<T> {
+
+    default void init() throws IOException {
+    }

Review Comment:
   Recommend removing this change and implementing the logic in the constructor, similar to the RecordReader.
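
A minimal sketch of the constructor-based alternative, using only the JDK. `SimpleSource` and `SkippingSource` are hypothetical stand-ins invented for this example, not the NiFi `RecordSource` or `JsonRecordSource`. The source positions itself while it is being constructed, so the interface keeps a single abstract method and needs no `init()` hook.

```java
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

// Hypothetical single-method source; stands in for a record source interface.
@FunctionalInterface
interface SimpleSource<T> {
    T next() throws IOException; // returns null once the source is exhausted
}

// Hypothetical source over in-memory items that skips ahead in its constructor.
final class SkippingSource implements SimpleSource<String> {
    private final Iterator<String> items;

    // If startAfter is non-null, consume items up to and including the first
    // one equal to it; fail if it never appears.
    SkippingSource(final List<String> input, final String startAfter) throws IOException {
        final Iterator<String> iterator = input.iterator();
        if (startAfter != null) {
            boolean found = false;
            while (iterator.hasNext()) {
                if (startAfter.equals(iterator.next())) {
                    found = true;
                    break;
                }
            }
            if (!found) {
                throw new IOException("Start marker [" + startAfter + "] not found");
            }
        }
        this.items = iterator;
    }

    @Override
    public String next() {
        return items.hasNext() ? items.next() : null;
    }
}
```

Because all positioning happens before the object is handed to callers, the field can stay `final` and callers cannot forget an initialization step.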



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java:
##########
@@ -1062,38 +1026,197 @@ public void testChoseSuboptimalSchemaWhenDataHasExtraFields() throws Exception {
         testReadRecords(jsonPath, schema, expected);
     }
 
+    @Test
+    void testSkipToNestedArray() throws IOException, MalformedRecordException {
+        String jsonPath = "src/test/resources/json/single-element-nested-array.json";
+
+        SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("id", RecordFieldType.INT.getDataType()),
+                new RecordField("balance", RecordFieldType.DOUBLE.getDataType())
+        ));
+
+        List<Object> expected = Arrays.asList(
+                new MapRecord(expectedRecordSchema, new HashMap<String, Object>(){{
+                    put("id", 42);
+                    put("balance", 4750.89);
+                }}),
+                new MapRecord(expectedRecordSchema, new HashMap<String, Object>(){{
+                    put("id", 43);
+                    put("balance", 48212.38);
+                }})
+        );
+
+        testReadRecords(jsonPath, expected, "accounts");
+    }
+
+    @Test
+    void testSkipToNestedObject() throws IOException, MalformedRecordException {
+        String jsonPath = "src/test/resources/json/single-element-nested.json";
+
+        SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("id", RecordFieldType.INT.getDataType()),
+                new RecordField("balance", RecordFieldType.DOUBLE.getDataType())
+        ));
+
+        List<Object> expected = Collections.singletonList(
+                new MapRecord(expectedRecordSchema, new HashMap<String, Object>() {{
+                    put("id", 42);
+                    put("balance", 4750.89);
+                }})
+        );
+
+        testReadRecords(jsonPath, expected, "account");
+    }
+
+    @Test
+    void testSkipToMultipleNestedField() throws IOException, MalformedRecordException {
+        String jsonPath = "src/test/resources/json/multiple-nested-field.json";
+
+        SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("id", RecordFieldType.STRING.getDataType()),
+                new RecordField("type", RecordFieldType.STRING.getDataType())
+        ));
+
+        List<Object> expected = Arrays.asList(
+                    new MapRecord(expectedRecordSchema, new HashMap<String, Object>(){{
+                        put("id", "n312kj3");
+                        put("type", "employee");
+                    }}),
+                    new MapRecord(expectedRecordSchema, new HashMap<String, Object>(){{
+                        put("id", "dl2kdff");
+                        put("type", "security");
+                    }})
+        );
+
+        testReadRecords(jsonPath, expected, "accountIds");
+    }
+
+    @Test
+    void testSkipToSimpleFieldFindsNextNestedArray() throws IOException, MalformedRecordException {
+        String jsonPath = "src/test/resources/json/single-element-nested-array.json";
+
+        SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("id", RecordFieldType.INT.getDataType()),
+                new RecordField("balance", RecordFieldType.DOUBLE.getDataType())
+        ));
+
+        List<Object> expected = Arrays.asList(
+                new MapRecord(expectedRecordSchema, new HashMap<String, Object>(){{
+                    put("id", 42);
+                    put("balance", 4750.89);
+                }}),
+                new MapRecord(expectedRecordSchema, new HashMap<String, Object>(){{
+                    put("id", 43);
+                    put("balance", 48212.38);
+                }})
+        );
+
+        testReadRecords(jsonPath, expected, "name");
+    }
+
+    @Test
+    void testSkipToSimpleFieldFindsNextNestedObject() throws IOException, MalformedRecordException {
+        String jsonPath = "src/test/resources/json/single-element-nested.json";
+
+        SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("id", RecordFieldType.INT.getDataType()),
+                new RecordField("balance", RecordFieldType.DOUBLE.getDataType())
+        ));
+
+        List<Object> expected = Collections.singletonList(
+                new MapRecord(expectedRecordSchema, new HashMap<String, Object>() {{
+                    put("id", 42);
+                    put("balance", 4750.89);
+                }})
+        );
+
+        testReadRecords(jsonPath, expected, "name");
+    }
+
+    @Test
+    void testSkipToSimpleFieldAndNoNestedObjectOrArrayFound() throws IOException, MalformedRecordException {
+        String jsonPath = "src/test/resources/json/single-element-nested-array-middle.json";
+
+        List<Object> expected = Collections.emptyList();
+
+        testReadRecords(jsonPath, expected, "name");
+    }
+
+    @Test
+    void testSkipToNonExistentFieldWithDefinedSchema() throws IOException, MalformedRecordException {
+        String jsonPath = "src/test/resources/json/single-element-nested-array-middle.json";
+
+        SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(getDefaultFields());
+        List<Object> expected = Collections.emptyList();
+
+        Exception exception = assertThrows(IOException.class,
+                () -> testReadRecords(jsonPath, expectedRecordSchema, expected, "notfound")
+        );
+
+        assertEquals("The defined skipTo json field is not found when processing json as NiFi record.", exception.getMessage());
+        assertEquals(AbstractJsonRowRecordReader.class.getName(), exception.getStackTrace()[0].getClassName());
+    }
+
     private void testReadRecords(String jsonPath, List<Object> expected) throws IOException, MalformedRecordException {
         // GIVEN
         final File jsonFile = new File(jsonPath);
 
         try (
-            InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile));
+            InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))
         ) {
-            RecordSchema schema = inferSchema(jsonStream);
+            RecordSchema schema = inferSchema(jsonStream, null);
 
             // WHEN
             // THEN
             testReadRecords(jsonStream, schema, expected);
         }
     }
 
+    private void testReadRecords(String jsonPath, List<Object> expected, String skipToField) throws IOException, MalformedRecordException {
+        // GIVEN
+        final File jsonFile = new File(jsonPath);
+
+        try (
+                InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))
+        ) {
+            RecordSchema schema = inferSchema(jsonStream, skipToField);
+
+            // WHEN
+            // THEN

Review Comment:
   It is best to avoid introducing the `GIVEN/WHEN/THEN` comments since they do not provide substantive detail on how the code works.



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java:
##########
@@ -95,6 +100,41 @@ public AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logg
         }
     }
 
+    protected AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat,
+                                          final String skipToNestedJsonField) throws IOException, MalformedRecordException {
+
+        this(logger, dateFormat, timeFormat, timestampFormat);
+
+        try {
+            jsonParser = jsonFactory.createParser(in);
+            jsonParser.setCodec(codec);
+
+            if (skipToNestedJsonField != null) {
+                while (!jsonParser.nextFieldName(new SerializedString(skipToNestedJsonField))) {
+                    // go to nested field if specified
+                    if (!jsonParser.hasCurrentToken()) {
+                        throw new IOException("The defined skipTo json field is not found when processing json as NiFi record.");
+                    }
+                }
+                logger.debug("Skipped to specified json field [{}] when processing json as NiFI record.", skipToNestedJsonField);
+            }
+
+            JsonToken token = jsonParser.nextToken();
+            if (skipToNestedJsonField != null && !jsonParser.isExpectedStartArrayToken() && token != JsonToken.START_OBJECT) {
+                logger.debug("Specified json field [{}] to skip to is not found. Schema infer will start from the next nested json object or array.", skipToNestedJsonField);
+            }
+            while (token != null) {
+                if (token == JsonToken.START_OBJECT) {
+                    firstJsonNode = jsonParser.readValueAsTree();
+                    break;
+                }
+                token = jsonParser.nextToken();
+            }

Review Comment:
   Instead of using a `while` loop, this should follow the same logic as the other constructor; perhaps the implementations could be collapsed into a shared method.
   ```suggestion
               JsonToken token = jsonParser.nextToken();
               if (token == JsonToken.START_ARRAY) {
                   token = jsonParser.nextToken(); // advance to START_OBJECT token
               }
   
               if (token == JsonToken.START_OBJECT) { // could be END_ARRAY also
                   firstJsonNode = jsonParser.readValueAsTree();
               } else {
                   firstJsonNode = null;
               }
   ```
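
One way the two constructors might share this logic is sketched below with JDK types only. `TokenReaderSketch`, its `Tok` enum, the `tokensToSkip` parameter, and `startsRecord` are all hypothetical names chosen for the example; the real reader works on a Jackson parser rather than a token list. The boolean field stands in for "a first node was read".

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical reader model; illustrates sharing the start-of-record check.
final class TokenReaderSketch {

    enum Tok { START_OBJECT, END_OBJECT, START_ARRAY, END_ARRAY, FIELD_NAME, VALUE }

    private final Iterator<Tok> tokens;
    private final boolean firstRecordAvailable; // stands in for firstJsonNode != null

    // Original-style entry point: start at the beginning of the input.
    TokenReaderSketch(final List<Tok> input) {
        this(input, 0);
    }

    // Nested-field entry point: the caller states how many leading tokens to
    // skip (an assumption of this sketch, replacing the field-name search).
    TokenReaderSketch(final List<Tok> input, final int tokensToSkip) {
        this.tokens = input.iterator();
        for (int i = 0; i < tokensToSkip && tokens.hasNext(); i++) {
            tokens.next();
        }
        this.firstRecordAvailable = startsRecord(tokens);
    }

    // Shared check: step over one optional START_ARRAY, then require START_OBJECT.
    private static boolean startsRecord(final Iterator<Tok> tokens) {
        Tok token = tokens.hasNext() ? tokens.next() : null;
        if (token == Tok.START_ARRAY) {
            token = tokens.hasNext() ? tokens.next() : null; // advance to the expected START_OBJECT
        }
        return token == Tok.START_OBJECT; // false for END_ARRAY, scalars, or end of input
    }

    boolean hasFirstRecord() {
        return firstRecordAvailable;
    }
}
```

Routing both constructors through one code path also lets the fields in this sketch remain `final`, since each is assigned exactly once.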



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSource.java:
##########
@@ -19,37 +19,59 @@
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.io.SerializedString;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.nifi.schema.inference.RecordSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
 
 public class JsonRecordSource implements RecordSource<JsonNode> {
+    private static final Logger logger = LoggerFactory.getLogger(JsonRecordSource.class);
     private static final JsonFactory jsonFactory;
     private final JsonParser jsonParser;
+    private final String skipToNestedJsonField;
 
     static {
         jsonFactory = new JsonFactory();
         jsonFactory.setCodec(new ObjectMapper());
     }
 
-    public JsonRecordSource(final InputStream in) throws IOException {
-        jsonParser = jsonFactory.createJsonParser(in);
+    public JsonRecordSource(final InputStream in, final String skipToNestedJsonField) throws IOException {
+        jsonParser = jsonFactory.createParser(in);
+        this.skipToNestedJsonField = skipToNestedJsonField;
+    }
+
+    @Override
+    public void init() throws IOException {
+        if (skipToNestedJsonField != null) {
+            while (!jsonParser.nextFieldName(new SerializedString(skipToNestedJsonField))) {
+                // go to nested field if specified
+                if (!jsonParser.hasCurrentToken()) {
+                    throw new IOException("The defined skipTo json field is not found when inferring json schema.");
+                }
+            }
+            logger.debug("Skipped to specified json field [{}] while inferring json schema.", skipToNestedJsonField);
+        }
     }
 
     @Override
     public JsonNode next() throws IOException {
+        JsonToken token = jsonParser.nextToken();
+        if (skipToNestedJsonField != null && !jsonParser.isExpectedStartArrayToken() && token != JsonToken.START_OBJECT) {
+            logger.debug("Specified json field [{}] to skip to is not found. Schema infer will start from the next nested json object or array.", skipToNestedJsonField);
+        }

Review Comment:
   As mentioned in the RecordReader, recommend removing this check and debug log.



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSource.java:
##########
@@ -19,37 +19,59 @@
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.io.SerializedString;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.nifi.schema.inference.RecordSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
 
 public class JsonRecordSource implements RecordSource<JsonNode> {
+    private static final Logger logger = LoggerFactory.getLogger(JsonRecordSource.class);
     private static final JsonFactory jsonFactory;
     private final JsonParser jsonParser;
+    private final String skipToNestedJsonField;
 
     static {
         jsonFactory = new JsonFactory();
         jsonFactory.setCodec(new ObjectMapper());
     }
 
-    public JsonRecordSource(final InputStream in) throws IOException {
-        jsonParser = jsonFactory.createJsonParser(in);
+    public JsonRecordSource(final InputStream in, final String skipToNestedJsonField) throws IOException {

Review Comment:
   Similar to other usage, recommend renaming the parameter:
   ```suggestion
       public JsonRecordSource(final InputStream in, final String nestedFieldName) throws IOException {
   ```



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java:
##########
@@ -95,6 +100,41 @@ public AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logg
         }
     }
 
+    protected AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat,
+                                          final String skipToNestedJsonField) throws IOException, MalformedRecordException {
+
+        this(logger, dateFormat, timeFormat, timestampFormat);
+
+        try {
+            jsonParser = jsonFactory.createParser(in);
+            jsonParser.setCodec(codec);
+
+            if (skipToNestedJsonField != null) {
+                while (!jsonParser.nextFieldName(new SerializedString(skipToNestedJsonField))) {

Review Comment:
   The `SerializedString` instance should be declared separately to avoid instantiating it on each iteration of the loop.
   ```suggestion
                   final SerializedString nestedFieldName = new SerializedString(skipToNestedJsonField);
                   while (!jsonParser.nextFieldName(nestedFieldName)) {
   ```



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java:
##########
@@ -95,6 +100,41 @@ public AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logg
         }
     }
 
+    protected AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat,
+                                          final String skipToNestedJsonField) throws IOException, MalformedRecordException {
+
+        this(logger, dateFormat, timeFormat, timestampFormat);
+
+        try {
+            jsonParser = jsonFactory.createParser(in);
+            jsonParser.setCodec(codec);
+
+            if (skipToNestedJsonField != null) {
+                while (!jsonParser.nextFieldName(new SerializedString(skipToNestedJsonField))) {
+                    // go to nested field if specified
+                    if (!jsonParser.hasCurrentToken()) {
+                        throw new IOException("The defined skipTo json field is not found when processing json as NiFi record.");
+                    }
+                }
+                logger.debug("Skipped to specified json field [{}] when processing json as NiFI record.", skipToNestedJsonField);

Review Comment:
   Recommend adjusting the wording:
   ```suggestion
                   logger.debug("Parsing starting at nested field [{}]", skipToNestedJsonField);
   ```



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java:
##########
@@ -95,6 +100,41 @@ public AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logg
         }
     }
 
+    protected AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat,
+                                          final String skipToNestedJsonField) throws IOException, MalformedRecordException {
+
+        this(logger, dateFormat, timeFormat, timestampFormat);
+
+        try {
+            jsonParser = jsonFactory.createParser(in);
+            jsonParser.setCodec(codec);
+
+            if (skipToNestedJsonField != null) {
+                while (!jsonParser.nextFieldName(new SerializedString(skipToNestedJsonField))) {
+                    // go to nested field if specified
+                    if (!jsonParser.hasCurrentToken()) {
+                        throw new IOException("The defined skipTo json field is not found when processing json as NiFi record.");
+                    }

Review Comment:
   Is this check necessary inside the loop? It seems like the check should be moved outside the loop, or it could be removed completely since `jsonParser.nextToken()` later on checks that the token is not `null`.



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestInferJsonSchemaAccessStrategy.java:
##########
@@ -41,32 +44,32 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class TestInferJsonSchemaAccessStrategy {
-    private final String dateFormat = RecordFieldType.DATE.getDefaultFormat();
-    private final String timeFormat = RecordFieldType.TIME.getDefaultFormat();
-    private final String timestampFormat = "yyyy-MM-DD'T'HH:mm:ss.SSS'Z'";
+class TestInferJsonSchemaAccessStrategy {

Review Comment:
   What is the reason for removing the `public` modifier on the class and methods? The vast majority of test classes have the public modifier, so recommend reverting the change.



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSource.java:
##########
@@ -19,37 +19,59 @@
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.io.SerializedString;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.nifi.schema.inference.RecordSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
 
 public class JsonRecordSource implements RecordSource<JsonNode> {
+    private static final Logger logger = LoggerFactory.getLogger(JsonRecordSource.class);
     private static final JsonFactory jsonFactory;
     private final JsonParser jsonParser;
+    private final String skipToNestedJsonField;
 
     static {
         jsonFactory = new JsonFactory();
         jsonFactory.setCodec(new ObjectMapper());
     }
 
-    public JsonRecordSource(final InputStream in) throws IOException {
-        jsonParser = jsonFactory.createJsonParser(in);
+    public JsonRecordSource(final InputStream in, final String skipToNestedJsonField) throws IOException {
+        jsonParser = jsonFactory.createParser(in);
+        this.skipToNestedJsonField = skipToNestedJsonField;
+    }
+
+    @Override
+    public void init() throws IOException {

Review Comment:
   Is there a reason for implementing the logic here as opposed to in the constructor?



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java:
##########
@@ -66,6 +67,17 @@
     private volatile String dateFormat;
     private volatile String timeFormat;
     private volatile String timestampFormat;
+    private volatile String skipToNestedJsonField;
+
+    public static final PropertyDescriptor SKIP_TO_NESTED_JSON_FIELD = new PropertyDescriptor.Builder()
+            .name("skip-to-nested-json-field")
+            .displayName("Skip To Nested JSON Field")

Review Comment:
   Recommend adjusting the wording to remove `JSON`, since the component already includes `Json` in the class name.
   ```suggestion
               .name("starting-field-name")
               .displayName("Starting Field Name")
   ```



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java:
##########
@@ -66,6 +67,17 @@
     private volatile String dateFormat;
     private volatile String timeFormat;
     private volatile String timestampFormat;
+    private volatile String skipToNestedJsonField;
+
+    public static final PropertyDescriptor SKIP_TO_NESTED_JSON_FIELD = new PropertyDescriptor.Builder()
+            .name("skip-to-nested-json-field")
+            .displayName("Skip To Nested JSON Field")
+            .description("Skips forward to the given nested JSON field (array or object) and begins processing there. " +
+                    "If the field is not nested, the processing will proceed from the next available nested field.")

Review Comment:
   This particular sentence is not quite clear. Should it say that if the Starting Field Name is not specified, then processing will begin at the first available element?



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java:
##########
@@ -66,6 +67,17 @@
     private volatile String dateFormat;
     private volatile String timeFormat;
     private volatile String timestampFormat;
+    private volatile String skipToNestedJsonField;
+
+    public static final PropertyDescriptor SKIP_TO_NESTED_JSON_FIELD = new PropertyDescriptor.Builder()
+            .name("skip-to-nested-json-field")
+            .displayName("Skip To Nested JSON Field")
+            .description("Skips forward to the given nested JSON field (array or object) and begins processing there. " +

Review Comment:
   ```suggestion
               .description("Skips forward to the given nested JSON field (array or object) to begin processing. " +
   ```



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestInferJsonSchemaAccessStrategy.java:
##########
@@ -189,14 +191,65 @@ public void testDocsExample() throws IOException {
         assertSame(RecordFieldType.STRING, schema.getDataType("nullValue").get().getFieldType());
     }
 
-    private RecordSchema inferSchema(final File file) throws IOException {
+    @ParameterizedTest(name = "{index} {2}")
+    @MethodSource("skipToArgumentProvider")
+    void testInferenceSkipsToArray(final String jsonPath, final String skipToField, String testName) throws IOException {
+        final File file = new File(jsonPath);
+        final RecordSchema schema = inferSchema(file, skipToField);
+
+        final RecordField field1 = schema.getField("id").get();
+        assertSame(RecordFieldType.INT, field1.getDataType().getFieldType());
+
+        final RecordField field2 = schema.getField("balance").get();
+        assertSame(RecordFieldType.DOUBLE, field2.getDataType().getFieldType());
+    }
+
+    @Test
+    void testInferenceSkipsToSimpleFieldAndNoNestedObjectOrArrayFound() throws IOException {
+        final File file = new File("src/test/resources/json/single-element-nested-array-middle.json");
+        final RecordSchema schema = inferSchema(file, "name");
+
+        assertEquals(0, schema.getFieldCount());
+    }
+
+    @Test
+    void testInferenceSkipsToNonExistentField() {
+        final File file = new File("src/test/resources/json/single-element-nested-array.json");
+        final Exception exception = assertThrows(IOException.class,
+                () -> inferSchema(file, "notfound")
+        );
+        assertEquals("The defined skipTo json field is not found when inferring json schema.", exception.getMessage());

Review Comment:
   It is best to avoid asserting exact exception messages, since doing so makes tests brittle and the message should not necessarily be part of the contract.
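
A minimal sketch of the point above, in plain Java so that it runs without a test framework. The `expectThrows` helper is a hypothetical stand-in for JUnit 5 `assertThrows` (which also returns the thrown exception), and `inferSchema` here is a hypothetical stub, not the helper from the test class. The check is on the exception type only, so rewording the message would not break it.

```java
import java.io.IOException;

class ExceptionTypeSketch {

    // Hypothetical functional interface for an action that may throw.
    interface ThrowingAction {
        void run() throws Exception;
    }

    // Hypothetical stand-in for JUnit 5 assertThrows: verifies the type, returns the exception.
    static <T extends Throwable> T expectThrows(final Class<T> expected, final ThrowingAction action) {
        try {
            action.run();
        } catch (final Throwable thrown) {
            if (expected.isInstance(thrown)) {
                return expected.cast(thrown);
            }
            throw new AssertionError("Unexpected exception type: " + thrown.getClass().getName(), thrown);
        }
        throw new AssertionError("Expected " + expected.getName() + " but nothing was thrown");
    }

    // Hypothetical stub: fails for an unknown field; the message wording is free to change.
    static void inferSchema(final String startingField) throws IOException {
        if ("notfound".equals(startingField)) {
            throw new IOException("Starting field not found (wording is not part of the contract)");
        }
    }

    public static void main(final String[] args) {
        // Only the exception type is checked; the message is deliberately not compared.
        final IOException exception = expectThrows(IOException.class, () -> inferSchema("notfound"));
        System.out.println("Caught " + exception.getClass().getSimpleName());
    }
}
```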



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java:
##########
@@ -56,6 +59,30 @@
 
 public class TestWriteJsonResult {
 
+    @Test
+    void test() throws IOException {
+        String json = "{\"name\":\"Tom\",\"age\":25,\"address\":[\"Poland\",\"5th avenue\"]}";
+
+        JsonFactory jfactory = new JsonFactory();
+        JsonParser jParser = jfactory.createParser(json);
+
+        Integer parsedAge = null;
+
+        while (jParser.nextToken() != JsonToken.END_OBJECT) {
+            String fieldname = jParser.getCurrentName();
+
+            if ("age".equals(fieldname)) {
+                jParser.nextToken();
+                parsedAge = jParser.getIntValue();
+                break;
+            }
+
+        }
+        jParser.close();
+
+        System.out.println(parsedAge);

Review Comment:
   Calls to `System.out.println()` should be avoided in unit tests.
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] Lehel44 commented on a diff in pull request #5937: NIFI-9862: Update JsonTreeReader to read Records from a Nested Array

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on code in PR #5937:
URL: https://github.com/apache/nifi/pull/5937#discussion_r846134215


##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java:
##########
@@ -61,11 +60,11 @@
 
     private static final JsonFactory jsonFactory = new JsonFactory();
     private static final ObjectMapper codec = new ObjectMapper();
+    private JsonParser jsonParser;
+    private JsonNode firstJsonNode;

Review Comment:
   I was trying to extract the common part of the two original constructors while avoiding method calls from a constructor, so I created a private constructor that does not initialize jsonParser and firstJsonNode. That's why they're not final. If I extract the initialization of these variables too, the new common constructor will have the same signature as the original ones. Do you have a better idea?





[GitHub] [nifi] Lehel44 commented on a diff in pull request #5937: NIFI-9862: Update JsonTreeReader to read Records from a Nested Array

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on code in PR #5937:
URL: https://github.com/apache/nifi/pull/5937#discussion_r848430257


##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSource.java:
##########
@@ -19,37 +19,58 @@
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.io.SerializedString;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.nifi.schema.inference.RecordSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
 
 public class JsonRecordSource implements RecordSource<JsonNode> {
+    private static final Logger logger = LoggerFactory.getLogger(JsonRecordSource.class);
     private static final JsonFactory jsonFactory;
     private final JsonParser jsonParser;
+    private final String skipToNestedJsonField;
 
     static {
         jsonFactory = new JsonFactory();
         jsonFactory.setCodec(new ObjectMapper());
     }
 
     public JsonRecordSource(final InputStream in) throws IOException {
-        jsonParser = jsonFactory.createJsonParser(in);
+        jsonParser = jsonFactory.createParser(in);
+        skipToNestedJsonField = null;
+    }
+
+    public JsonRecordSource(final InputStream in, final String nestedFieldName) throws IOException {
+        jsonParser = jsonFactory.createParser(in);
+        this.skipToNestedJsonField = nestedFieldName;
+
+        if (skipToNestedJsonField != null) {
+            final SerializedString serializedNestedField = new SerializedString(skipToNestedJsonField);
+            while (!jsonParser.nextFieldName(serializedNestedField) && jsonParser.hasCurrentToken()) {
+            }
+            logger.debug("Skipped to specified json field [{}] while inferring json schema.", skipToNestedJsonField);
+        }
     }
 
     @Override
     public JsonNode next() throws IOException {
+        JsonToken token = jsonParser.nextToken();
+        if (skipToNestedJsonField != null && !jsonParser.isExpectedStartArrayToken() && token != JsonToken.START_OBJECT) {
+            logger.debug("Specified json field [{}] to skip to is not found. Schema infer will start from the next nested json object or array.", skipToNestedJsonField);
+        }
         while (true) {
-            final JsonToken token = jsonParser.nextToken();
             if (token == null) {
                 return null;
             }
-
             if (token == JsonToken.START_OBJECT) {
                 return jsonParser.readValueAsTree();
             }
+            token = jsonParser.nextToken();

Review Comment:
   That's a good one, of course!





[GitHub] [nifi] exceptionfactory closed pull request #5937: NIFI-9862: Update JsonTreeReader to read Records from a Nested Array

Posted by GitBox <gi...@apache.org>.
exceptionfactory closed pull request #5937: NIFI-9862: Update JsonTreeReader to read Records from a Nested Array
URL: https://github.com/apache/nifi/pull/5937




[GitHub] [nifi] exceptionfactory commented on a diff in pull request #5937: NIFI-9862: Update JsonTreeReader to read Records from a Nested Array

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on code in PR #5937:
URL: https://github.com/apache/nifi/pull/5937#discussion_r849574880


##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java:
##########
@@ -59,32 +61,62 @@
         + "If an array is encountered, each element in that array will be treated as a separate record. "
         + "If the schema that is configured contains a field that is not present in the JSON, a null value will be used. If the JSON contains "
         + "a field that is not present in the schema, that field will be skipped. "
-    + "See the Usage of the Controller Service for more information and examples.")
+        + "See the Usage of the Controller Service for more information and examples.")
 @SeeAlso(JsonPathReader.class)
 public class JsonTreeReader extends SchemaRegistryService implements RecordReaderFactory {
 
     private volatile String dateFormat;
     private volatile String timeFormat;
     private volatile String timestampFormat;
+    private volatile String startingFieldName;
+    private volatile StartingFieldStrategy startingFieldStrategy;
+
+    public static final PropertyDescriptor STARTING_FIELD_STRATEGY = new PropertyDescriptor.Builder()
+            .name("starting-field-strategy")
+            .displayName("Starting Field Strategy")
+            .description("Start processing from the root node or from a specified nested node.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .defaultValue(StartingFieldStrategy.ROOT_NODE.name())
+            .allowableValues(
+                    Arrays.stream(StartingFieldStrategy.values()).map(startingStrategy ->
+                            new AllowableValue(startingStrategy.name(), startingStrategy.name(), startingStrategy.getDescription())

Review Comment:
   Recommend adding a `getDisplayName()` method to `StartingFieldStrategy` and referencing it when constructing the AllowableValue to make it more readable:
   ```suggestion
                               new AllowableValue(startingStrategy.name(), startingStrategy.getDisplayName(), startingStrategy.getDescription())
   ```



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/StartingFieldStrategy.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.json;
+
+public enum StartingFieldStrategy {
+    ROOT_NODE("Begins processing from the root node."),

Review Comment:
   Recommend adding a display name:
   ```suggestion
       ROOT_NODE("Root Node", "Begins processing from the root node"),
   
   ```



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/StartingFieldStrategy.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.json;
+
+public enum StartingFieldStrategy {
+    ROOT_NODE("Begins processing from the root node."),
+    NESTED_NODE("Skips forward to the given nested JSON field (array or object) to begin processing.");

Review Comment:
   ```suggestion
       NESTED_FIELD("Nested Field", "Skips forward to the given nested JSON field (array or object) to begin processing");
   
   ```
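
Taken together, the display-name suggestions above imply an enum roughly like the following. This is a sketch only: the two-argument constructor, the field names, and the demo class are assumptions based on the suggested constants, not the code that was merged.

```java
// Sketch of the enum with a display name added, as the review suggestions imply.
enum StartingFieldStrategy {
    ROOT_NODE("Root Node", "Begins processing from the root node"),
    NESTED_FIELD("Nested Field", "Skips forward to the given nested JSON field (array or object) to begin processing");

    private final String displayName;
    private final String description;

    // Assumed constructor shape: display name first, then description.
    StartingFieldStrategy(final String displayName, final String description) {
        this.displayName = displayName;
        this.description = description;
    }

    public String getDisplayName() {
        return displayName;
    }

    public String getDescription() {
        return description;
    }
}

// Hypothetical demo class: prints the three values an AllowableValue would be built from.
class StartingFieldStrategyDemo {
    public static void main(final String[] args) {
        for (final StartingFieldStrategy strategy : StartingFieldStrategy.values()) {
            System.out.println(strategy.name() + " -> " + strategy.getDisplayName() + ": " + strategy.getDescription());
        }
    }
}
```

With this shape, the stored property value stays the stable constant name (`name()`), while the UI label comes from `getDisplayName()`.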



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java:
##########
@@ -59,32 +61,62 @@
         + "If an array is encountered, each element in that array will be treated as a separate record. "
         + "If the schema that is configured contains a field that is not present in the JSON, a null value will be used. If the JSON contains "
         + "a field that is not present in the schema, that field will be skipped. "
-    + "See the Usage of the Controller Service for more information and examples.")
+        + "See the Usage of the Controller Service for more information and examples.")
 @SeeAlso(JsonPathReader.class)
 public class JsonTreeReader extends SchemaRegistryService implements RecordReaderFactory {
 
     private volatile String dateFormat;
     private volatile String timeFormat;
     private volatile String timestampFormat;
+    private volatile String startingFieldName;
+    private volatile StartingFieldStrategy startingFieldStrategy;
+
+    public static final PropertyDescriptor STARTING_FIELD_STRATEGY = new PropertyDescriptor.Builder()
+            .name("starting-field-strategy")
+            .displayName("Starting Field Strategy")
+            .description("Start processing from the root node or from a specified nested node.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .defaultValue(StartingFieldStrategy.ROOT_NODE.name())
+            .allowableValues(
+                    Arrays.stream(StartingFieldStrategy.values()).map(startingStrategy ->
+                            new AllowableValue(startingStrategy.name(), startingStrategy.name(), startingStrategy.getDescription())
+                    ).toArray(AllowableValue[]::new))
+            .build();
+
+
+    public static final PropertyDescriptor STARTING_FIELD_NAME = new PropertyDescriptor.Builder()
+            .name("starting-field-name")
+            .displayName("Starting Field Name")
+            .description("Skips forward to the given nested JSON field (array or object) to begin processing.")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .defaultValue(null)
+            .dependsOn(STARTING_FIELD_STRATEGY, StartingFieldStrategy.NESTED_NODE.name())
+            .build();
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
         properties.add(new PropertyDescriptor.Builder()
-            .fromPropertyDescriptor(SCHEMA_CACHE)
-            .dependsOn(SCHEMA_ACCESS_STRATEGY, INFER_SCHEMA)
-            .build());
+                .fromPropertyDescriptor(SCHEMA_CACHE)
+                .dependsOn(SCHEMA_ACCESS_STRATEGY, INFER_SCHEMA)
+                .build());
         properties.add(DateTimeUtils.DATE_FORMAT);
         properties.add(DateTimeUtils.TIME_FORMAT);
         properties.add(DateTimeUtils.TIMESTAMP_FORMAT);
+        properties.add(STARTING_FIELD_STRATEGY);
+        properties.add(STARTING_FIELD_NAME);

Review Comment:
   Recommend moving the required field before the optional date fields:
   ```suggestion
           properties.add(STARTING_FIELD_STRATEGY);
           properties.add(STARTING_FIELD_NAME);
           properties.add(DateTimeUtils.DATE_FORMAT);
           properties.add(DateTimeUtils.TIME_FORMAT);
           properties.add(DateTimeUtils.TIMESTAMP_FORMAT);
   ```





[GitHub] [nifi] Lehel44 commented on a diff in pull request #5937: NIFI-9862: Update JsonTreeReader to read Records from a Nested Array

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on code in PR #5937:
URL: https://github.com/apache/nifi/pull/5937#discussion_r846257332


##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java:
##########
@@ -95,6 +100,41 @@ public AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logg
         }
     }
 
+    protected AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat,
+                                          final String skipToNestedJsonField) throws IOException, MalformedRecordException {
+
+        this(logger, dateFormat, timeFormat, timestampFormat);
+
+        try {
+            jsonParser = jsonFactory.createParser(in);
+            jsonParser.setCodec(codec);
+
+            if (skipToNestedJsonField != null) {
+                while (!jsonParser.nextFieldName(new SerializedString(skipToNestedJsonField))) {
+                    // go to nested field if specified
+                    if (!jsonParser.hasCurrentToken()) {
+                        throw new IOException("The defined skipTo json field is not found when processing json as NiFi record.");
+                    }

Review Comment:
   It was necessary to break the loop when the skipToFieldName was not found. I changed it to:
   
   `while (!jsonParser.nextFieldName(serializedSkipToField) && jsonParser.hasCurrentToken()) {
    }`
   
   The implementation now follows the original behaviour, i.e. it returns null when the field is not found.
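
The termination behaviour of that loop can be sketched without Jackson. `TokenCursor` below is a hypothetical stand-in that models only the two parser calls used in the condition (advance and compare, and "is there a current token"); it is not the Jackson API, and the token list is a simplification of a real token stream.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a streaming parser; models only the two calls used in the loop.
class TokenCursor {
    private final Iterator<String> tokens;
    private String current;

    TokenCursor(final List<String> tokens) {
        this.tokens = tokens.iterator();
    }

    // Advances by one token; returns true only if the new token is the wanted field name.
    boolean nextFieldName(final String wanted) {
        current = tokens.hasNext() ? tokens.next() : null;
        return wanted.equals(current);
    }

    // Returns false once the input has been exhausted.
    boolean hasCurrentToken() {
        return current != null;
    }
}

class SkipToFieldSketch {

    // Returns true if the field was reached, false if the input ended first.
    static boolean skipTo(final TokenCursor cursor, final String field) {
        while (!cursor.nextFieldName(field) && cursor.hasCurrentToken()) {
            // Keep advancing: stop on a match or at end of input.
        }
        return cursor.hasCurrentToken();
    }

    public static void main(final String[] args) {
        final List<String> fields = Arrays.asList("id", "accounts", "balance");
        System.out.println(skipTo(new TokenCursor(fields), "accounts"));
        System.out.println(skipTo(new TokenCursor(fields), "notfound"));
    }
}
```

Folding the end-of-input check into the loop condition means a missing field ends the loop quietly instead of throwing, which matches the "return null when not found" behaviour described above.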





[GitHub] [nifi] Lehel44 commented on a diff in pull request #5937: NIFI-9862: Update JsonTreeReader to read Records from a Nested Array

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on code in PR #5937:
URL: https://github.com/apache/nifi/pull/5937#discussion_r846153153


##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSource.java:
##########
@@ -19,37 +19,59 @@
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.io.SerializedString;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.nifi.schema.inference.RecordSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
 
 public class JsonRecordSource implements RecordSource<JsonNode> {
+    private static final Logger logger = LoggerFactory.getLogger(JsonRecordSource.class);
     private static final JsonFactory jsonFactory;
     private final JsonParser jsonParser;
+    private final String skipToNestedJsonField;
 
     static {
         jsonFactory = new JsonFactory();
         jsonFactory.setCodec(new ObjectMapper());
     }
 
-    public JsonRecordSource(final InputStream in) throws IOException {
-        jsonParser = jsonFactory.createJsonParser(in);
+    public JsonRecordSource(final InputStream in, final String skipToNestedJsonField) throws IOException {
+        jsonParser = jsonFactory.createParser(in);
+        this.skipToNestedJsonField = skipToNestedJsonField;
+    }
+
+    @Override
+    public void init() throws IOException {

Review Comment:
   I created another constructor with skipToField and moved the logic there. I reverted the RecordSource interface.





[GitHub] [nifi] Lehel44 commented on a diff in pull request #5937: NIFI-9862: Update JsonTreeReader to read Records from a Nested Array

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on code in PR #5937:
URL: https://github.com/apache/nifi/pull/5937#discussion_r846156587


##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestInferJsonSchemaAccessStrategy.java:
##########
@@ -41,32 +44,32 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class TestInferJsonSchemaAccessStrategy {
-    private final String dateFormat = RecordFieldType.DATE.getDefaultFormat();
-    private final String timeFormat = RecordFieldType.TIME.getDefaultFormat();
-    private final String timestampFormat = "yyyy-MM-DD'T'HH:mm:ss.SSS'Z'";
+class TestInferJsonSchemaAccessStrategy {

Review Comment:
   Since the JUnit 5 upgrade, public modifiers are no longer necessary. I'd leave it like this if that's okay.





[GitHub] [nifi] exceptionfactory commented on a diff in pull request #5937: NIFI-9862: Update JsonTreeReader to read Records from a Nested Array

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on code in PR #5937:
URL: https://github.com/apache/nifi/pull/5937#discussion_r847321536


##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSource.java:
##########
@@ -19,37 +19,58 @@
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.io.SerializedString;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.nifi.schema.inference.RecordSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
 
 public class JsonRecordSource implements RecordSource<JsonNode> {
+    private static final Logger logger = LoggerFactory.getLogger(JsonRecordSource.class);
     private static final JsonFactory jsonFactory;
     private final JsonParser jsonParser;
+    private final String skipToNestedJsonField;
 
     static {
         jsonFactory = new JsonFactory();
         jsonFactory.setCodec(new ObjectMapper());
     }
 
     public JsonRecordSource(final InputStream in) throws IOException {
-        jsonParser = jsonFactory.createJsonParser(in);
+        jsonParser = jsonFactory.createParser(in);
+        skipToNestedJsonField = null;
+    }
+
+    public JsonRecordSource(final InputStream in, final String nestedFieldName) throws IOException {
+        jsonParser = jsonFactory.createParser(in);
+        this.skipToNestedJsonField = nestedFieldName;
+
+        if (skipToNestedJsonField != null) {
+            final SerializedString serializedNestedField = new SerializedString(skipToNestedJsonField);
+            while (!jsonParser.nextFieldName(serializedNestedField) && jsonParser.hasCurrentToken()) {
+            }
+            logger.debug("Skipped to specified json field [{}] while inferring json schema.", skipToNestedJsonField);

Review Comment:
   Recommend adjusting this log statement to follow the same wording as the one in the Record Reader:
   ```suggestion
               logger.debug("Parsing starting at nested field [{}]", nestedFieldName);
   ```



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSource.java:
##########
@@ -19,37 +19,58 @@
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.io.SerializedString;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.nifi.schema.inference.RecordSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
 
 public class JsonRecordSource implements RecordSource<JsonNode> {
+    private static final Logger logger = LoggerFactory.getLogger(JsonRecordSource.class);
     private static final JsonFactory jsonFactory;
     private final JsonParser jsonParser;
+    private final String skipToNestedJsonField;

Review Comment:
   This field name should be adjusted to `nestedFieldName` to match the constructor argument.
   



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java:
##########
@@ -66,6 +67,16 @@
     private volatile String dateFormat;
     private volatile String timeFormat;
     private volatile String timestampFormat;
+    private volatile String skipToNestedJsonField;

Review Comment:
   Recommend renaming to `startingFieldName` to follow the same convention as the Property Descriptor.



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSource.java:
##########
@@ -19,37 +19,58 @@
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.io.SerializedString;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.nifi.schema.inference.RecordSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
 
 public class JsonRecordSource implements RecordSource<JsonNode> {
+    private static final Logger logger = LoggerFactory.getLogger(JsonRecordSource.class);
     private static final JsonFactory jsonFactory;
     private final JsonParser jsonParser;
+    private final String skipToNestedJsonField;
 
     static {
         jsonFactory = new JsonFactory();
         jsonFactory.setCodec(new ObjectMapper());
     }
 
     public JsonRecordSource(final InputStream in) throws IOException {
-        jsonParser = jsonFactory.createJsonParser(in);
+        jsonParser = jsonFactory.createParser(in);
+        skipToNestedJsonField = null;
+    }
+
+    public JsonRecordSource(final InputStream in, final String nestedFieldName) throws IOException {
+        jsonParser = jsonFactory.createParser(in);
+        this.skipToNestedJsonField = nestedFieldName;
+
+        if (skipToNestedJsonField != null) {
+            final SerializedString serializedNestedField = new SerializedString(skipToNestedJsonField);
+            while (!jsonParser.nextFieldName(serializedNestedField) && jsonParser.hasCurrentToken()) {
+            }
+            logger.debug("Skipped to specified json field [{}] while inferring json schema.", skipToNestedJsonField);
+        }
     }
 
     @Override
     public JsonNode next() throws IOException {
+        JsonToken token = jsonParser.nextToken();
+        if (skipToNestedJsonField != null && !jsonParser.isExpectedStartArrayToken() && token != JsonToken.START_OBJECT) {
+            logger.debug("Specified json field [{}] to skip to is not found. Schema infer will start from the next nested json object or array.", skipToNestedJsonField);
+        }
         while (true) {
-            final JsonToken token = jsonParser.nextToken();
             if (token == null) {
                 return null;
             }
-
             if (token == JsonToken.START_OBJECT) {
                 return jsonParser.readValueAsTree();
             }
+            token = jsonParser.nextToken();

Review Comment:
   Following removal of the check and debug statement, can the changes in this method be reverted?



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java:
##########
@@ -66,6 +67,16 @@
     private volatile String dateFormat;
     private volatile String timeFormat;
     private volatile String timestampFormat;
+    private volatile String skipToNestedJsonField;
+
+    public static final PropertyDescriptor STARTING_FIELD_NAME = new PropertyDescriptor.Builder()

Review Comment:
   On further review, it would be helpful to introduce one more strategy property on which this property should depend.
   
   The property name should be something like `Starting Node Strategy`. To maintain compatibility with the current default approach, the default value should be `Root Node`, indicating that record reading will begin at the root of the JSON document. The second value should be `Nested Node`, which would enable the configuration of this new `Starting Field Name` property. Based on this approach, these two allowable values would provide a basis for potential future options, and it should also make the options easier to understand for non-default configurations.
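
A minimal sketch of how the two proposed allowable values and the dependent property might be modeled, written in plain Java rather than against the NiFi `PropertyDescriptor` API. The names `StartingNodeStrategy`, `ReaderConfigSketch`, and `resolveStartingFieldName` are hypothetical and used only for illustration; the display names `Root Node` and `Nested Node` come from the suggestion above.

```java
import java.util.Optional;

// Hypothetical enum mirroring the two suggested allowable values.
enum StartingNodeStrategy {
    ROOT_NODE("Root Node"),
    NESTED_NODE("Nested Node");

    private final String displayName;

    StartingNodeStrategy(final String displayName) {
        this.displayName = displayName;
    }

    String getDisplayName() {
        return displayName;
    }
}

// Hypothetical helper: the starting field name only applies to NESTED_NODE.
final class ReaderConfigSketch {
    private ReaderConfigSketch() {
    }

    static Optional<String> resolveStartingFieldName(final StartingNodeStrategy strategy,
                                                     final String startingFieldName) {
        if (strategy == StartingNodeStrategy.ROOT_NODE) {
            // Default behavior: start at the document root and ignore any field name.
            return Optional.empty();
        }
        if (startingFieldName == null || startingFieldName.trim().isEmpty()) {
            throw new IllegalArgumentException(
                    "Starting Field Name is required for the Nested Node strategy");
        }
        return Optional.of(startingFieldName);
    }
}
```

With this shape, `ROOT_NODE` keeps the existing default behavior regardless of what the field name holds, and a further strategy could later be added as another enum constant.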



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSource.java:
##########
@@ -19,37 +19,59 @@
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.io.SerializedString;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.nifi.schema.inference.RecordSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
 
 public class JsonRecordSource implements RecordSource<JsonNode> {
+    private static final Logger logger = LoggerFactory.getLogger(JsonRecordSource.class);
     private static final JsonFactory jsonFactory;
     private final JsonParser jsonParser;
+    private final String skipToNestedJsonField;
 
     static {
         jsonFactory = new JsonFactory();
         jsonFactory.setCodec(new ObjectMapper());
     }
 
-    public JsonRecordSource(final InputStream in) throws IOException {
-        jsonParser = jsonFactory.createJsonParser(in);
+    public JsonRecordSource(final InputStream in, final String skipToNestedJsonField) throws IOException {
+        jsonParser = jsonFactory.createParser(in);
+        this.skipToNestedJsonField = skipToNestedJsonField;
+    }
+
+    @Override
+    public void init() throws IOException {
+        if (skipToNestedJsonField != null) {
+            while (!jsonParser.nextFieldName(new SerializedString(skipToNestedJsonField))) {
+                // go to nested field if specified
+                if (!jsonParser.hasCurrentToken()) {
+                    throw new IOException("The defined skipTo json field is not found when inferring json schema.");
+                }
+            }
+            logger.debug("Skipped to specified json field [{}] while inferring json schema.", skipToNestedJsonField);
+        }
     }
 
     @Override
     public JsonNode next() throws IOException {
+        JsonToken token = jsonParser.nextToken();
+        if (skipToNestedJsonField != null && !jsonParser.isExpectedStartArrayToken() && token != JsonToken.START_OBJECT) {
+            logger.debug("Specified json field [{}] to skip to is not found. Schema infer will start from the next nested json object or array.", skipToNestedJsonField);
+        }

Review Comment:
   Recommend removing this check and debug log as subsequent processing should handle any potential issues.





[GitHub] [nifi] Lehel44 commented on a diff in pull request #5937: NIFI-9862: Update JsonTreeReader to read Records from a Nested Array

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on code in PR #5937:
URL: https://github.com/apache/nifi/pull/5937#discussion_r846134215


##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java:
##########
@@ -61,11 +60,11 @@
 
     private static final JsonFactory jsonFactory = new JsonFactory();
     private static final ObjectMapper codec = new ObjectMapper();
+    private JsonParser jsonParser;
+    private JsonNode firstJsonNode;

Review Comment:
   I was trying to extract the common part of the two original constructors while avoiding method calls from a constructor, so I created a private constructor that does not initialize `jsonParser` and `firstJsonNode`. That's why they're not final. If I also extract the initialization of these two variables, the common constructor will have the same signature as the original ones. Do you have a better idea?
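
One possible way to keep such fields `final` is to let the shorter constructor delegate to the longer one with a `null` starting field, so that all initialization happens in a single place. The sketch below illustrates only that pattern with plain Java stand-ins: `ChainedReaderSketch`, the `Iterator<String>` used in place of `JsonParser`, and the `String` used in place of `firstJsonNode` are assumptions for illustration, not the actual NiFi classes.

```java
import java.util.Iterator;
import java.util.List;

final class ChainedReaderSketch {
    private final Iterator<String> tokens;  // stand-in for jsonParser
    private final String firstElement;      // stand-in for firstJsonNode
    private final String dateFormat;

    // Original-style constructor: delegates and passes no starting field.
    ChainedReaderSketch(final List<String> input, final String dateFormat) {
        this(input, dateFormat, null);
    }

    // New-style constructor: the only place where the final fields are assigned.
    ChainedReaderSketch(final List<String> input, final String dateFormat, final String startingField) {
        this.dateFormat = dateFormat;
        this.tokens = input.iterator();

        if (startingField != null) {
            // Advance until the starting field has been consumed or the input ends.
            while (tokens.hasNext() && !startingField.equals(tokens.next())) {
                // keep skipping
            }
        }
        // Null when nothing is left to read, for example when the field was not found.
        this.firstElement = tokens.hasNext() ? tokens.next() : null;
    }

    String getFirstElement() {
        return firstElement;
    }

    String getDateFormat() {
        return dateFormat;
    }
}
```

Here the two-argument constructor and the three-argument constructor called with a `null` starting field behave identically, which is what lets both fields stay final without a separate initialization method.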





[GitHub] [nifi] Lehel44 commented on a diff in pull request #5937: NIFI-9862: Update JsonTreeReader to read Records from a Nested Array

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on code in PR #5937:
URL: https://github.com/apache/nifi/pull/5937#discussion_r846136022


##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java:
##########
@@ -95,6 +100,41 @@ public AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logg
         }
     }
 
+    protected AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat,
+                                          final String skipToNestedJsonField) throws IOException, MalformedRecordException {
+
+        this(logger, dateFormat, timeFormat, timestampFormat);
+
+        try {
+            jsonParser = jsonFactory.createParser(in);
+            jsonParser.setCodec(codec);
+
+            if (skipToNestedJsonField != null) {
+                while (!jsonParser.nextFieldName(new SerializedString(skipToNestedJsonField))) {

Review Comment:
   This was on my mind, thanks for the reminder!


