You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jf...@apache.org on 2020/11/03 08:01:06 UTC

[nifi] branch main updated: NIFI-7406 Added PutAzureCosmosRecord processor for Azure Cosmos DB

This is an automated email from the ASF dual-hosted git repository.

jfrazee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 80f49eb  NIFI-7406 Added PutAzureCosmosRecord processor for Azure Cosmos DB
80f49eb is described below

commit 80f49eb7bdcd761cdf42631d4d9378c4004fb4e7
Author: sjyang18 <il...@hotmail.com>
AuthorDate: Mon May 4 22:58:36 2020 +0000

    NIFI-7406 Added PutAzureCosmosRecord processor for Azure Cosmos DB
    
    This closes #4253
    
    Signed-off-by: Joey Frazee <jf...@apache.org>
---
 .../nifi-azure-processors/pom.xml                  | 102 +++++++
 .../document/AbstractAzureCosmosDBProcessor.java   | 304 +++++++++++++++++++++
 .../azure/cosmos/document/AzureCosmosDBUtils.java  |  63 +++++
 .../cosmos/document/PutAzureCosmosDBRecord.java    | 221 +++++++++++++++
 .../document/AzureCosmosDBClientService.java       | 163 +++++++++++
 .../org.apache.nifi.controller.ControllerService   |   3 +-
 .../services/org.apache.nifi.processor.Processor   |   3 +-
 .../document/ITAbstractAzureCosmosDBDocument.java  | 216 +++++++++++++++
 .../cosmos/document/ITPutAzureCosmosDBRecord.java  | 165 +++++++++++
 .../azure/cosmos/document/MockTestBase.java        |  74 +++++
 .../document/PutAzureCosmosDBRecordTest.java       | 294 ++++++++++++++++++++
 .../document/TestAzureCosmosDBClientService.java   |  72 +++++
 .../nifi-azure-services-api/pom.xml                |  37 +++
 .../cosmos/AzureCosmosDBConnectionService.java     |  33 +++
 nifi-nar-bundles/nifi-azure-bundle/pom.xml         |   3 +-
 15 files changed, 1750 insertions(+), 3 deletions(-)

diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index 253a43d..58c0ccc 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -60,11 +60,23 @@
             <groupId>com.azure</groupId>
             <artifactId>azure-core</artifactId>
             <version>${azure.core.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.dataformat</groupId>
+                    <artifactId>jackson-dataformat-xml</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-identity</artifactId>
             <version>1.0.6</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.azure</groupId>
+                    <artifactId>azure-core</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>com.microsoft.azure</groupId>
@@ -75,15 +87,74 @@
             <groupId>com.microsoft.azure</groupId>
             <artifactId>azure-eventhubs-eph</artifactId>
             <version>${azure-eventhubs-eph.version}</version>
+            <exclusions>
+                <!--depdendency resolution with com.azure sdk -->
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>com.microsoft.azure</groupId>
             <artifactId>azure-storage</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-cosmos</artifactId>
+            <version>${azure-cosmos.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.azure</groupId>
+                    <artifactId>azure-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-storage-file-datalake</artifactId>
             <version>12.2.0</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.azure</groupId>
+                    <artifactId>azure-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <!-- overriding jackson-core in azure-storage -->
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>27.0.1-jre</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <artifactId>jackson-dataformat-xml</artifactId>
+            <version>${jackson.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
@@ -99,6 +170,37 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-avro-record-utils</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-json-utils</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-registry-service-api</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-services</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-inline</artifactId>
+            <version>3.3.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-processor-utils</artifactId>
             <version>1.13.0-SNAPSHOT</version>
         </dependency>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/AbstractAzureCosmosDBProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/AbstractAzureCosmosDBProcessor.java
new file mode 100644
index 0000000..b92098a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/AbstractAzureCosmosDBProcessor.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.azure.cosmos.document;
+
+import com.azure.cosmos.ConsistencyLevel;
+import com.azure.cosmos.CosmosClient;
+import com.azure.cosmos.CosmosClientBuilder;
+import com.azure.cosmos.CosmosContainer;
+import com.azure.cosmos.CosmosDatabase;
+import com.azure.cosmos.CosmosException;
+import com.azure.cosmos.models.CosmosContainerProperties;
+import com.azure.cosmos.models.CosmosContainerResponse;
+import com.azure.cosmos.models.CosmosDatabaseResponse;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.services.azure.cosmos.AzureCosmosDBConnectionService;
+
+public abstract class AbstractAzureCosmosDBProcessor extends AbstractProcessor {
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("All FlowFiles that are written to Cosmos DB are routed to this relationship")
+        .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("All FlowFiles that cannot be written to Cosmos DB are routed to this relationship")
+        .build();
+
+    static final Relationship REL_ORIGINAL = new Relationship.Builder()
+        .name("original")
+        .description("All input FlowFiles that are part of a successful are routed to this relationship")
+        .build();
+
+    static final PropertyDescriptor CONNECTION_SERVICE = new PropertyDescriptor.Builder()
+        .name("azure-cosmos-db-connection-service")
+        .displayName("Cosmos DB Connection Service")
+        .description("If configured, the controller service used to obtain the connection string and access key")
+        .required(false)
+        .identifiesControllerService(AzureCosmosDBConnectionService.class)
+        .build();
+
+    static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder()
+        .name("azure-cosmos-db-name")
+        .displayName("Cosmos DB Name")
+        .description("The database name or id. This is used as the namespace for document collections or containers")
+        .required(true)
+        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+        .build();
+
+    static final PropertyDescriptor CONTAINER_ID = new PropertyDescriptor.Builder()
+        .name("azure-cosmos-db-container-id")
+        .displayName("Cosmos DB Container ID")
+        .description("The unique identifier for the container")
+        .required(true)
+        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+        .build();
+
+    static final PropertyDescriptor PARTITION_KEY = new PropertyDescriptor.Builder()
+        .name("azure-cosmos-db-partition-key")
+        .displayName("Cosmos DB Partition Key")
+        .description("The partition key used to distribute data among servers")
+        .required(true)
+        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+        .build();
+
+    static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
+        .name("charactor-set")
+        .displayName("Charactor Set")
+        .description("The Character Set in which the data is encoded")
+        .required(false)
+        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+        .defaultValue("UTF-8")
+        .build();
+
+    static final List<PropertyDescriptor> descriptors;
+
+    static {
+        List<PropertyDescriptor> _temp = new ArrayList<>();
+        _temp.add(CONNECTION_SERVICE);
+        _temp.add(AzureCosmosDBUtils.URI);
+        _temp.add(AzureCosmosDBUtils.DB_ACCESS_KEY);
+        _temp.add(AzureCosmosDBUtils.CONSISTENCY);
+        _temp.add(DATABASE_NAME);
+        _temp.add(CONTAINER_ID);
+        _temp.add(PARTITION_KEY);
+        descriptors = Collections.unmodifiableList(_temp);
+    }
+
+    private CosmosClient cosmosClient;
+    private CosmosContainer container;
+    private AzureCosmosDBConnectionService connectionService;
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) throws CosmosException {
+        final ComponentLog logger = getLogger();
+
+        if (context.getProperty(CONNECTION_SERVICE).isSet()) {
+            this.connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(AzureCosmosDBConnectionService.class);
+            this.cosmosClient = this.connectionService.getCosmosClient();
+        } else {
+            final String uri = context.getProperty(AzureCosmosDBUtils.URI).getValue();
+            final String accessKey = context.getProperty(AzureCosmosDBUtils.DB_ACCESS_KEY).getValue();
+            final String selectedConsistency = context.getProperty(AzureCosmosDBUtils.CONSISTENCY).getValue();
+            final ConsistencyLevel clevel;
+            switch (selectedConsistency) {
+                case AzureCosmosDBUtils.CONSISTENCY_STRONG:
+                    clevel =  ConsistencyLevel.STRONG;
+                    break;
+                case AzureCosmosDBUtils.CONSISTENCY_CONSISTENT_PREFIX:
+                    clevel = ConsistencyLevel.CONSISTENT_PREFIX;
+                    break;
+                case AzureCosmosDBUtils.CONSISTENCY_SESSION:
+                    clevel = ConsistencyLevel.SESSION;
+                    break;
+                case AzureCosmosDBUtils.CONSISTENCY_BOUNDED_STALENESS:
+                    clevel = ConsistencyLevel.BOUNDED_STALENESS;
+                    break;
+                case AzureCosmosDBUtils.CONSISTENCY_EVENTUAL:
+                    clevel = ConsistencyLevel.EVENTUAL;
+                    break;
+                default:
+                    clevel = ConsistencyLevel.SESSION;
+            }
+            if (cosmosClient != null) {
+                onStopped();
+            }
+            if (logger.isDebugEnabled()) {
+                logger.debug("Creating CosmosClient");
+            }
+            createCosmosClient(uri, accessKey, clevel);
+        }
+        getCosmosDocumentContainer(context);
+        doPostActionOnSchedule(context);
+    }
+
+    protected void createCosmosClient(final String uri, final String accessKey, final ConsistencyLevel clevel) {
+        this.cosmosClient = new CosmosClientBuilder()
+                                .endpoint(uri)
+                                .key(accessKey)
+                                .consistencyLevel(clevel)
+                                .buildClient();
+    }
+
+    protected abstract void doPostActionOnSchedule(final ProcessContext context);
+
+    protected void getCosmosDocumentContainer(final ProcessContext context) throws CosmosException {
+        final String databaseName = context.getProperty(DATABASE_NAME).getValue();
+        final String containerID = context.getProperty(CONTAINER_ID).getValue();
+        final String partitionKey = context.getProperty(PARTITION_KEY).getValue();
+
+        final CosmosDatabaseResponse databaseResponse = this.cosmosClient.createDatabaseIfNotExists(databaseName);
+        final CosmosDatabase database = this.cosmosClient.getDatabase(databaseResponse.getProperties().getId());
+
+        final CosmosContainerProperties containerProperties =
+            new CosmosContainerProperties(containerID, "/"+partitionKey);
+
+        //  Create container by default if Not exists.
+        final CosmosContainerResponse containerResponse = database.createContainerIfNotExists(containerProperties);
+        this.container =  database.getContainer(containerResponse.getProperties().getId());
+    }
+
+    @OnStopped
+    public final void onStopped() {
+        final ComponentLog logger = getLogger();
+        if (connectionService == null && cosmosClient != null) {
+            // close client only when cosmoclient is created in Processor.
+            if(logger.isDebugEnabled()) {
+                logger.debug("Closing CosmosClient");
+            }
+            try{
+                this.container = null;
+                this.cosmosClient.close();
+            }catch(CosmosException e) {
+                logger.error("Error closing Cosmos DB client due to {}", new Object[] { e.getMessage() }, e);
+            } finally {
+                this.cosmosClient = null;
+            }
+        }
+    }
+
+    protected String getURI(final ProcessContext context) {
+        if (this.connectionService != null) {
+            return this.connectionService.getURI();
+        } else {
+            return context.getProperty(AzureCosmosDBUtils.URI).getValue();
+        }
+    }
+
+    protected String getAccessKey(final ProcessContext context) {
+        if (this.connectionService != null) {
+            return this.connectionService.getAccessKey();
+        } else {
+            return context.getProperty(AzureCosmosDBUtils.DB_ACCESS_KEY).getValue();
+        }
+    }
+
+    protected String getConsistencyLevel(final ProcessContext context) {
+        if (this.connectionService != null) {
+            return this.connectionService.getConsistencyLevel();
+        } else {
+            return context.getProperty(AzureCosmosDBUtils.CONSISTENCY).getValue();
+        }
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext context) {
+        List<ValidationResult> retVal = new ArrayList<>();
+
+        boolean connectionServiceIsSet = context.getProperty(CONNECTION_SERVICE).isSet();
+        boolean uriIsSet = context.getProperty(AzureCosmosDBUtils.URI).isSet();
+        boolean accessKeyIsSet = context.getProperty(AzureCosmosDBUtils.DB_ACCESS_KEY).isSet();
+        boolean databaseIsSet = context.getProperty(DATABASE_NAME).isSet();
+        boolean collectionIsSet = context.getProperty(CONTAINER_ID).isSet();
+        boolean partitionIsSet = context.getProperty(PARTITION_KEY).isSet();
+
+        if (connectionServiceIsSet && (uriIsSet || accessKeyIsSet) ) {
+            // If connection Service is set, None of the Processor variables URI and accessKey
+            // should be set.
+            final String msg = String.format(
+                "If connection service is used for DB connection, none of %s and %s should be set",
+                AzureCosmosDBUtils.URI.getDisplayName(),
+                AzureCosmosDBUtils.DB_ACCESS_KEY.getDisplayName()
+            );
+            retVal.add(new ValidationResult.Builder().valid(false).explanation(msg).build());
+        } else if (!connectionServiceIsSet && (!uriIsSet || !accessKeyIsSet)) {
+            // If connection Service is not set, Both of the Processor variable URI and accessKey
+            // should be set.
+            final String msg = String.format(
+                "If connection service is not used for DB connection, both %s and %s should be set",
+                AzureCosmosDBUtils.URI.getDisplayName(),
+                AzureCosmosDBUtils.DB_ACCESS_KEY.getDisplayName()
+            );
+            retVal.add(new ValidationResult.Builder().valid(false).explanation(msg).build());
+        }
+        if (!databaseIsSet) {
+            final String msg = AbstractAzureCosmosDBProcessor.DATABASE_NAME.getDisplayName() + " must be set.";
+            retVal.add(new ValidationResult.Builder().valid(false).explanation(msg).build());
+        }
+        if (!collectionIsSet) {
+            final String msg = AbstractAzureCosmosDBProcessor.CONTAINER_ID.getDisplayName() + " must be set.";
+            retVal.add(new ValidationResult.Builder().valid(false).explanation(msg).build());
+        }
+        if (!partitionIsSet) {
+            final String msg = AbstractAzureCosmosDBProcessor.PARTITION_KEY.getDisplayName() + " must be set.";
+            retVal.add(new ValidationResult.Builder().valid(false).explanation(msg).build());
+        }
+        return retVal;
+    }
+
+    protected CosmosClient getCosmosClient() {
+        return cosmosClient;
+    }
+
+    protected void setCosmosClient(CosmosClient cosmosClient) {
+        this.cosmosClient = cosmosClient;
+    }
+
+    protected CosmosContainer getContainer() {
+        return container;
+    }
+
+    protected void setContainer(CosmosContainer container) {
+        this.container = container;
+    }
+
+    protected AzureCosmosDBConnectionService getConnectionService() {
+        return connectionService;
+    }
+
+    protected void setConnectionService(AzureCosmosDBConnectionService connectionService) {
+        this.connectionService = connectionService;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/AzureCosmosDBUtils.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/AzureCosmosDBUtils.java
new file mode 100644
index 0000000..3b4fe22
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/AzureCosmosDBUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.azure.cosmos.document;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.util.StandardValidators;
+
+public final class AzureCosmosDBUtils {
+    public static final String CONSISTENCY_STRONG = "STRONG";
+    public static final String CONSISTENCY_BOUNDED_STALENESS= "BOUNDED_STALENESS";
+    public static final String CONSISTENCY_SESSION = "SESSION";
+    public static final String CONSISTENCY_CONSISTENT_PREFIX = "CONSISTENT_PREFIX";
+    public static final String CONSISTENCY_EVENTUAL = "EVENTUAL";
+
+    public static final PropertyDescriptor URI = new PropertyDescriptor.Builder()
+        .name("azure-cosmos-db-uri")
+        .displayName("Cosmos DB URI")
+        .description("Cosmos DB URI, typically in the form of https://{databaseaccount}.documents.azure.com:443/"
+            + " Note this host URL is for Cosmos DB with Core SQL API"
+            + " from Azure Portal (Overview->URI)")
+        .required(false)
+        .addValidator(StandardValidators.URI_VALIDATOR)
+        .sensitive(true)
+        .build();
+
+    public static final PropertyDescriptor DB_ACCESS_KEY = new PropertyDescriptor.Builder()
+        .name("azure-cosmos-db-key")
+        .displayName("Cosmos DB Access Key")
+        .description("Cosmos DB Access Key from Azure Portal (Settings->Keys). "
+            + "Choose a read-write key to enable database or container creation at run time")
+        .required(false)
+        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+        .sensitive(true)
+        .build();
+
+    public static final PropertyDescriptor CONSISTENCY = new PropertyDescriptor.Builder()
+        .name("azure-cosmos-db-consistency-level")
+        .displayName("Cosmos DB Consistency Level")
+        .description("Choose from five consistency levels on the consistency spectrum. "
+            + "Refer to Cosmos DB documentation for their differences")
+        .required(false)
+        .defaultValue(CONSISTENCY_SESSION)
+        .allowableValues(CONSISTENCY_STRONG, CONSISTENCY_BOUNDED_STALENESS, CONSISTENCY_SESSION,
+                CONSISTENCY_CONSISTENT_PREFIX, CONSISTENCY_EVENTUAL)
+        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+        .build();
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
new file mode 100644
index 0000000..cfbb221
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.cosmos.document;
+
+import com.azure.cosmos.CosmosContainer;
+import com.azure.cosmos.CosmosException;
+import com.azure.cosmos.implementation.ConflictException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+@EventDriven
+@Tags({ "azure", "cosmos", "insert", "record", "put" })
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("This processor is a record-aware processor for inserting data into Cosmos DB with Core SQL API. It uses a configured record reader and " +
+        "schema to read an incoming record set from the body of a Flowfile and then inserts those records into " +
+        "a configured Cosmos DB Container.")
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+public class PutAzureCosmosDBRecord extends AbstractAzureCosmosDBProcessor {
+
+    private String conflictHandlingStrategy;
+    static final AllowableValue IGNORE_CONFLICT = new AllowableValue("IGNORE", "Ignore", "Conflicting records will not be inserted, and FlowFile will not be routed to failure");
+    static final AllowableValue UPSERT_CONFLICT = new AllowableValue("UPSERT", "Upsert", "Conflicting records will be upserted, and FlowFile will not be routed to failure");
+
+    static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
+        .name("record-reader")
+        .displayName("Record Reader")
+        .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
+        .identifiesControllerService(RecordReaderFactory.class)
+        .required(true)
+        .build();
+
+    static final PropertyDescriptor INSERT_BATCH_SIZE = new PropertyDescriptor.Builder()
+        .name("insert-batch-size")
+        .displayName("Insert Batch Size")
+        .description("The number of records to group together for one single insert operation against Cosmos DB")
+        .defaultValue("20")
+        .required(false)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .build();
+
+    static final PropertyDescriptor CONFLICT_HANDLE_STRATEGY = new PropertyDescriptor.Builder()
+        .name("azure-cosmos-db-conflict-handling-strategy")
+        .displayName("Cosmos DB Conflict Handling Strategy")
+        .description("Choose whether to ignore or upsert when conflict error occurs during insertion")
+        .required(false)
+        .allowableValues(IGNORE_CONFLICT, UPSERT_CONFLICT)
+        .defaultValue(IGNORE_CONFLICT.getValue())
+        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+        .build();
+
+
+    private final static Set<Relationship> relationships;
+    private final static List<PropertyDescriptor> propertyDescriptors;
+
+    static {
+        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+        _propertyDescriptors.addAll(descriptors);
+        _propertyDescriptors.add(RECORD_READER_FACTORY);
+        _propertyDescriptors.add(INSERT_BATCH_SIZE);
+        _propertyDescriptors.add(CONFLICT_HANDLE_STRATEGY);
+        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
+
+        final Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_SUCCESS);
+        _relationships.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(_relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    protected void bulkInsert(final List<Map<String, Object>> records) throws CosmosException{
+        // In the future, this method will be replaced by calling createItems API
+        // for example, this.container.createItems(records);
+        // currently, no createItems API available in Azure Cosmos Java SDK
+        final ComponentLog logger = getLogger();
+        final CosmosContainer container = getContainer();
+        for (Map<String, Object> record : records){
+            try {
+                container.createItem(record);
+            } catch (ConflictException e) {
+                // insert with unique id is expected. In case conflict occurs, use the selected strategy.
+                // By default, it will ignore.
+                if (conflictHandlingStrategy != null && conflictHandlingStrategy.equals(UPSERT_CONFLICT.getValue())){
+                    container.upsertItem(record);
+                } else {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Ignoring duplicate based on selected conflict resolution strategy");
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ComponentLog logger = getLogger();
+        final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
+                .asControllerService(RecordReaderFactory.class);
+
+        final String partitionKeyField = context.getProperty(PARTITION_KEY).getValue();
+        List<Map<String, Object>> batch = new ArrayList<>();
+        int ceiling = context.getProperty(INSERT_BATCH_SIZE).asInteger();
+        boolean error = false;
+        try (final InputStream inStream = session.read(flowFile);
+             final RecordReader reader = recordParserFactory.createRecordReader(flowFile, inStream, getLogger())) {
+
+            RecordSchema schema = reader.getSchema();
+            Record record;
+
+            while ((record = reader.nextRecord()) != null) {
+                // Convert each Record to HashMap
+                Map<String, Object> contentMap = (Map<String, Object>) DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(schema));
+                if(contentMap.containsKey("id")) {
+                    final Object idObj = contentMap.get("id");
+                    final String idStr = (idObj == null) ? "" : String.valueOf(idObj);
+                    if (idObj == null || StringUtils.isBlank(idStr)) {
+                        // dont put null to id
+                        contentMap.put("id", UUID.randomUUID().toString());
+                    } else {
+                        contentMap.put("id", idStr);
+                    }
+                } else {
+                    contentMap.put("id", UUID.randomUUID().toString());
+                }
+                if (!contentMap.containsKey(partitionKeyField)) {
+                    logger.error(String.format("PutAzureCosmoDBRecord failed with missing partitionKeyField (%s)", partitionKeyField));
+                    error = true;
+                    break;
+                }
+                batch.add(contentMap);
+                if (batch.size() == ceiling) {
+                    bulkInsert(batch);
+                    batch = new ArrayList<>();
+                }
+            }
+            if (!error && batch.size() > 0) {
+                bulkInsert(batch);
+            }
+        } catch (SchemaNotFoundException | MalformedRecordException | IOException | CosmosException e) {
+            logger.error("PutAzureCosmoDBRecord failed with error: {}", new Object[]{e.getMessage()}, e);
+            error = true;
+        } finally {
+            if (!error) {
+                session.getProvenanceReporter().send(flowFile, getURI(context));
+                session.transfer(flowFile, REL_SUCCESS);
+            } else {
+                session.transfer(flowFile, REL_FAILURE);
+            }
+        }
+        session.commit();
+    }
+
+    @Override
+    protected void doPostActionOnSchedule(final ProcessContext context) {
+        conflictHandlingStrategy = context.getProperty(CONFLICT_HANDLE_STRATEGY).getValue();
+        if (conflictHandlingStrategy == null)
+            conflictHandlingStrategy = IGNORE_CONFLICT.getValue();
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/cosmos/document/AzureCosmosDBClientService.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/cosmos/document/AzureCosmosDBClientService.java
new file mode 100644
index 0000000..c56abb0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/cosmos/document/AzureCosmosDBClientService.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.services.azure.cosmos.document;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import com.azure.cosmos.ConsistencyLevel;
+import com.azure.cosmos.CosmosClient;
+import com.azure.cosmos.CosmosClientBuilder;
+import com.azure.cosmos.CosmosException;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processors.azure.cosmos.document.AzureCosmosDBUtils;
+import org.apache.nifi.services.azure.cosmos.AzureCosmosDBConnectionService;
+import org.apache.nifi.util.StringUtils;
+
+@Tags({"azure", "cosmos", "document", "service"})
+@CapabilityDescription(
+        "Provides a controller service that configures a connection to Cosmos DB (Core SQL API) " +
+        " and provides access to that connection to other Cosmos DB-related components."
+)
+public class AzureCosmosDBClientService extends AbstractControllerService implements AzureCosmosDBConnectionService {
+    private String uri;
+    private String accessKey;
+    private String consistencyLevel;
+    private CosmosClient cosmosClient;
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        this.uri = context.getProperty(AzureCosmosDBUtils.URI).getValue();
+        this.accessKey = context.getProperty(AzureCosmosDBUtils.DB_ACCESS_KEY).getValue();
+        final ConsistencyLevel clevel;
+        final String selectedConsistency = context.getProperty(AzureCosmosDBUtils.CONSISTENCY).getValue();
+
+        switch(selectedConsistency) {
+            case AzureCosmosDBUtils.CONSISTENCY_STRONG:
+                clevel =  ConsistencyLevel.STRONG;
+                break;
+            case AzureCosmosDBUtils.CONSISTENCY_CONSISTENT_PREFIX:
+                clevel = ConsistencyLevel.CONSISTENT_PREFIX;
+                break;
+            case AzureCosmosDBUtils.CONSISTENCY_SESSION:
+                clevel = ConsistencyLevel.SESSION;
+                break;
+            case AzureCosmosDBUtils.CONSISTENCY_BOUNDED_STALENESS:
+                clevel = ConsistencyLevel.BOUNDED_STALENESS;
+                break;
+            case AzureCosmosDBUtils.CONSISTENCY_EVENTUAL:
+                clevel = ConsistencyLevel.EVENTUAL;
+                break;
+            default:
+                clevel = ConsistencyLevel.SESSION;
+        }
+
+        if (this.cosmosClient != null) {
+            onStopped();
+        }
+        consistencyLevel = clevel.toString();
+        createCosmosClient(uri, accessKey, clevel);
+    }
+
+
+    @OnStopped
+    public final void onStopped() {
+        if (this.cosmosClient != null) {
+            try {
+                cosmosClient.close();
+            } catch(CosmosException e) {
+                getLogger().error("Closing cosmosClient Failed: " + e.getMessage(), e);
+            } finally {
+                this.cosmosClient = null;
+            }
+        }
+    }
+
+    protected void createCosmosClient(final String uri, final String accessKey, final ConsistencyLevel clevel){
+        this.cosmosClient = new CosmosClientBuilder()
+                                .endpoint(uri)
+                                .key(accessKey)
+                                .consistencyLevel(clevel)
+                                .buildClient();
+    }
+
+    static List<PropertyDescriptor> descriptors = new ArrayList<>();
+
+    static {
+        descriptors.add(AzureCosmosDBUtils.URI);
+        descriptors.add(AzureCosmosDBUtils.DB_ACCESS_KEY);
+        descriptors.add(AzureCosmosDBUtils.CONSISTENCY);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public String getURI() {
+        return this.uri;
+    }
+
+    @Override
+    public String getAccessKey() {
+        return this.accessKey;
+    }
+    @Override
+    public String getConsistencyLevel() {
+        return this.consistencyLevel;
+    }
+
+    @Override
+    public CosmosClient getCosmosClient() {
+        return this.cosmosClient;
+    }
+    public void setCosmosClient(CosmosClient client) {
+        this.cosmosClient = client;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>();
+
+        final String uri = validationContext.getProperty(AzureCosmosDBUtils.URI).getValue();
+        final String accessKey = validationContext.getProperty(AzureCosmosDBUtils.DB_ACCESS_KEY).getValue();
+
+        if (StringUtils.isBlank(uri) || StringUtils.isBlank(accessKey)) {
+            results.add(new ValidationResult.Builder()
+                    .subject("AzureStorageCredentialsControllerService")
+                    .valid(false)
+                    .explanation(
+                        "either " + AzureCosmosDBUtils.URI.getDisplayName()
+                        + " or " + AzureCosmosDBUtils.DB_ACCESS_KEY.getDisplayName() + " is required")
+                    .build());
+        }
+        return results;
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 4179771..5e63d4b 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -14,4 +14,5 @@
 # limitations under the License.
 org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService
 org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerServiceLookup
-org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService
\ No newline at end of file
+org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService
+org.apache.nifi.services.azure.cosmos.document.AzureCosmosDBClientService
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 015596e..6456dd0 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -24,4 +24,5 @@ org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage
 org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage
 org.apache.nifi.processors.azure.storage.DeleteAzureDataLakeStorage
 org.apache.nifi.processors.azure.storage.FetchAzureDataLakeStorage
-org.apache.nifi.processors.azure.storage.ListAzureDataLakeStorage
\ No newline at end of file
+org.apache.nifi.processors.azure.storage.ListAzureDataLakeStorage
+org.apache.nifi.processors.azure.cosmos.document.PutAzureCosmosDBRecord
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/ITAbstractAzureCosmosDBDocument.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/ITAbstractAzureCosmosDBDocument.java
new file mode 100644
index 0000000..06bc335
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/ITAbstractAzureCosmosDBDocument.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.cosmos.document;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.logging.Logger;
+
+import com.azure.cosmos.CosmosClient;
+import com.azure.cosmos.CosmosClientBuilder;
+import com.azure.cosmos.CosmosContainer;
+import com.azure.cosmos.CosmosDatabase;
+import com.azure.cosmos.CosmosException;
+import com.azure.cosmos.models.CosmosContainerProperties;
+import com.azure.cosmos.models.CosmosContainerResponse;
+import com.azure.cosmos.models.CosmosDatabaseResponse;
+import com.azure.cosmos.models.CosmosItemRequestOptions;
+import com.azure.cosmos.models.CosmosQueryRequestOptions;
+import com.azure.cosmos.models.PartitionKey;
+import com.azure.cosmos.util.CosmosPagedIterable;
+import com.fasterxml.jackson.databind.JsonNode;
+
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.services.azure.cosmos.document.AzureCosmosDBClientService;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+public abstract class ITAbstractAzureCosmosDBDocument {
+    static Logger logger = Logger.getLogger(ITAbstractAzureCosmosDBDocument.class.getName());
+
+    private static final Properties CONFIG;
+
+    private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
+    protected static final String TEST_COSMOS_DB_NAME = "nifi-test-db";
+    protected static final String TEST_COSMOS_CONTAINER_NAME = "nifi-test-container";
+    protected static final String TEST_COSMOS_PARTITION_KEY_FIELD_NAME = "category";
+    protected static CosmosClient client;
+    protected static CosmosDatabase cdb;
+    protected static CosmosContainer container;
+
+    static {
+        final FileInputStream fis;
+        CONFIG = new Properties();
+        try {
+            fis = new FileInputStream(CREDENTIALS_FILE);
+            try {
+                CONFIG.load(fis);
+            } catch (IOException e) {
+                fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
+            } finally {
+                FileUtils.closeQuietly(fis);
+            }
+        } catch (FileNotFoundException e) {
+            fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
+        }
+    }
+
+    protected static String getComosURI() {
+        return CONFIG.getProperty("cosmosURI");
+    }
+
+    protected static String getCosmosKey() {
+        return CONFIG.getProperty("cosmosKey");
+    }
+
+    protected TestRunner runner;
+
+    @BeforeClass
+    public static void createTestDBContainerIfNeeded() throws CosmosException {
+        final String testDBURI =  getComosURI();
+        final String testDBContainer = getCosmosKey();
+
+        client = new CosmosClientBuilder()
+                .endpoint(testDBURI)
+                .key(testDBContainer)
+                .buildClient();
+
+        CosmosDatabaseResponse databaseResponse = client.createDatabase(TEST_COSMOS_DB_NAME);
+        cdb = client.getDatabase(databaseResponse.getProperties().getId());
+        CosmosContainerProperties containerProperties =
+            new CosmosContainerProperties(TEST_COSMOS_CONTAINER_NAME, "/"+TEST_COSMOS_PARTITION_KEY_FIELD_NAME);
+        CosmosContainerResponse containerResponse = cdb.createContainer(containerProperties);
+        container = cdb.getContainer(containerResponse.getProperties().getId());
+        assertNotNull(container);
+    }
+
+    @AfterClass
+    public static void dropTestDBAndContainer() throws CosmosException {
+        resetTestCosmosConnection();
+        if (container != null) {
+            try {
+                container.delete();
+            } catch(CosmosException e) {
+                logger.info(e.getMessage());
+            } finally {
+                container = null;
+            }
+        }
+        if (cdb != null) {
+            try {
+                cdb.delete();
+            } catch(CosmosException e) {
+                logger.info(e.getMessage());
+            } finally {
+                cdb = null;
+            }
+        }
+        if (client != null){
+            try {
+                client.close();
+            } catch(CosmosException e) {
+                logger.info(e.getMessage());
+            } finally {
+                client = null;
+            }
+        }
+    }
+
+    @Before
+    public void setUpCosmosIT() {
+        final String testDBURI =  getComosURI();
+        final String testDBContainer = getCosmosKey();
+        runner = TestRunners.newTestRunner(getProcessorClass());
+        runner.setProperty(AzureCosmosDBUtils.URI, testDBURI);
+        runner.setProperty(AzureCosmosDBUtils.DB_ACCESS_KEY, testDBContainer);
+        runner.setProperty(AbstractAzureCosmosDBProcessor.DATABASE_NAME, TEST_COSMOS_DB_NAME);
+        runner.setProperty(AbstractAzureCosmosDBProcessor.CONTAINER_ID, TEST_COSMOS_CONTAINER_NAME);
+        runner.setProperty(AbstractAzureCosmosDBProcessor.PARTITION_KEY, TEST_COSMOS_PARTITION_KEY_FIELD_NAME);
+        runner.setIncomingConnection(false);
+        runner.setNonLoopConnection(false);
+    }
+
+    protected static void closeClient() {
+        try {
+            client.close();
+        } catch(CosmosException e){
+            logger.info(e.getMessage());
+        } finally {
+            client =null;
+            cdb = null;
+            container = null;
+        }
+    }
+
+    protected static void resetTestCosmosConnection() {
+        if (client != null) {
+            closeClient();
+        }
+        final String testDBURI =  getComosURI();
+        final String testDBContainer = getCosmosKey();
+
+        client = new CosmosClientBuilder()
+                    .endpoint(testDBURI)
+                    .key(testDBContainer)
+                    .buildClient();
+        cdb =  client.getDatabase(TEST_COSMOS_DB_NAME);
+        container =  cdb.getContainer(TEST_COSMOS_CONTAINER_NAME);
+    }
+
+    protected abstract Class<? extends Processor> getProcessorClass();
+
+    protected void configureCosmosConnectionControllerService() throws Exception {
+        runner.removeProperty(AzureCosmosDBUtils.URI);
+        runner.removeProperty(AzureCosmosDBUtils.DB_ACCESS_KEY);
+
+        AzureCosmosDBClientService service = new AzureCosmosDBClientService();
+        runner.addControllerService("connService", service);
+
+        runner.setProperty(service, AzureCosmosDBUtils.URI,getComosURI());
+        runner.setProperty(service, AzureCosmosDBUtils.DB_ACCESS_KEY, getCosmosKey());
+        // now, after enabling and setting the service, it should be valid
+        runner.enableControllerService(service);
+        runner.setProperty(AbstractAzureCosmosDBProcessor.CONNECTION_SERVICE, service.getIdentifier());
+        runner.assertValid();
+    }
+
+    protected void clearTestData() throws Exception {
+        logger.info("clearing test data");
+        CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions();
+
+        CosmosPagedIterable<JsonNode> response = container.queryItems(
+            "select * from c order by c._ts", queryOptions, JsonNode.class );
+
+        response.forEach(data -> {
+            if (data.get(TEST_COSMOS_PARTITION_KEY_FIELD_NAME) != null){
+                PartitionKey pkey = new PartitionKey(data.get(TEST_COSMOS_PARTITION_KEY_FIELD_NAME).asText());
+                container.deleteItem(data.get("id").asText(), pkey, new CosmosItemRequestOptions());
+            } else {
+                container.deleteItem(data.get("id").asText(), PartitionKey.NONE, new CosmosItemRequestOptions());
+            }
+        });
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/ITPutAzureCosmosDBRecord.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/ITPutAzureCosmosDBRecord.java
new file mode 100644
index 0000000..242827c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/ITPutAzureCosmosDBRecord.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.cosmos.document;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.logging.Logger;
+
+import com.azure.cosmos.models.CosmosQueryRequestOptions;
+import com.azure.cosmos.util.CosmosPagedIterable;
+import com.fasterxml.jackson.databind.JsonNode;
+
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ITPutAzureCosmosDBRecord extends ITAbstractAzureCosmosDBDocument {
+    static Logger logger = Logger.getLogger(ITPutAzureCosmosDBRecord.class.getName());
+
+    @Override
+    protected Class<? extends Processor> getProcessorClass() {
+        return PutAzureCosmosDBRecord.class;
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        resetTestCosmosConnection();
+    }
+
+    @After
+    public void cleanupTestCase() {
+        try{
+            clearTestData();
+            closeClient();
+        } catch(Exception e) {
+
+        }
+    }
+    private List<JsonNode> getDataFromTestDB() {
+        logger.info("getDataFromTestDB for test result validation");
+        CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions();
+        List<JsonNode> results = new ArrayList<>();
+
+        CosmosPagedIterable<JsonNode> response = container.queryItems(
+            "select * from c order by c._ts", queryOptions, JsonNode.class );
+
+        response.forEach(data -> {
+            results.add(data);
+        });
+        return results;
+    }
+
+    private MockRecordParser recordReader;
+
+    private void setupRecordReader() throws InitializationException {
+        recordReader = new MockRecordParser();
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+        runner.setProperty(PutAzureCosmosDBRecord.RECORD_READER_FACTORY, "reader");
+    }
+
+    @Test
+    public void testOnTriggerWithNestedRecords() throws InitializationException {
+        setupRecordReader();
+        recordReader.addSchemaField("id", RecordFieldType.STRING);
+        recordReader.addSchemaField(TEST_COSMOS_PARTITION_KEY_FIELD_NAME, RecordFieldType.STRING);
+
+        final List<RecordField> personFields = new ArrayList<>();
+        final RecordField nameField = new RecordField("name", RecordFieldType.STRING.getDataType());
+        final RecordField ageField = new RecordField("age", RecordFieldType.INT.getDataType());
+        final RecordField sportField = new RecordField("sport", RecordFieldType.STRING.getDataType());
+        personFields.add(nameField);
+        personFields.add(ageField);
+        personFields.add(sportField);
+        final RecordSchema personSchema = new SimpleRecordSchema(personFields);
+        recordReader.addSchemaField("person", RecordFieldType.RECORD);
+
+        recordReader.addRecord("1", "A", new MapRecord(personSchema, new HashMap<String,Object>() {
+            private static final long serialVersionUID = -3185956498135742190L;
+            {
+                put("name", "John Doe");
+                put("age", 48);
+                put("sport", "Soccer");
+            }
+        }));
+        recordReader.addRecord("2", "B", new MapRecord(personSchema, new HashMap<String,Object>() {
+            private static final long serialVersionUID = 1L;
+            {
+                put("name", "Jane Doe");
+                put("age", 47);
+                put("sport", "Tennis");
+            }
+        }));
+        recordReader.addRecord("3", "A", new MapRecord(personSchema, new HashMap<String,Object>() {
+            private static final long serialVersionUID = -1329194249439570573L;
+            {
+                put("name", "Sally Doe");
+                put("age", 47);
+                put("sport", "Curling");
+            }
+        }));
+        recordReader.addRecord("4", "C", new MapRecord(personSchema, new HashMap<String,Object>() {
+            private static final long serialVersionUID = -1329194249439570574L;
+            {
+                put("name", "Jimmy Doe");
+                put("age", 14);
+                put("sport", null);
+            }
+        }));
+
+        runner.enqueue("");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutAzureCosmosDBRecord.REL_SUCCESS, 1);
+        assertEquals(4, getDataFromTestDB().size());
+    }
+
+    @Test
+    public void testOnTriggerWithFlatRecords() throws InitializationException {
+        setupRecordReader();
+        recordReader.addSchemaField("id", RecordFieldType.STRING);
+        recordReader.addSchemaField(TEST_COSMOS_PARTITION_KEY_FIELD_NAME, RecordFieldType.STRING);
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("sport", RecordFieldType.STRING);
+
+        recordReader.addRecord("1", "A", "John Doe", 48, "Soccer");
+        recordReader.addRecord("2", "B","Jane Doe", 47, "Tennis");
+        recordReader.addRecord("3", "B", "Sally Doe", 47, "Curling");
+        recordReader.addRecord("4", "A", "Jimmy Doe", 14, null);
+        recordReader.addRecord("5", "C","Pizza Doe", 14, null);
+
+        runner.enqueue("");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutAzureCosmosDBRecord.REL_SUCCESS, 1);
+        assertEquals(5, getDataFromTestDB().size());
+    }
+
+
+
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/MockTestBase.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/MockTestBase.java
new file mode 100644
index 0000000..ca1414b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/MockTestBase.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.azure.cosmos.document;
+
+import static org.mockito.Mockito.mock;
+
+import java.util.Random;
+
+import com.azure.cosmos.ConsistencyLevel;
+import com.azure.cosmos.CosmosClient;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.services.azure.cosmos.document.AzureCosmosDBClientService;
+import org.apache.nifi.util.TestRunner;
+
+public class MockTestBase {
+
+    protected static final String MOCK_DB_NAME = "MOCK_DB_NAME";
+    protected static final String MOCK_CONTAINER_ID = "MOCK_CONTAINER_ID";
+    protected static final String MOCK_URI = "MOCK_URI";
+    protected static final String MOCK_DB_ACCESS_KEY = "MOCK_DB_ACCESS_KEY";
+    public static final String MOCK_QUERY = "select * from c";
+
+    public static final String MOCK_PARTITION_FIELD_NAME = "category";
+    protected TestRunner testRunner;
+
+    protected void setBasicMockProperties(boolean withConnectionService) throws InitializationException {
+        if (testRunner != null) {
+            testRunner.setProperty(AbstractAzureCosmosDBProcessor.DATABASE_NAME, MOCK_DB_NAME);
+            testRunner.setProperty(AbstractAzureCosmosDBProcessor.CONTAINER_ID,MOCK_CONTAINER_ID);
+            testRunner.setProperty(AbstractAzureCosmosDBProcessor.PARTITION_KEY,MOCK_PARTITION_FIELD_NAME);
+            if (withConnectionService) {
+                // setup connnection controller service
+                AzureCosmosDBClientService service = new MockConnectionService();
+                testRunner.addControllerService("connService", service);
+                testRunner.setProperty(service, AzureCosmosDBUtils.URI, MOCK_URI);
+                testRunner.setProperty(service, AzureCosmosDBUtils.DB_ACCESS_KEY, MOCK_DB_ACCESS_KEY);
+
+                // now, after enabling and setting the service, it should be valid
+                testRunner.enableControllerService(service);
+                testRunner.setProperty(AbstractAzureCosmosDBProcessor.CONNECTION_SERVICE, "connService");
+            }
+        }
+    }
+
+    private static Random random = new Random();
+    public static int getRandomInt(int min, int max){
+        return random.nextInt((max-min)+1) + min;
+    }
+
+    private class MockConnectionService extends AzureCosmosDBClientService {
+        @Override
+        protected void createCosmosClient(final String uri, final String accessKey, final ConsistencyLevel clevel){
+            // mock cosmos client
+            this.setCosmosClient(mock(CosmosClient.class));
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecordTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecordTest.java
new file mode 100644
index 0000000..4a6b8a8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecordTest.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.cosmos.document;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import com.azure.cosmos.ConsistencyLevel;
+import com.azure.cosmos.CosmosClient;
+import com.azure.cosmos.CosmosContainer;
+import com.azure.cosmos.CosmosException;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+
+import org.apache.avro.Schema;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.MockSchemaRegistry;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import net.minidev.json.JSONObject;
+public class PutAzureCosmosDBRecordTest extends MockTestBase {
+
+    private MockPutAzureCosmosDBRecord processor;
+    private MockRecordParser recordReader;
+
+    private void setupRecordReader() throws InitializationException {
+        recordReader = new MockRecordParser();
+        if (testRunner != null) {
+            testRunner.addControllerService("reader", recordReader);
+            testRunner.enableControllerService(recordReader);
+            testRunner.setProperty(PutAzureCosmosDBRecord.RECORD_READER_FACTORY, "reader");
+        }
+
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        processor = new MockPutAzureCosmosDBRecord();
+        testRunner = TestRunners.newTestRunner(processor);
+        testRunner.setIncomingConnection(false);
+        testRunner.setNonLoopConnection(false);
+    }
+
+    @Test
+    public void testPutCosmosRecordProcessorConfigValidity() throws Exception {
+        setBasicMockProperties(false);
+        testRunner.setProperty(AzureCosmosDBUtils.URI, MOCK_URI);
+        testRunner.assertNotValid();
+        testRunner.setProperty(AzureCosmosDBUtils.DB_ACCESS_KEY, MOCK_DB_ACCESS_KEY);
+
+        testRunner.assertNotValid();
+
+        setupRecordReader();
+        testRunner.assertValid();
+        processor.setCosmosClient(null);
+        processor.onScheduled(testRunner.getProcessContext());
+        assertNotNull(processor.getCosmosClient());
+    }
+
+    @Test
+    public void testPutCosmosRecordProcessorConfigValidityWithConnectionService() throws Exception {
+        setBasicMockProperties(true);
+        testRunner.assertNotValid();
+        // setup recordReader
+        setupRecordReader();
+        testRunner.assertValid();
+        processor.setCosmosClient(null);
+        processor.onScheduled(testRunner.getProcessContext());
+        assertNotNull(processor.getCosmosClient());
+    }
+
+    @Test
+    public void testOnTriggerWithFlatRecords() throws Exception {
+        setupRecordReader();
+        prepareMockTest();
+        recordReader.addSchemaField("id", RecordFieldType.STRING);
+        recordReader.addSchemaField(MOCK_PARTITION_FIELD_NAME, RecordFieldType.STRING);
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("sport", RecordFieldType.STRING);
+
+        recordReader.addRecord("1", "A", "John Doe", 48, "Soccer");
+        recordReader.addRecord("2", "B","Jane Doe", 47, "Tennis");
+        recordReader.addRecord("3", "B", "Sally Doe", 47, "Curling");
+        recordReader.addRecord("4", "A", "Jimmy Doe", 14, null);
+        recordReader.addRecord("5", "C","Pizza Doe", 14, null);
+
+        testRunner.enqueue("");
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutAzureCosmosDBRecord.REL_SUCCESS, 1);
+        assertEquals(5, processor.getTestResults().size());
+    }
+
+    @Test
+    public void testOnTriggerWithNestedRecords() throws Exception {
+        setupRecordReader();
+        prepareMockTest();
+        recordReader.addSchemaField("id", RecordFieldType.STRING);
+        recordReader.addSchemaField(MOCK_PARTITION_FIELD_NAME, RecordFieldType.STRING);
+
+        final List<RecordField> personFields = new ArrayList<>();
+        final RecordField nameField = new RecordField("name", RecordFieldType.STRING.getDataType());
+        final RecordField ageField = new RecordField("age", RecordFieldType.INT.getDataType());
+        final RecordField sportField = new RecordField("sport", RecordFieldType.STRING.getDataType());
+        personFields.add(nameField);
+        personFields.add(ageField);
+        personFields.add(sportField);
+        final RecordSchema personSchema = new SimpleRecordSchema(personFields);
+        recordReader.addSchemaField("person", RecordFieldType.RECORD);
+
+        recordReader.addRecord("1", "A", new MapRecord(personSchema, new HashMap<String,Object>() {
+            private static final long serialVersionUID = -3185956498135742190L;
+            {
+                put("name", "John Doe");
+                put("age", 48);
+                put("sport", "Soccer");
+            }
+        }));
+        recordReader.addRecord("2", "B", new MapRecord(personSchema, new HashMap<String,Object>() {
+            private static final long serialVersionUID = 1L;
+            {
+                put("name", "Jane Doe");
+                put("age", 47);
+                put("sport", "Tennis");
+            }
+        }));
+        recordReader.addRecord("3", "A", new MapRecord(personSchema, new HashMap<String,Object>() {
+            private static final long serialVersionUID = -1329194249439570573L;
+            {
+                put("name", "Sally Doe");
+                put("age", 47);
+                put("sport", "Curling");
+            }
+        }));
+        recordReader.addRecord("4", "C", new MapRecord(personSchema, new HashMap<String,Object>() {
+            private static final long serialVersionUID = -1329194249439570574L;
+            {
+                put("name", "Jimmy Doe");
+                put("age", 14);
+                put("sport", null);
+            }
+        }));
+        testRunner.enqueue("");
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutAzureCosmosDBRecord.REL_SUCCESS, 1);
+        assertEquals(4, processor.getTestResults().size());
+    }
+
+    @Test
+    public void testArrayConversion() throws Exception {
+        Gson gson = new GsonBuilder().setPrettyPrinting().serializeNulls().create();
+        // schema creation for test
+        JsonObject schemaDef = new JsonObject();
+        schemaDef.addProperty("type", "record");
+        schemaDef.addProperty("name", "Test");
+        JsonArray schemaArray = new JsonArray();
+        JsonObject f1 = new JsonObject();
+        f1.addProperty("type", "string");
+        f1.addProperty("name", "id");
+        schemaArray.add(f1);
+        JsonObject f2 = new JsonObject();
+        f2.addProperty("type", "string");
+        f2.addProperty("name", "name");
+        schemaArray.add(f2);
+        JsonObject f3 = new JsonObject();
+        f3.addProperty("type", "string");
+        f3.addProperty("name", "sport");
+        schemaArray.add(f3);
+        JsonObject arrayDef = new JsonObject();
+        arrayDef.addProperty("type", "array");
+        arrayDef.addProperty("items", "string");
+        JsonObject f4 = new JsonObject();
+        f4.add("type", arrayDef);
+        f4.addProperty("name", "arrayTest");
+        schemaArray.add(f4);
+        schemaDef.add("fields", schemaArray);
+
+        // test data generation
+        JsonObject testData = new JsonObject();
+        testData.addProperty("id", UUID.randomUUID().toString());
+        testData.addProperty("name", "John Doe");
+        testData.addProperty("sport", "Soccer");
+        JsonArray jarray = new JsonArray();
+        jarray.add("a");
+        jarray.add("b");
+        jarray.add("c");
+        testData.add("arrayTest", jarray);
+
+        // setup registry and reader
+        MockSchemaRegistry registry = new MockSchemaRegistry();
+        RecordSchema rschema = AvroTypeUtil.createSchema(new Schema.Parser().parse(gson.toJson(schemaDef)));
+        registry.addSchema("test", rschema);
+        JsonTreeReader reader = new JsonTreeReader();
+        testRunner.addControllerService("registry", registry);
+        testRunner.addControllerService("reader", reader);
+        testRunner.setProperty(reader, SchemaAccessUtils.SCHEMA_REGISTRY, "registry");
+        testRunner.setProperty(PutAzureCosmosDBRecord.RECORD_READER_FACTORY, "reader");
+        testRunner.enableControllerService(registry);
+        testRunner.enableControllerService(reader);
+        prepareMockTest();
+        // override partiton key for this test case
+        testRunner.setProperty(PutAzureCosmosDBRecord.PARTITION_KEY, "sport");
+
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put("schema.name", "test");
+
+        testRunner.enqueue(gson.toJson(testData), attrs);
+        testRunner.run();
+
+        testRunner.assertTransferCount(PutAzureCosmosDBRecord.REL_FAILURE, 0);
+        testRunner.assertTransferCount(PutAzureCosmosDBRecord.REL_SUCCESS, 1);
+        List<Map<String, Object>> backendData = processor.getTestResults();
+        assertEquals(1, backendData.size());
+        //validate array data
+        JSONObject arrayTestResult = new JSONObject();
+        arrayTestResult.putAll(backendData.get(0));
+        Object[] check  = (Object []) arrayTestResult.get("arrayTest");
+        assertArrayEquals(new Object[]{"a", "b", "c"}, check);
+    }
+    private void prepareMockTest() throws Exception {
+        // this setup connection service and basic mock properties
+        setBasicMockProperties(true);
+    }
+}
+
+class MockPutAzureCosmosDBRecord extends PutAzureCosmosDBRecord {
+
+    static CosmosClient mockClient = mock(CosmosClient.class);
+    static CosmosContainer mockContainer = mock(CosmosContainer.class);
+    private List<Map<String, Object>> mockBackend = new ArrayList<>();
+
+    @Override
+    protected void createCosmosClient(final String uri, final String accessKey, final ConsistencyLevel clevel) {
+        this.setCosmosClient(mockClient);
+    }
+    @Override
+    protected void getCosmosDocumentContainer(final ProcessContext context) throws CosmosException {
+        this.setContainer(mockContainer);
+    }
+
+    @Override
+    protected void bulkInsert(List<Map<String, Object>> records ) throws CosmosException{
+        this.mockBackend.addAll(records);
+    }
+
+    public List<Map<String, Object>> getTestResults() {
+        return mockBackend;
+    }
+
+
+    public CosmosContainer getMockConainer() {
+        return mockContainer;
+    }
+
+
+
+
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/cosmos/document/TestAzureCosmosDBClientService.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/cosmos/document/TestAzureCosmosDBClientService.java
new file mode 100644
index 0000000..64e7cce
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/cosmos/document/TestAzureCosmosDBClientService.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.cosmos.document;
+
+import org.apache.nifi.processors.azure.cosmos.document.AzureCosmosDBUtils;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestAzureCosmosDBClientService {
+
+    private static final String MOCK_URI = "https://mockURI:443/";
+    private static final String DB_ACCESS_KEY = "mockDB_ACCESS_KEY";
+
+    private TestRunner runner;
+    private AzureCosmosDBClientService service;
+
+    @Before
+    public void setUp() throws InitializationException {
+        runner = TestRunners.newTestRunner(NoOpProcessor.class);
+        service = new AzureCosmosDBClientService();
+        runner.addControllerService("connService", service);
+    }
+
+    @Test
+    public void testValidWithURIandDBAccessKey() {
+        configureURI();
+        configureDBAccessKey();
+
+        runner.assertValid(service);
+    }
+
+    @Test
+    public void testNotValidBecauseURIMissing() {
+        configureDBAccessKey();
+
+        runner.assertNotValid(service);
+    }
+
+    @Test
+    public void testNotValidBecauseDBAccessKeyMissing() {
+        configureURI();
+
+        runner.assertNotValid(service);
+    }
+
+    private void configureURI() {
+        runner.setProperty(service, AzureCosmosDBUtils.URI, MOCK_URI);
+    }
+
+    private void configureDBAccessKey() {
+        runner.setProperty(service, AzureCosmosDBUtils.DB_ACCESS_KEY, DB_ACCESS_KEY);
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/pom.xml
index ff6af32..57dacf5 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/pom.xml
@@ -27,11 +27,38 @@
         <dependency>
             <groupId>com.microsoft.azure</groupId>
             <artifactId>azure-storage</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-core</artifactId>
             <version>${azure.core.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.dataformat</groupId>
+                    <artifactId>jackson-dataformat-xml</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-cosmos</artifactId>
+            <version>${azure-cosmos.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.azure</groupId>
+                    <artifactId>azure-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <!-- overriding jackson-core in azure-storage -->
         <dependency>
@@ -40,6 +67,16 @@
             <version>${jackson.version}</version>
         </dependency>
         <dependency>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <artifactId>jackson-dataformat-xml</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>27.0.1-jre</version>
+            </dependency>
+        <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-api</artifactId>
         </dependency>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/cosmos/AzureCosmosDBConnectionService.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/cosmos/AzureCosmosDBConnectionService.java
new file mode 100644
index 0000000..387e1f2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/cosmos/AzureCosmosDBConnectionService.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.cosmos;
+
+import com.azure.cosmos.CosmosClient;
+
+import org.apache.nifi.controller.ControllerService;
+
+public interface AzureCosmosDBConnectionService extends ControllerService {
+
+    String getURI();
+
+    String getAccessKey();
+
+    String getConsistencyLevel();
+
+    CosmosClient getCosmosClient();
+
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/pom.xml
index 86897e5..e1bee31 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/pom.xml
@@ -27,8 +27,9 @@
 
     <properties>
         <azure-storage.version>8.4.0</azure-storage.version>
-        <azure.core.version>1.5.0</azure.core.version>
+        <azure.core.version>1.6.0</azure.core.version>
         <jackson.version>2.10.3</jackson.version>
+        <azure-cosmos.version>4.2.0</azure-cosmos.version>
     </properties>
 
     <modules>