Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/01/12 22:33:59 UTC

[GitHub] [nifi] sjyang18 opened a new pull request #4754: NIFI-7417: GetAzureCosmosDBRecord processor

sjyang18 opened a new pull request #4754:
URL: https://github.com/apache/nifi/pull/4754


   Thank you for submitting a contribution to Apache NiFi.
   
   Please provide a short description of the PR here:
   
   #### Description of PR
   
   Functionally, it is equivalent to the GetMongoRecord processor in the nifi-mongodb-bundle, but this processor uses the Azure Cosmos DB native SQL (Core) API.
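   
   For context, a minimal sketch (not taken from this PR) of how a Core SQL API query is issued with the Azure Cosmos 4.x Java SDK, which is the same `queryItems` call shape this processor uses; the endpoint, key, database, and container names are placeholders:
   
   ```java
   import com.azure.cosmos.CosmosClient;
   import com.azure.cosmos.CosmosClientBuilder;
   import com.azure.cosmos.CosmosContainer;
   import com.azure.cosmos.models.CosmosQueryRequestOptions;
   import com.azure.cosmos.util.CosmosPagedIterable;
   import com.fasterxml.jackson.databind.JsonNode;
   
   public class CosmosCoreSqlQueryExample {
       public static void main(String[] args) {
           // Placeholder endpoint/key/names; substitute real account values.
           CosmosClient client = new CosmosClientBuilder()
                   .endpoint("https://<account>.documents.azure.com:443/")
                   .key("<account-key>")
                   .buildClient();
           CosmosContainer container = client.getDatabase("<database>").getContainer("<container>");
           // A Core SQL SELECT whose results are deserialized page by page into Jackson JsonNode items.
           CosmosPagedIterable<JsonNode> results = container.queryItems(
                   "SELECT TOP 100 * FROM c", new CosmosQueryRequestOptions(), JsonNode.class);
           results.forEach(item -> System.out.println(item));
           client.close();
       }
   }
   ```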
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [X] Is there a JIRA ticket associated with this PR? Is it referenced 
        in the commit message?
   
   - [X] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [X] Has your PR been rebased against the latest commit within the target branch (typically `main`)?
   
   - [X] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._
   
   ### For code changes:
   - [X] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
   - [X] Have you written or updated unit tests to verify your changes?
   - [X] Have you verified that the full build is successful on JDK 8?
   - [X] Have you verified that the full build is successful on JDK 11?
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
   - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
   - [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
   - [X] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] sjyang18 commented on pull request #4754: NIFI-7417: GetAzureCosmosDBRecord processor

Posted by GitBox <gi...@apache.org>.
sjyang18 commented on pull request #4754:
URL: https://github.com/apache/nifi/pull/4754#issuecomment-829490194


   @jfrazee Would you take a look at this PR for me?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] Lehel44 commented on a change in pull request #4754: NIFI-7417: GetAzureCosmosDBRecord processor

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on a change in pull request #4754:
URL: https://github.com/apache/nifi/pull/4754#discussion_r573018147



##########
File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/GetAzureCosmosDBRecord.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.azure.cosmos.document;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.azure.cosmos.CosmosContainer;
+import com.azure.cosmos.models.CosmosQueryRequestOptions;
+import com.azure.cosmos.util.CosmosPagedIterable;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+
+@Tags({ "azure", "cosmos", "record", "read", "fetch" })
+@InputRequirement(Requirement.INPUT_ALLOWED)
+@CapabilityDescription("A record-oriented GET processor that uses the record writers to write the Azure Cosmos SQL select query result set.")
+public class GetAzureCosmosDBRecord extends AbstractAzureCosmosDBProcessor {
+    public static final PropertyDescriptor WRITER_FACTORY = new PropertyDescriptor.Builder()
+        .name("record-writer-factory")
+        .displayName("Record Writer")
+        .description("The record writer to use to write the result sets")
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .required(true)
+        .build();
+    public static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
+        .name("schema-name")
+        .displayName("Schema Name")
+        .description("The name of the schema in the configured schema registry to use for the query results")
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+        .defaultValue("${schema.name}")
+        .required(true)
+        .build();
+
+    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
+        .name("sql-core-document-query")
+        .displayName("SQL Core Document Query")
+        .description("The SQL select query to execute. "
+                + "This should be a valid SQL select query to Cosmos DB with core sql api")
+        .required(true)
+        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .build();
+
+    public static final PropertyDescriptor MAX_RESPONSE_PAGE_SIZE = new PropertyDescriptor.Builder()
+        .name("max-page-size")
+        .displayName("Max Page Size")
+        .description("The maximum number of elements in a response page from Cosmos DB")
+        .required(false)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .defaultValue("10")
+        .build();
+
+    private final static Set<Relationship> relationships;
+    private final static List<PropertyDescriptor> propertyDescriptors;
+    private ComponentLog logger;
+    private final static ObjectMapper mapper;
+
+    static {
+        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+        _propertyDescriptors.addAll(descriptors);
+        _propertyDescriptors.add(QUERY);
+        _propertyDescriptors.add(WRITER_FACTORY);
+        _propertyDescriptors.add(SCHEMA_NAME);
+        _propertyDescriptors.add(MAX_RESPONSE_PAGE_SIZE);
+        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
+
+        final Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_SUCCESS);
+        _relationships.add(REL_FAILURE);
+        _relationships.add(REL_ORIGINAL);
+        relationships = Collections.unmodifiableSet(_relationships);
+        mapper = new ObjectMapper();
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext context) {
+        Collection<ValidationResult> result = super.customValidate(context);
+
+        boolean queryIsSet = context.getProperty(QUERY).isSet();
+        if (!queryIsSet) {
+            final String msg = QUERY.getDisplayName() + " must be set.";
+            result.add(new ValidationResult.Builder().valid(false).explanation(msg).build());
+        }
+        return result;
+    }
+
+    private String getQuery(ProcessContext context, ProcessSession session, FlowFile input) {
+        String query = null;
+
+        if (context.getProperty(QUERY).isSet()) {
+            query = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
+        } else if (input == null) {
+            query = "select top 100 * from c";
+        }
+        return query;
+    }
+
+    private Map<String, String> getAttributes(ProcessContext context, FlowFile input) {
+        final Map<String, String> attributes = new HashMap<>();
+
+        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+
+        if (context.getProperty(QUERY).isSet()) {
+            final String query = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
+            attributes.put("query", query);
+        }
+        return attributes;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        this.writerFactory = context.getProperty(WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
+
+        final FlowFile input = context.hasIncomingConnection() ? session.get() : null;
+        logger = getLogger();
+
+        if (input == null && context.hasNonLoopConnection()) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("No flowfile input and NonLoopConnection. Ending onTrigger... ");
+            }
+            return;
+        }
+        final String query = getQuery(context, session, input);
+        final Map<String, String> attributes = getAttributes(context, input);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Running Cosmos SQL query : " + query);
+        }
+        final CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions();
+        final CosmosContainer container = getContainer();
+        final CosmosPagedIterable<JsonNode> response = container != null
+                ? container.queryItems(query, queryOptions, JsonNode.class)
+                : null;
+        if (response == null) {
+            logger.error("Fails to get CosmosPagedIterable<JsonNode> response");
+            if (input != null) {
+                session.transfer(input, REL_FAILURE);
+            }
+            return;
+        }
+        FlowFile output = input != null ? session.create(input) : session.create();
+        try {
+            logger.debug("Start to process data from Azure Cosmos DB");
+            final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(input).getValue();
+            try (OutputStream out = session.write(output)) {
+                Map<String, String> attrs = input != null ? input.getAttributes() : new HashMap<String, String>(){{
+                    put("schema.name", schemaName);
+                }};
+                RecordSchema schema = writerFactory.getSchema(attrs, null);
+                RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, attrs);
+                final AtomicLong count = new AtomicLong();
+                writer.beginRecordSet();
+
+                response.forEach(data -> {
+                    try {
+                        Map<String,Object> mapObj = mapper.convertValue(data, new TypeReference<Map<String, Object>>(){});
+                        Record record = new MapRecord(schema, mapObj);
+                        writer.write(record);
+                    } catch(IOException | IllegalArgumentException ex) {
+                        throw new RuntimeException(ex);
+                    }
+                    count.incrementAndGet();
+                });
+                writer.finishRecordSet();
+                writer.close();
+                out.close();
+                attributes.put("record.count", String.valueOf(count.get()));
+            } catch (SchemaNotFoundException e) {
+                throw new RuntimeException(e);
+            }

Review comment:
       I'd recommend a little refactor here to break down the complexity of this method:
   - Extract the nested try block and the map initialization (the naming can be refined, of course).
   - Also, I encourage using a custom, more specific exception than RuntimeException, e.g. CosmosException.
   - And log the exception with the logger when you catch it.
   
   ```
       private void processSession(ProcessSession session, FlowFile input, Map<String, String> attributes, CosmosPagedIterable<JsonNode> response, FlowFile output, String schemaName) throws IOException {
           final Map<String, String> attributeMap = initAttributeMap(input, schemaName);
           try (OutputStream out = session.write(output)) {
               RecordSchema schema = writerFactory.getSchema(attributeMap, null);
               RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, attributeMap);
               final AtomicLong count = new AtomicLong();
               writer.beginRecordSet();
   
               response.forEach(data -> processData(schema, writer, count, data));
               writer.finishRecordSet();
               writer.close();
               attributes.put("record.count", String.valueOf(count.get()));
           } catch (SchemaNotFoundException e) {
               throw new RuntimeException(e);
           }
       }
       
       private void processData(RecordSchema schema, RecordSetWriter writer, AtomicLong count, JsonNode data) {
           try {
               Map<String,Object> mapObj = mapper.convertValue(data, new TypeReference<Map<String, Object>>(){});
               Record record = new MapRecord(schema, mapObj);
               writer.write(record);
           } catch(IOException | IllegalArgumentException ex) {
               throw new RuntimeException(ex);
           }
           count.incrementAndGet();
       }
   ```
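   
   A hedged sketch of the last two points (the exception type and message wording are illustrative; NiFi's own ProcessException, already imported in this file and taking a message and a cause, is one readily available choice):
   
   ```java
       private void processData(RecordSchema schema, RecordSetWriter writer, AtomicLong count, JsonNode data) {
           try {
               Map<String, Object> mapObj = mapper.convertValue(data, new TypeReference<Map<String, Object>>() {});
               writer.write(new MapRecord(schema, mapObj));
               count.incrementAndGet();
           } catch (IOException | IllegalArgumentException ex) {
               // Log first so the component log captures the root cause, then rethrow.
               getLogger().error("Failed to write Cosmos DB query result as a record", ex);
               throw new ProcessException("Failed to write Cosmos DB query result as a record", ex);
           }
       }
   ```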




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] pvillard31 commented on pull request #4754: NIFI-7417: GetAzureCosmosDBRecord processor

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on pull request #4754:
URL: https://github.com/apache/nifi/pull/4754#issuecomment-974800680


   @sjyang18 @jfrazee I think this is an interesting additional feature, are you still looking into this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] Lehel44 commented on a change in pull request #4754: NIFI-7417: GetAzureCosmosDBRecord processor

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on a change in pull request #4754:
URL: https://github.com/apache/nifi/pull/4754#discussion_r574038766



##########
File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/GetAzureCosmosDBRecord.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.azure.cosmos.document;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.azure.cosmos.CosmosContainer;
+import com.azure.cosmos.models.CosmosQueryRequestOptions;
+import com.azure.cosmos.util.CosmosPagedIterable;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+
+@Tags({ "azure", "cosmos", "record", "read", "fetch" })
+@InputRequirement(Requirement.INPUT_ALLOWED)
+@CapabilityDescription("A record-oriented GET processor that uses the record writers to write the Azure Cosmos SQL select query result set.")
+public class GetAzureCosmosDBRecord extends AbstractAzureCosmosDBProcessor {
+    public static final PropertyDescriptor WRITER_FACTORY = new PropertyDescriptor.Builder()
+        .name("record-writer-factory")
+        .displayName("Record Writer")
+        .description("The record writer to use to write the result sets")
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .required(true)
+        .build();
+    public static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
+        .name("schema-name")
+        .displayName("Schema Name")
+        .description("The name of the schema in the configured schema registry to use for the query results")
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+        .defaultValue("${schema.name}")
+        .required(true)
+        .build();
+
+    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
+        .name("sql-core-document-query")
+        .displayName("SQL Core Document Query")
+        .description("The SQL select query to execute. "
+                + "This should be a valid SQL select query to Cosmos DB with core sql api")
+        .required(true)
+        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .build();
+
+    public static final PropertyDescriptor MAX_RESPONSE_PAGE_SIZE = new PropertyDescriptor.Builder()
+        .name("max-page-size")
+        .displayName("Max Page Size")
+        .description("The maximum number of elements in a response page from Cosmos DB")
+        .required(false)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .defaultValue("10")
+        .build();
+
+    private final static Set<Relationship> relationships;
+    private final static List<PropertyDescriptor> propertyDescriptors;
+    private ComponentLog logger;
+    private final static ObjectMapper mapper;

Review comment:
       Yes




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] jfrazee commented on a change in pull request #4754: NIFI-7417: GetAzureCosmosDBRecord processor

Posted by GitBox <gi...@apache.org>.
jfrazee commented on a change in pull request #4754:
URL: https://github.com/apache/nifi/pull/4754#discussion_r573998184



##########
File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/GetAzureCosmosDBRecord.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.azure.cosmos.document;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.azure.cosmos.CosmosContainer;
+import com.azure.cosmos.models.CosmosQueryRequestOptions;
+import com.azure.cosmos.util.CosmosPagedIterable;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+
+@Tags({ "azure", "cosmos", "record", "read", "fetch" })
+@InputRequirement(Requirement.INPUT_ALLOWED)
+@CapabilityDescription("A record-oriented GET processor that uses the record writers to write the Azure Cosmos SQL select query result set.")
+public class GetAzureCosmosDBRecord extends AbstractAzureCosmosDBProcessor {
+    public static final PropertyDescriptor WRITER_FACTORY = new PropertyDescriptor.Builder()
+        .name("record-writer-factory")
+        .displayName("Record Writer")
+        .description("The record writer to use to write the result sets")
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .required(true)
+        .build();
+    public static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
+        .name("schema-name")
+        .displayName("Schema Name")
+        .description("The name of the schema in the configured schema registry to use for the query results")
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+        .defaultValue("${schema.name}")
+        .required(true)
+        .build();
+
+    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
+        .name("sql-core-document-query")
+        .displayName("SQL Core Document Query")
+        .description("The SQL select query to execute. "
+                + "This should be a valid SQL select query to Cosmos DB with core sql api")
+        .required(true)
+        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .build();
+
+    public static final PropertyDescriptor MAX_RESPONSE_PAGE_SIZE = new PropertyDescriptor.Builder()
+        .name("max-page-size")
+        .displayName("Max Page Size")
+        .description("The maximum number of elements in a response page from Cosmos DB")
+        .required(false)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .defaultValue("10")
+        .build();
+
+    private final static Set<Relationship> relationships;
+    private final static List<PropertyDescriptor> propertyDescriptors;
+    private ComponentLog logger;
+    private final static ObjectMapper mapper;

Review comment:
       You mean private right?
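   
       For example (illustrative only), keeping it private with the conventional static-final modifier order and eager initialization:
   
   ```java
   private static final ObjectMapper mapper = new ObjectMapper();
   ```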




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+        .defaultValue("${schema.name}")
+        .required(true)
+        .build();
+
+    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
+        .name("sql-core-document-query")
+        .displayName("SQL Core Document Query")
+        .description("The SQL select query to execute. "
+                + "This should be a valid SQL select query to Cosmos DB with core sql api")
+        .required(true)
+        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .build();
+
+    public static final PropertyDescriptor MAX_RESPONSE_PAGE_SIZE = new PropertyDescriptor.Builder()
+        .name("max-page-size")
+        .displayName("Max Page Size")
+        .description("The maximum number of elements in a response page from Cosmos DB")
+        .required(false)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .defaultValue("10")
+        .build();
+
+    private final static Set<Relationship> relationships;
+    private final static List<PropertyDescriptor> propertyDescriptors;
+    private ComponentLog logger;
+    private final static ObjectMapper mapper;
+
+    static {
+        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+        _propertyDescriptors.addAll(descriptors);
+        _propertyDescriptors.add(QUERY);
+        _propertyDescriptors.add(WRITER_FACTORY);
+        _propertyDescriptors.add(SCHEMA_NAME);
+        _propertyDescriptors.add(MAX_RESPONSE_PAGE_SIZE);
+        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
+
+        final Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_SUCCESS);
+        _relationships.add(REL_FAILURE);
+        _relationships.add(REL_ORIGINAL);
+        relationships = Collections.unmodifiableSet(_relationships);
+        mapper = new ObjectMapper();
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext context) {
+        Collection<ValidationResult> result = super.customValidate(context);
+
+        boolean queryIsSet = context.getProperty(QUERY).isSet();
+        if (!queryIsSet) {
+            final String msg = QUERY.getDisplayName() + " must be set.";
+            result.add(new ValidationResult.Builder().valid(false).explanation(msg).build());
+        }
+        return result;
+    }
+
+    private String getQuery(ProcessContext context, ProcessSession session, FlowFile input) {
+        String query = null;
+
+        if (context.getProperty(QUERY).isSet()) {
+            query = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
+        } else if (!context.getProperty(QUERY).isSet() && input == null) {
+            query = "select top 100 * from c";
+        }
+        return query;
+    }
+
+    private Map<String, String> getAttributes(ProcessContext context, FlowFile input) {
+        final Map<String, String> attributes = new HashMap<>();
+
+        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+
+        if (context.getProperty(QUERY).isSet()) {
+            final String query = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
+            attributes.put("query", query);
+        }
+        return attributes;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        this.writerFactory =context.getProperty(WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
+
+        final FlowFile input = context.hasIncomingConnection() ? session.get() : null;
+        logger = getLogger();
+
+        if (input == null && context.hasNonLoopConnection()) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("No flowfile input and NonLoopConnection. Ending onTrigger... ");
+            }
+            return;
+        }
+        final String query = getQuery(context, session, input);
+        final Map<String, String> attributes = getAttributes(context, input);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Running Cosmos SQL query : " + query);
+        }
+        final CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions();
+        final CosmosContainer container = getContainer();
+        final CosmosPagedIterable<JsonNode> response =  container != null
+                ? container.queryItems(query, queryOptions, JsonNode.class)
+                : null;
+        if (response == null) {
+            logger.error("Fails to get CosmosPagedIterable<JsonNode> response");
+            if (input != null) {
+                session.transfer(input, REL_FAILURE);
+            }
+            return;
+        }
+        FlowFile output = input != null ? session.create(input) : session.create();
+        try {
+            logger.debug("Start to process data from Azure Cosmos DB");
+            final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(input).getValue();
+            try (OutputStream out = session.write(output)) {
+                Map<String, String> attrs = input != null ? input.getAttributes() : new HashMap<String, String>(){{
+                    put("schema.name", schemaName);
+                }};
+                RecordSchema schema = writerFactory.getSchema(attrs, null);
+                RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, attrs);
+                final AtomicLong count = new AtomicLong();
+                writer.beginRecordSet();
+
+                response.forEach(data ->{
+                    try {
+                        Map<String,Object> mapObj = mapper.convertValue(data, new TypeReference<Map<String, Object>>(){});
+                        Record record = new MapRecord(schema, mapObj);
+                        writer.write(record);
+                    } catch(IOException | IllegalArgumentException ex) {
+                        throw new RuntimeException(ex);
+                    }
+                    count.incrementAndGet();
+                });
+                writer.finishRecordSet();
+                writer.close();
+                out.close();
+                attributes.put("record.count", String.valueOf(count.get()));
+            } catch (SchemaNotFoundException e) {
+                throw new RuntimeException(e);
+            }

Review comment:
       I'd recommend a small refactor here to reduce the complexity of this method: extract the nested try block and the map initialization.
   
   ```
       private void processSession(ProcessSession session, FlowFile input, Map<String, String> attributes, CosmosPagedIterable<JsonNode> response, FlowFile output, String schemaName) throws IOException {
           final Map<String, String> attributeMap = initAttributeMap(input, schemaName);
           try (OutputStream out = session.write(output)) {
               RecordSchema schema = writerFactory.getSchema(attributeMap, null);
               RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, attributeMap);
               final AtomicLong count = new AtomicLong();
               writer.beginRecordSet();
   
               response.forEach(data -> processData(schema, writer, count, data));
               writer.finishRecordSet();
               writer.close();
               attributes.put("record.count", String.valueOf(count.get()));
           } catch (SchemaNotFoundException e) {
               throw new RuntimeException(e);
           }
       }
       
       private void processData(RecordSchema schema, RecordSetWriter writer, AtomicLong count, JsonNode data) {
           try {
               Map<String,Object> mapObj = mapper.convertValue(data, new TypeReference<Map<String, Object>>(){});
               Record record = new MapRecord(schema, mapObj);
               writer.write(record);
           } catch(IOException | IllegalArgumentException ex) {
               throw new RuntimeException(ex);
           }
           count.incrementAndGet();
       }
   ```
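   
   The snippet above references an `initAttributeMap` helper that is not shown. A minimal sketch of what it could look like, assuming the same `schema.name` convention as the inline map it replaces (the attributes are copied into a new map, since `FlowFile.getAttributes()` returns an unmodifiable view):
   
   ```
       private Map<String, String> initAttributeMap(FlowFile input, String schemaName) {
           // Copy the incoming FlowFile's attributes when present; otherwise seed the
           // map with the schema name so the writer factory can resolve a schema.
           final Map<String, String> attributeMap = new HashMap<>();
           if (input == null) {
               attributeMap.put("schema.name", schemaName);
           } else {
               attributeMap.putAll(input.getAttributes());
           }
           return attributeMap;
       }
   ```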





[GitHub] [nifi] Lehel44 commented on a change in pull request #4754: NIFI-7417: GetAzureCosmosDBRecord processor

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on a change in pull request #4754:
URL: https://github.com/apache/nifi/pull/4754#discussion_r575865089



##########
File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/GetAzureCosmosDBRecord.java
##########
@@ -0,0 +1,266 @@
+    private final static Set<Relationship> relationships;
+    private final static List<PropertyDescriptor> propertyDescriptors;
+    private ComponentLog logger;
+    private final static ObjectMapper mapper;

Review comment:
       Thank you for the improvements!





[GitHub] [nifi] pvillard31 commented on pull request #4754: NIFI-7417: GetAzureCosmosDBRecord processor

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on pull request #4754:
URL: https://github.com/apache/nifi/pull/4754#issuecomment-974800680


   @sjyang18 @jfrazee I think this is an interesting additional feature; are you still looking into this?



[GitHub] [nifi] Lehel44 commented on a change in pull request #4754: NIFI-7417: GetAzureCosmosDBRecord processor

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on a change in pull request #4754:
URL: https://github.com/apache/nifi/pull/4754#discussion_r572987431



##########
File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/GetAzureCosmosDBRecord.java
##########
@@ -0,0 +1,266 @@
+    private final static Set<Relationship> relationships;
+    private final static List<PropertyDescriptor> propertyDescriptors;
+    private ComponentLog logger;
+    private final static ObjectMapper mapper;

Review comment:
       Hi, would you please reorder the qualifiers to "public static final" to comply with the JLS? :)
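   
   For illustration, a minimal sketch of the requested ordering applied to the fields in question (the fields stay `private`, as the earlier follow-up question confirmed; only the modifier order changes, since the Java Language Specification recommends `static` before `final`):
   
   ```
       private static final Set<Relationship> relationships;
       private static final List<PropertyDescriptor> propertyDescriptors;
       private ComponentLog logger;
       private static final ObjectMapper mapper;
   ```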





[GitHub] [nifi] github-actions[bot] commented on pull request #4754: NIFI-7417: GetAzureCosmosDBRecord processor

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #4754:
URL: https://github.com/apache/nifi/pull/4754#issuecomment-974730076


   We're marking this PR as stale due to a lack of updates in the past few months. If the stale label has not been removed after another couple of weeks, this PR will be closed. The stale marker and eventual auto-close are not a judgement of the PR, only a reflection of limited reviewer bandwidth, and they help us keep the PR queue manageable. If you would like this PR re-opened, you can do so and a committer can remove the stale tag, or you can open a new PR. Helping to review other PRs increases review bandwidth, which in turn helps yours.



[GitHub] [nifi] Lehel44 commented on a change in pull request #4754: NIFI-7417: GetAzureCosmosDBRecord processor

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on a change in pull request #4754:
URL: https://github.com/apache/nifi/pull/4754#discussion_r573018147



##########
File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/GetAzureCosmosDBRecord.java
##########
@@ -0,0 +1,266 @@
+        FlowFile output = input != null ? session.create(input) : session.create();
+        try {
+            logger.debug("Start to process data from Azure Cosmos DB");
+            final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(input).getValue();
+            try (OutputStream out = session.write(output)) {
+                Map<String, String> attrs = input != null ? input.getAttributes() : new HashMap<String, String>(){{
+                    put("schema.name", schemaName);
+                }};
+                RecordSchema schema = writerFactory.getSchema(attrs, null);
+                RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, attrs);
+                final AtomicLong count = new AtomicLong();
+                writer.beginRecordSet();
+
+                response.forEach(data ->{
+                    try {
+                        Map<String,Object> mapObj = mapper.convertValue(data, new TypeReference<Map<String, Object>>(){});
+                        Record record = new MapRecord(schema, mapObj);
+                        writer.write(record);
+                    } catch(IOException | IllegalArgumentException ex) {
+                        throw new RuntimeException(ex);
+                    }
+                    count.incrementAndGet();
+                });
+                writer.finishRecordSet();
+                writer.close();
+                out.close();
+                attributes.put("record.count", String.valueOf(count.get()));
+            } catch (SchemaNotFoundException e) {
+                throw new RuntimeException(e);
+            }

Review comment:
       I'd recommend a small refactor here to reduce the complexity of this method: extract the nested try block and the map initialization. The naming can be refined, of course.
   - I'd also encourage using a custom, more specific exception than RuntimeException, e.g. CosmosException.
   - And log the exception when you catch it.
   
   ```
       private void processSession(ProcessSession session, FlowFile input, Map<String, String> attributes, CosmosPagedIterable<JsonNode> response, FlowFile output, String schemaName) throws IOException {
           final Map<String, String> attributeMap = initAttributeMap(input, schemaName);
           try (OutputStream out = session.write(output)) {
               RecordSchema schema = writerFactory.getSchema(attributeMap, null);
               RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, attributeMap);
               final AtomicLong count = new AtomicLong();
               writer.beginRecordSet();
   
               response.forEach(data -> processData(schema, writer, count, data));
               writer.finishRecordSet();
               writer.close();
               attributes.put("record.count", String.valueOf(count.get()));
           } catch (SchemaNotFoundException e) {
               throw new RuntimeException(e);
           }
       }
       
       private void processData(RecordSchema schema, RecordSetWriter writer, AtomicLong count, JsonNode data) {
           try {
               Map<String,Object> mapObj = mapper.convertValue(data, new TypeReference<Map<String, Object>>(){});
               Record record = new MapRecord(schema, mapObj);
               writer.write(record);
           } catch(IOException | IllegalArgumentException ex) {
               throw new RuntimeException(ex);
           }
           count.incrementAndGet();
       }
   
       private Map<String, String> getAttributeMap(String schemaName, FlowFile input) {
           Map<String, String> attrs = new HashMap<>();
           if (input != null) {
               attrs = input.getAttributes();
           } else {
               attrs.put("schema.name", schemaName);
           }
           return attrs;
       }
   ```
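   
   As a sketch of the two suggestions above, `processData` could log before rethrowing and wrap the cause in a more specific exception. NiFi's `ProcessException` is used here as a stand-in, since a dedicated `CosmosException` is only an example name:
   
   ```
       private void processData(RecordSchema schema, RecordSetWriter writer, AtomicLong count, JsonNode data) {
           try {
               Map<String, Object> mapObj = mapper.convertValue(data, new TypeReference<Map<String, Object>>() {});
               writer.write(new MapRecord(schema, mapObj));
               count.incrementAndGet();
           } catch (IOException | IllegalArgumentException ex) {
               // Log first so the failure is visible in the processor bulletins, then rethrow.
               getLogger().error("Failed to convert Cosmos DB document into a record", ex);
               throw new ProcessException("Failed to convert Cosmos DB document into a record", ex);
           }
       }
   ```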





[GitHub] [nifi] github-actions[bot] commented on pull request #4754: NIFI-7417: GetAzureCosmosDBRecord processor

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #4754:
URL: https://github.com/apache/nifi/pull/4754#issuecomment-1074546504


   We're marking this PR as stale due to a lack of updates in the past few months. If the stale label has not been removed after another couple of weeks, this PR will be closed. The stale marker and eventual auto-close are not a judgement of the PR, only a reflection of limited reviewer bandwidth, and they help us keep the PR queue manageable. If you would like this PR re-opened, you can do so and a committer can remove the stale tag, or you can open a new PR. Helping to review other PRs increases review bandwidth, which in turn helps yours.



[GitHub] [nifi] Lehel44 commented on a change in pull request #4754: NIFI-7417: GetAzureCosmosDBRecord processor

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on a change in pull request #4754:
URL: https://github.com/apache/nifi/pull/4754#discussion_r572999225



##########
File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/GetAzureCosmosDBRecord.java
##########
@@ -0,0 +1,266 @@
+    private String getQuery(ProcessContext context, ProcessSession session, FlowFile input) {

Review comment:
       `session` is never used; would you please remove it?
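   
   For illustration, a minimal sketch of the method with the unused parameter dropped (behavior otherwise unchanged):
   
   ```
       private String getQuery(ProcessContext context, FlowFile input) {
           if (context.getProperty(QUERY).isSet()) {
               return context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
           }
           // Fall back to a bounded default only when no query is configured and no FlowFile arrived.
           return input == null ? "select top 100 * from c" : null;
       }
   ```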





[GitHub] [nifi] Lehel44 commented on a change in pull request #4754: NIFI-7417: GetAzureCosmosDBRecord processor

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on a change in pull request #4754:
URL: https://github.com/apache/nifi/pull/4754#discussion_r573018147



##########
File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/GetAzureCosmosDBRecord.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.azure.cosmos.document;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.azure.cosmos.CosmosContainer;
+import com.azure.cosmos.models.CosmosQueryRequestOptions;
+import com.azure.cosmos.util.CosmosPagedIterable;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+
+@Tags({ "azure", "cosmos", "record", "read", "fetch" })
+@InputRequirement(Requirement.INPUT_ALLOWED)
+@CapabilityDescription("A record-oriented GET processor that uses the record writers to write the Azure Cosmos SQL select query result set.")
+public class GetAzureCosmosDBRecord extends AbstractAzureCosmosDBProcessor {
+    public static final PropertyDescriptor WRITER_FACTORY = new PropertyDescriptor.Builder()
+        .name("record-writer-factory")
+        .displayName("Record Writer")
+        .description("The record writer to use to write the result sets")
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .required(true)
+        .build();
+    public static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
+        .name("schema-name")
+        .displayName("Schema Name")
+        .description("The name of the schema in the configured schema registry to use for the query results")
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+        .defaultValue("${schema.name}")
+        .required(true)
+        .build();
+
+    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
+        .name("sql-core-document-query")
+        .displayName("SQL Core Document Query")
+        .description("The SQL select query to execute. "
+                + "This should be a valid SQL select query to Cosmos DB with core sql api")
+        .required(true)
+        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .build();
+
+    public static final PropertyDescriptor MAX_RESPONSE_PAGE_SIZE = new PropertyDescriptor.Builder()
+        .name("max-page-size")
+        .displayName("Max Page Size")
+        .description("The maximum number of elements in a response page from Cosmos DB")
+        .required(false)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .defaultValue("10")
+        .build();
+
+    private final static Set<Relationship> relationships;
+    private final static List<PropertyDescriptor> propertyDescriptors;
+    private ComponentLog logger;
+    private final static ObjectMapper mapper;
+
+    static {
+        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+        _propertyDescriptors.addAll(descriptors);
+        _propertyDescriptors.add(QUERY);
+        _propertyDescriptors.add(WRITER_FACTORY);
+        _propertyDescriptors.add(SCHEMA_NAME);
+        _propertyDescriptors.add(MAX_RESPONSE_PAGE_SIZE);
+        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
+
+        final Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_SUCCESS);
+        _relationships.add(REL_FAILURE);
+        _relationships.add(REL_ORIGINAL);
+        relationships = Collections.unmodifiableSet(_relationships);
+        mapper = new ObjectMapper();
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext context) {
+        Collection<ValidationResult> result = super.customValidate(context);
+
+        boolean queryIsSet = context.getProperty(QUERY).isSet();
+        if (!queryIsSet) {
+            final String msg = QUERY.getDisplayName() + " must be set.";
+            result.add(new ValidationResult.Builder().valid(false).explanation(msg).build());
+        }
+        return result;
+    }
+
+    private String getQuery(ProcessContext context, ProcessSession session, FlowFile input) {
+        String query = null;
+
+        if (context.getProperty(QUERY).isSet()) {
+            query = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
+        } else if (!context.getProperty(QUERY).isSet() && input == null) {
+            query = "select top 100 * from c";
+        }
+        return query;
+    }
+
+    private Map<String, String> getAttributes(ProcessContext context, FlowFile input) {
+        final Map<String, String> attributes = new HashMap<>();
+
+        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+
+        if (context.getProperty(QUERY).isSet()) {
+            final String query = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
+            attributes.put("query", query);
+        }
+        return attributes;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        this.writerFactory = context.getProperty(WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
+
+        final FlowFile input = context.hasIncomingConnection() ? session.get() : null;
+        logger = getLogger();
+
+        if (input == null && context.hasNonLoopConnection()) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("No flowfile input and NonLoopConnection. Ending onTrigger... ");
+            }
+            return;
+        }
+        final String query = getQuery(context, session, input);
+        final Map<String, String> attributes = getAttributes(context, input);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Running Cosmos SQL query : " + query);
+        }
+        final CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions();
+        final CosmosContainer container = getContainer();
+        final CosmosPagedIterable<JsonNode> response = container != null
+                ? container.queryItems(query, queryOptions, JsonNode.class)
+                : null;
+        if (response == null) {
+            logger.error("Fails to get CosmosPagedIterable<JsonNode> response");
+            if (input != null) {
+                session.transfer(input, REL_FAILURE);
+            }
+            return;
+        }
+        FlowFile output = input != null ? session.create(input) : session.create();
+        try {
+            logger.debug("Start to process data from Azure Cosmos DB");
+            final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(input).getValue();
+            try (OutputStream out = session.write(output)) {
+                Map<String, String> attrs = input != null ? input.getAttributes() : new HashMap<String, String>(){{
+                    put("schema.name", schemaName);
+                }};
+                RecordSchema schema = writerFactory.getSchema(attrs, null);
+                RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, attrs);
+                final AtomicLong count = new AtomicLong();
+                writer.beginRecordSet();
+
+                response.forEach(data -> {
+                    try {
+                        Map<String, Object> mapObj = mapper.convertValue(data, new TypeReference<Map<String, Object>>(){});
+                        Record record = new MapRecord(schema, mapObj);
+                        writer.write(record);
+                    } catch (IOException | IllegalArgumentException ex) {
+                        throw new RuntimeException(ex);
+                    }
+                    count.incrementAndGet();
+                });
+                writer.finishRecordSet();
+                writer.close();
+                out.close();
+                attributes.put("record.count", String.valueOf(count.get()));
+            } catch (SchemaNotFoundException e) {
+                throw new RuntimeException(e);
+            }

Review comment:
       I'd recommend a little refactor here to break down the complexity of this method: I'd extract the nested try block and the map initialization. The naming can be refined, of course.
   - I'd also encourage using a custom, more specific exception than RuntimeException, e.g. CosmosException.
   - And log the exception when you catch it.
   
   ```
       private void processSession(ProcessSession session, FlowFile input, Map<String, String> attributes, CosmosPagedIterable<JsonNode> response, FlowFile output, String schemaName) throws IOException {
        final Map<String, String> attributeMap = getAttributeMap(input, schemaName);
           try (OutputStream out = session.write(output)) {
               RecordSchema schema = writerFactory.getSchema(attributeMap, null);
               RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, attributeMap);
               final AtomicLong count = new AtomicLong();
               writer.beginRecordSet();
   
               response.forEach(data -> processData(schema, writer, count, data));
               writer.finishRecordSet();
               writer.close();
               attributes.put("record.count", String.valueOf(count.get()));
           } catch (SchemaNotFoundException e) {
               throw new RuntimeException(e);
           }
       }
       
       private void processData(RecordSchema schema, RecordSetWriter writer, AtomicLong count, JsonNode data) {
           try {
               Map<String,Object> mapObj = mapper.convertValue(data, new TypeReference<Map<String, Object>>(){});
               Record record = new MapRecord(schema, mapObj);
               writer.write(record);
           } catch(IOException | IllegalArgumentException ex) {
               throw new RuntimeException(ex);
           }
           count.incrementAndGet();
       }
   
       private Map<String, String> getAttributeMap(FlowFile input, String schemaName) {
           Map<String, String> attrs = new HashMap<>();
           if (input != null) {
               attrs = input.getAttributes();
           } else {
               attrs.put("schema.name", schemaName);
           }
           return attrs;
       }
   ```
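   For reference, a minimal sketch of what those last two points could look like. It reuses NiFi's ProcessException (already imported in this processor) rather than defining a new exception type, so treat it as one option, not the required implementation:

    ```
        private void processData(RecordSchema schema, RecordSetWriter writer, AtomicLong count, JsonNode data) {
            try {
                // Convert the Cosmos DB JSON document into a record and write it out
                final Map<String, Object> mapObj = mapper.convertValue(data, new TypeReference<Map<String, Object>>() {});
                writer.write(new MapRecord(schema, mapObj));
                count.incrementAndGet();
            } catch (IOException | IllegalArgumentException ex) {
                // Log before rethrowing so the failure shows up in the processor's log/bulletins
                getLogger().error("Failed to write record from Cosmos DB response", ex);
                throw new ProcessException("Failed to write record from Cosmos DB response", ex);
            }
        }
    ```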




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] sjyang18 commented on a change in pull request #4754: NIFI-7417: GetAzureCosmosDBRecord processor

Posted by GitBox <gi...@apache.org>.
sjyang18 commented on a change in pull request #4754:
URL: https://github.com/apache/nifi/pull/4754#discussion_r574670066



##########
File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/GetAzureCosmosDBRecord.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.azure.cosmos.document;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.azure.cosmos.CosmosContainer;
+import com.azure.cosmos.models.CosmosQueryRequestOptions;
+import com.azure.cosmos.util.CosmosPagedIterable;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+
+@Tags({ "azure", "cosmos", "record", "read", "fetch" })
+@InputRequirement(Requirement.INPUT_ALLOWED)
+@CapabilityDescription("A record-oriented GET processor that uses the record writers to write the Azure Cosmos SQL select query result set.")
+public class GetAzureCosmosDBRecord extends AbstractAzureCosmosDBProcessor {
+    public static final PropertyDescriptor WRITER_FACTORY = new PropertyDescriptor.Builder()
+        .name("record-writer-factory")
+        .displayName("Record Writer")
+        .description("The record writer to use to write the result sets")
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .required(true)
+        .build();
+    public static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
+        .name("schema-name")
+        .displayName("Schema Name")
+        .description("The name of the schema in the configured schema registry to use for the query results")
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+        .defaultValue("${schema.name}")
+        .required(true)
+        .build();
+
+    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
+        .name("sql-core-document-query")
+        .displayName("SQL Core Document Query")
+        .description("The SQL select query to execute. "
+                + "This should be a valid SQL select query to Cosmos DB with core sql api")
+        .required(true)
+        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .build();
+
+    public static final PropertyDescriptor MAX_RESPONSE_PAGE_SIZE = new PropertyDescriptor.Builder()
+        .name("max-page-size")
+        .displayName("Max Page Size")
+        .description("The maximum number of elements in a response page from Cosmos DB")
+        .required(false)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .defaultValue("10")
+        .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+    private ComponentLog logger;
+    private static final ObjectMapper mapper;

Review comment:
       Thank you for reviewing my PR. I have taken most of the suggestions.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] Lehel44 commented on a change in pull request #4754: NIFI-7417: GetAzureCosmosDBRecord processor

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on a change in pull request #4754:
URL: https://github.com/apache/nifi/pull/4754#discussion_r573024060



##########
File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/ITGetCosmosDocumentRecord.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.cosmos.document;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.azure.cosmos.CosmosException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MockSchemaRegistry;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.MockFlowFile;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class ITGetCosmosDocumentRecord extends ITAbstractAzureCosmosDBDocument {
+
+    private static final String TEST_COSMOS_QUERY = "select top 100 * from c";
+    private static final String TEST_PARTITION_KEY = "category";
+    private static List<JsonNode> testData;
+    private static int numOfTestData = 10;
+    private static RecordSchema SCHEMA;

Review comment:
       I think these fields can be final, and would you please rename the constants to uppercase to comply with Java conventions? E.g.
   
   ```suggestion
       private static final List<JsonNode> TEST_DATA;
       private static final int NUM_OF_TEST_DATA = 10;
       private static final RecordSchema SCHEMA;
   ```
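   For what it's worth, static final fields can still be populated in the existing static initializer block, along these lines (a sketch only):

    ```
        private static final List<JsonNode> TEST_DATA;

        static {
            TEST_DATA = new ArrayList<>();
            // ... populate TEST_DATA with the generated test documents
        }
    ```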




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] github-actions[bot] commented on pull request #4754: NIFI-7417: GetAzureCosmosDBRecord processor

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #4754:
URL: https://github.com/apache/nifi/pull/4754#issuecomment-974730076


   We're marking this PR as stale due to lack of updates in the past few months. If the stale label has not been removed after another couple of weeks, this PR will be closed. This stale marker and eventual auto close do not indicate a judgement of the PR, just a lack of reviewer bandwidth, and they help us keep the PR queue more manageable. If you would like this PR re-opened, you can do so and a committer can remove the stale tag. Or you can open a new PR. Try to help review other PRs to increase PR review bandwidth, which in turn helps yours.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] Lehel44 commented on a change in pull request #4754: NIFI-7417: GetAzureCosmosDBRecord processor

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on a change in pull request #4754:
URL: https://github.com/apache/nifi/pull/4754#discussion_r573025093



##########
File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/ITGetCosmosDocumentRecord.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.cosmos.document;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.azure.cosmos.CosmosException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MockSchemaRegistry;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.MockFlowFile;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class ITGetCosmosDocumentRecord extends ITAbstractAzureCosmosDBDocument {
+
+    private static final String TEST_COSMOS_QUERY = "select top 100 * from c";
+    private static final String TEST_PARTITION_KEY = "category";
+    private static List<JsonNode> testData;
+    private static int numOfTestData = 10;
+    private static RecordSchema SCHEMA;
+
+    static {
+        final ObjectMapper mapper = new ObjectMapper();
+        final List<RecordField> testDataFields = new ArrayList<>();
+        final RecordField idField = new RecordField("id", RecordFieldType.STRING.getDataType());
+        final RecordField categoryField = new RecordField(TEST_PARTITION_KEY, RecordFieldType.INT.getDataType());
+        final RecordField payloadField = new RecordField("payload", RecordFieldType.STRING.getDataType());
+        testDataFields.add(idField);
+        testDataFields.add(categoryField);
+        testDataFields.add(payloadField);
+        SCHEMA = new SimpleRecordSchema(testDataFields);
+        JsonNode doc = null;
+
+        testData = new ArrayList<>();
+        for (int i = 0; i < numOfTestData; i++) {
+            JsonObject json = new JsonObject();
+            json.addProperty("id", String.valueOf(i));
+            json.addProperty(TEST_COSMOS_PARTITION_KEY_FIELD_NAME, MockTestBase.getRandomInt(1, 4));
+            json.addProperty("payload", RandomStringUtils.random(100, true, true));
+            try {
+                doc = mapper.readTree(json.toString());
+            } catch (IOException exp) {
+                exp.printStackTrace();
+            }
+            testData.add(doc);
+        }
+
+        for (JsonNode jdoc : testData) {
+            try {
+                container.upsertItem(jdoc);
+            } catch (CosmosException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+    @Override
+    protected Class<? extends Processor> getProcessorClass() {
+        return GetAzureCosmosDBRecord.class;
+    }
+    @Before
+    public void setupTest() throws Exception {
+        runner.setProperty(GetAzureCosmosDBRecord.QUERY, TEST_COSMOS_QUERY);
+        MockSchemaRegistry registry = new MockSchemaRegistry();
+        JsonRecordSetWriter writer = new JsonRecordSetWriter();
+        registry.addSchema("sample", SCHEMA);
+
+        runner.addControllerService("writer", writer);
+        runner.addControllerService("registry", registry);
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_REGISTRY, "registry");
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_NAME_PROPERTY);
+        runner.enableControllerService(registry);
+        runner.enableControllerService(writer);
+        runner.setProperty(GetAzureCosmosDBRecord.WRITER_FACTORY, "writer");
+    }
+    @Test
+    public void testReadRecords() {
+        runner.setVariable("schema.name", "sample");
+        runner.enqueue(new byte[] {});
+        runner.run();
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(AbstractAzureCosmosDBProcessor.REL_SUCCESS);
+        assertTrue(flowFiles.size() == 1);

Review comment:
       Optionally, you could use assertEquals here and in the other test cases.
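   e.g. (assuming a static import of org.junit.Assert.assertEquals; illustrative only):

    ```
        assertEquals(1, flowFiles.size());
    ```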




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


