You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jf...@apache.org on 2020/11/03 08:01:06 UTC
[nifi] branch main updated: NIFI-7406 Added PutAzureCosmosRecord
processor for Azure Cosmos DB
This is an automated email from the ASF dual-hosted git repository.
jfrazee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 80f49eb NIFI-7406 Added PutAzureCosmosRecord processor for Azure Cosmos DB
80f49eb is described below
commit 80f49eb7bdcd761cdf42631d4d9378c4004fb4e7
Author: sjyang18 <il...@hotmail.com>
AuthorDate: Mon May 4 22:58:36 2020 +0000
NIFI-7406 Added PutAzureCosmosRecord processor for Azure Cosmos DB
This closes #4253
Signed-off-by: Joey Frazee <jf...@apache.org>
---
.../nifi-azure-processors/pom.xml | 102 +++++++
.../document/AbstractAzureCosmosDBProcessor.java | 304 +++++++++++++++++++++
.../azure/cosmos/document/AzureCosmosDBUtils.java | 63 +++++
.../cosmos/document/PutAzureCosmosDBRecord.java | 221 +++++++++++++++
.../document/AzureCosmosDBClientService.java | 163 +++++++++++
.../org.apache.nifi.controller.ControllerService | 3 +-
.../services/org.apache.nifi.processor.Processor | 3 +-
.../document/ITAbstractAzureCosmosDBDocument.java | 216 +++++++++++++++
.../cosmos/document/ITPutAzureCosmosDBRecord.java | 165 +++++++++++
.../azure/cosmos/document/MockTestBase.java | 74 +++++
.../document/PutAzureCosmosDBRecordTest.java | 294 ++++++++++++++++++++
.../document/TestAzureCosmosDBClientService.java | 72 +++++
.../nifi-azure-services-api/pom.xml | 37 +++
.../cosmos/AzureCosmosDBConnectionService.java | 33 +++
nifi-nar-bundles/nifi-azure-bundle/pom.xml | 3 +-
15 files changed, 1750 insertions(+), 3 deletions(-)
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index 253a43d..58c0ccc 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -60,11 +60,23 @@
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>${azure.core.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-xml</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.0.6</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-core</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
@@ -75,15 +87,74 @@
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-eph</artifactId>
<version>${azure-eventhubs-eph.version}</version>
+ <exclusions>
+ <!--depdendency resolution with com.azure sdk -->
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-storage</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-cosmos</artifactId>
+ <version>${azure-cosmos.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-file-datalake</artifactId>
<version>12.2.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!-- overriding jackson-core in azure-storage -->
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>27.0.1-jre</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-xml</artifactId>
+ <version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
@@ -99,6 +170,37 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-avro-record-utils</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-json-utils</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-schema-registry-service-api</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-services</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-inline</artifactId>
+ <version>3.3.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
<version>1.13.0-SNAPSHOT</version>
</dependency>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/AbstractAzureCosmosDBProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/AbstractAzureCosmosDBProcessor.java
new file mode 100644
index 0000000..b92098a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/AbstractAzureCosmosDBProcessor.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.azure.cosmos.document;
+
+import com.azure.cosmos.ConsistencyLevel;
+import com.azure.cosmos.CosmosClient;
+import com.azure.cosmos.CosmosClientBuilder;
+import com.azure.cosmos.CosmosContainer;
+import com.azure.cosmos.CosmosDatabase;
+import com.azure.cosmos.CosmosException;
+import com.azure.cosmos.models.CosmosContainerProperties;
+import com.azure.cosmos.models.CosmosContainerResponse;
+import com.azure.cosmos.models.CosmosDatabaseResponse;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.services.azure.cosmos.AzureCosmosDBConnectionService;
+
+public abstract class AbstractAzureCosmosDBProcessor extends AbstractProcessor {
+
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("All FlowFiles that are written to Cosmos DB are routed to this relationship")
+ .build();
+
+ static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("All FlowFiles that cannot be written to Cosmos DB are routed to this relationship")
+ .build();
+
+ static final Relationship REL_ORIGINAL = new Relationship.Builder()
+ .name("original")
+ .description("All input FlowFiles that are part of a successful are routed to this relationship")
+ .build();
+
+ static final PropertyDescriptor CONNECTION_SERVICE = new PropertyDescriptor.Builder()
+ .name("azure-cosmos-db-connection-service")
+ .displayName("Cosmos DB Connection Service")
+ .description("If configured, the controller service used to obtain the connection string and access key")
+ .required(false)
+ .identifiesControllerService(AzureCosmosDBConnectionService.class)
+ .build();
+
+ static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder()
+ .name("azure-cosmos-db-name")
+ .displayName("Cosmos DB Name")
+ .description("The database name or id. This is used as the namespace for document collections or containers")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor CONTAINER_ID = new PropertyDescriptor.Builder()
+ .name("azure-cosmos-db-container-id")
+ .displayName("Cosmos DB Container ID")
+ .description("The unique identifier for the container")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor PARTITION_KEY = new PropertyDescriptor.Builder()
+ .name("azure-cosmos-db-partition-key")
+ .displayName("Cosmos DB Partition Key")
+ .description("The partition key used to distribute data among servers")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
+ .name("charactor-set")
+ .displayName("Charactor Set")
+ .description("The Character Set in which the data is encoded")
+ .required(false)
+ .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+ .defaultValue("UTF-8")
+ .build();
+
+ static final List<PropertyDescriptor> descriptors;
+
+ static {
+ List<PropertyDescriptor> _temp = new ArrayList<>();
+ _temp.add(CONNECTION_SERVICE);
+ _temp.add(AzureCosmosDBUtils.URI);
+ _temp.add(AzureCosmosDBUtils.DB_ACCESS_KEY);
+ _temp.add(AzureCosmosDBUtils.CONSISTENCY);
+ _temp.add(DATABASE_NAME);
+ _temp.add(CONTAINER_ID);
+ _temp.add(PARTITION_KEY);
+ descriptors = Collections.unmodifiableList(_temp);
+ }
+
+ private CosmosClient cosmosClient;
+ private CosmosContainer container;
+ private AzureCosmosDBConnectionService connectionService;
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) throws CosmosException {
+ final ComponentLog logger = getLogger();
+
+ if (context.getProperty(CONNECTION_SERVICE).isSet()) {
+ this.connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(AzureCosmosDBConnectionService.class);
+ this.cosmosClient = this.connectionService.getCosmosClient();
+ } else {
+ final String uri = context.getProperty(AzureCosmosDBUtils.URI).getValue();
+ final String accessKey = context.getProperty(AzureCosmosDBUtils.DB_ACCESS_KEY).getValue();
+ final String selectedConsistency = context.getProperty(AzureCosmosDBUtils.CONSISTENCY).getValue();
+ final ConsistencyLevel clevel;
+ switch (selectedConsistency) {
+ case AzureCosmosDBUtils.CONSISTENCY_STRONG:
+ clevel = ConsistencyLevel.STRONG;
+ break;
+ case AzureCosmosDBUtils.CONSISTENCY_CONSISTENT_PREFIX:
+ clevel = ConsistencyLevel.CONSISTENT_PREFIX;
+ break;
+ case AzureCosmosDBUtils.CONSISTENCY_SESSION:
+ clevel = ConsistencyLevel.SESSION;
+ break;
+ case AzureCosmosDBUtils.CONSISTENCY_BOUNDED_STALENESS:
+ clevel = ConsistencyLevel.BOUNDED_STALENESS;
+ break;
+ case AzureCosmosDBUtils.CONSISTENCY_EVENTUAL:
+ clevel = ConsistencyLevel.EVENTUAL;
+ break;
+ default:
+ clevel = ConsistencyLevel.SESSION;
+ }
+ if (cosmosClient != null) {
+ onStopped();
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Creating CosmosClient");
+ }
+ createCosmosClient(uri, accessKey, clevel);
+ }
+ getCosmosDocumentContainer(context);
+ doPostActionOnSchedule(context);
+ }
+
+ protected void createCosmosClient(final String uri, final String accessKey, final ConsistencyLevel clevel) {
+ this.cosmosClient = new CosmosClientBuilder()
+ .endpoint(uri)
+ .key(accessKey)
+ .consistencyLevel(clevel)
+ .buildClient();
+ }
+
+ protected abstract void doPostActionOnSchedule(final ProcessContext context);
+
+ protected void getCosmosDocumentContainer(final ProcessContext context) throws CosmosException {
+ final String databaseName = context.getProperty(DATABASE_NAME).getValue();
+ final String containerID = context.getProperty(CONTAINER_ID).getValue();
+ final String partitionKey = context.getProperty(PARTITION_KEY).getValue();
+
+ final CosmosDatabaseResponse databaseResponse = this.cosmosClient.createDatabaseIfNotExists(databaseName);
+ final CosmosDatabase database = this.cosmosClient.getDatabase(databaseResponse.getProperties().getId());
+
+ final CosmosContainerProperties containerProperties =
+ new CosmosContainerProperties(containerID, "/"+partitionKey);
+
+ // Create container by default if Not exists.
+ final CosmosContainerResponse containerResponse = database.createContainerIfNotExists(containerProperties);
+ this.container = database.getContainer(containerResponse.getProperties().getId());
+ }
+
+ @OnStopped
+ public final void onStopped() {
+ final ComponentLog logger = getLogger();
+ if (connectionService == null && cosmosClient != null) {
+ // close client only when cosmoclient is created in Processor.
+ if(logger.isDebugEnabled()) {
+ logger.debug("Closing CosmosClient");
+ }
+ try{
+ this.container = null;
+ this.cosmosClient.close();
+ }catch(CosmosException e) {
+ logger.error("Error closing Cosmos DB client due to {}", new Object[] { e.getMessage() }, e);
+ } finally {
+ this.cosmosClient = null;
+ }
+ }
+ }
+
+ protected String getURI(final ProcessContext context) {
+ if (this.connectionService != null) {
+ return this.connectionService.getURI();
+ } else {
+ return context.getProperty(AzureCosmosDBUtils.URI).getValue();
+ }
+ }
+
+ protected String getAccessKey(final ProcessContext context) {
+ if (this.connectionService != null) {
+ return this.connectionService.getAccessKey();
+ } else {
+ return context.getProperty(AzureCosmosDBUtils.DB_ACCESS_KEY).getValue();
+ }
+ }
+
+ protected String getConsistencyLevel(final ProcessContext context) {
+ if (this.connectionService != null) {
+ return this.connectionService.getConsistencyLevel();
+ } else {
+ return context.getProperty(AzureCosmosDBUtils.CONSISTENCY).getValue();
+ }
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(ValidationContext context) {
+ List<ValidationResult> retVal = new ArrayList<>();
+
+ boolean connectionServiceIsSet = context.getProperty(CONNECTION_SERVICE).isSet();
+ boolean uriIsSet = context.getProperty(AzureCosmosDBUtils.URI).isSet();
+ boolean accessKeyIsSet = context.getProperty(AzureCosmosDBUtils.DB_ACCESS_KEY).isSet();
+ boolean databaseIsSet = context.getProperty(DATABASE_NAME).isSet();
+ boolean collectionIsSet = context.getProperty(CONTAINER_ID).isSet();
+ boolean partitionIsSet = context.getProperty(PARTITION_KEY).isSet();
+
+ if (connectionServiceIsSet && (uriIsSet || accessKeyIsSet) ) {
+ // If connection Service is set, None of the Processor variables URI and accessKey
+ // should be set.
+ final String msg = String.format(
+ "If connection service is used for DB connection, none of %s and %s should be set",
+ AzureCosmosDBUtils.URI.getDisplayName(),
+ AzureCosmosDBUtils.DB_ACCESS_KEY.getDisplayName()
+ );
+ retVal.add(new ValidationResult.Builder().valid(false).explanation(msg).build());
+ } else if (!connectionServiceIsSet && (!uriIsSet || !accessKeyIsSet)) {
+ // If connection Service is not set, Both of the Processor variable URI and accessKey
+ // should be set.
+ final String msg = String.format(
+ "If connection service is not used for DB connection, both %s and %s should be set",
+ AzureCosmosDBUtils.URI.getDisplayName(),
+ AzureCosmosDBUtils.DB_ACCESS_KEY.getDisplayName()
+ );
+ retVal.add(new ValidationResult.Builder().valid(false).explanation(msg).build());
+ }
+ if (!databaseIsSet) {
+ final String msg = AbstractAzureCosmosDBProcessor.DATABASE_NAME.getDisplayName() + " must be set.";
+ retVal.add(new ValidationResult.Builder().valid(false).explanation(msg).build());
+ }
+ if (!collectionIsSet) {
+ final String msg = AbstractAzureCosmosDBProcessor.CONTAINER_ID.getDisplayName() + " must be set.";
+ retVal.add(new ValidationResult.Builder().valid(false).explanation(msg).build());
+ }
+ if (!partitionIsSet) {
+ final String msg = AbstractAzureCosmosDBProcessor.PARTITION_KEY.getDisplayName() + " must be set.";
+ retVal.add(new ValidationResult.Builder().valid(false).explanation(msg).build());
+ }
+ return retVal;
+ }
+
+ protected CosmosClient getCosmosClient() {
+ return cosmosClient;
+ }
+
+ protected void setCosmosClient(CosmosClient cosmosClient) {
+ this.cosmosClient = cosmosClient;
+ }
+
+ protected CosmosContainer getContainer() {
+ return container;
+ }
+
+ protected void setContainer(CosmosContainer container) {
+ this.container = container;
+ }
+
+ protected AzureCosmosDBConnectionService getConnectionService() {
+ return connectionService;
+ }
+
+ protected void setConnectionService(AzureCosmosDBConnectionService connectionService) {
+ this.connectionService = connectionService;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/AzureCosmosDBUtils.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/AzureCosmosDBUtils.java
new file mode 100644
index 0000000..3b4fe22
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/AzureCosmosDBUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.azure.cosmos.document;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.util.StandardValidators;
+
+public final class AzureCosmosDBUtils {
+ public static final String CONSISTENCY_STRONG = "STRONG";
+ public static final String CONSISTENCY_BOUNDED_STALENESS= "BOUNDED_STALENESS";
+ public static final String CONSISTENCY_SESSION = "SESSION";
+ public static final String CONSISTENCY_CONSISTENT_PREFIX = "CONSISTENT_PREFIX";
+ public static final String CONSISTENCY_EVENTUAL = "EVENTUAL";
+
+ public static final PropertyDescriptor URI = new PropertyDescriptor.Builder()
+ .name("azure-cosmos-db-uri")
+ .displayName("Cosmos DB URI")
+ .description("Cosmos DB URI, typically in the form of https://{databaseaccount}.documents.azure.com:443/"
+ + " Note this host URL is for Cosmos DB with Core SQL API"
+ + " from Azure Portal (Overview->URI)")
+ .required(false)
+ .addValidator(StandardValidators.URI_VALIDATOR)
+ .sensitive(true)
+ .build();
+
+ public static final PropertyDescriptor DB_ACCESS_KEY = new PropertyDescriptor.Builder()
+ .name("azure-cosmos-db-key")
+ .displayName("Cosmos DB Access Key")
+ .description("Cosmos DB Access Key from Azure Portal (Settings->Keys). "
+ + "Choose a read-write key to enable database or container creation at run time")
+ .required(false)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .sensitive(true)
+ .build();
+
+ public static final PropertyDescriptor CONSISTENCY = new PropertyDescriptor.Builder()
+ .name("azure-cosmos-db-consistency-level")
+ .displayName("Cosmos DB Consistency Level")
+ .description("Choose from five consistency levels on the consistency spectrum. "
+ + "Refer to Cosmos DB documentation for their differences")
+ .required(false)
+ .defaultValue(CONSISTENCY_SESSION)
+ .allowableValues(CONSISTENCY_STRONG, CONSISTENCY_BOUNDED_STALENESS, CONSISTENCY_SESSION,
+ CONSISTENCY_CONSISTENT_PREFIX, CONSISTENCY_EVENTUAL)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
new file mode 100644
index 0000000..cfbb221
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.cosmos.document;
+
+import com.azure.cosmos.CosmosContainer;
+import com.azure.cosmos.CosmosException;
+import com.azure.cosmos.implementation.ConflictException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+@EventDriven
+@Tags({ "azure", "cosmos", "insert", "record", "put" })
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("This processor is a record-aware processor for inserting data into Cosmos DB with Core SQL API. It uses a configured record reader and " +
+ "schema to read an incoming record set from the body of a Flowfile and then inserts those records into " +
+ "a configured Cosmos DB Container.")
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+public class PutAzureCosmosDBRecord extends AbstractAzureCosmosDBProcessor {
+
+ private String conflictHandlingStrategy;
+ static final AllowableValue IGNORE_CONFLICT = new AllowableValue("IGNORE", "Ignore", "Conflicting records will not be inserted, and FlowFile will not be routed to failure");
+ static final AllowableValue UPSERT_CONFLICT = new AllowableValue("UPSERT", "Upsert", "Conflicting records will be upserted, and FlowFile will not be routed to failure");
+
+ static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
+ .name("record-reader")
+ .displayName("Record Reader")
+ .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor INSERT_BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("insert-batch-size")
+ .displayName("Insert Batch Size")
+ .description("The number of records to group together for one single insert operation against Cosmos DB")
+ .defaultValue("20")
+ .required(false)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor CONFLICT_HANDLE_STRATEGY = new PropertyDescriptor.Builder()
+ .name("azure-cosmos-db-conflict-handling-strategy")
+ .displayName("Cosmos DB Conflict Handling Strategy")
+ .description("Choose whether to ignore or upsert when conflict error occurs during insertion")
+ .required(false)
+ .allowableValues(IGNORE_CONFLICT, UPSERT_CONFLICT)
+ .defaultValue(IGNORE_CONFLICT.getValue())
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+
+ private final static Set<Relationship> relationships;
+ private final static List<PropertyDescriptor> propertyDescriptors;
+
+ static {
+ List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+ _propertyDescriptors.addAll(descriptors);
+ _propertyDescriptors.add(RECORD_READER_FACTORY);
+ _propertyDescriptors.add(INSERT_BATCH_SIZE);
+ _propertyDescriptors.add(CONFLICT_HANDLE_STRATEGY);
+ propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
+
+ final Set<Relationship> _relationships = new HashSet<>();
+ _relationships.add(REL_SUCCESS);
+ _relationships.add(REL_FAILURE);
+ relationships = Collections.unmodifiableSet(_relationships);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return propertyDescriptors;
+ }
+
+ protected void bulkInsert(final List<Map<String, Object>> records) throws CosmosException{
+ // In the future, this method will be replaced by calling createItems API
+ // for example, this.container.createItems(records);
+ // currently, no createItems API available in Azure Cosmos Java SDK
+ final ComponentLog logger = getLogger();
+ final CosmosContainer container = getContainer();
+ for (Map<String, Object> record : records){
+ try {
+ container.createItem(record);
+ } catch (ConflictException e) {
+ // insert with unique id is expected. In case conflict occurs, use the selected strategy.
+ // By default, it will ignore.
+ if (conflictHandlingStrategy != null && conflictHandlingStrategy.equals(UPSERT_CONFLICT.getValue())){
+ container.upsertItem(record);
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Ignoring duplicate based on selected conflict resolution strategy");
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ final FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final ComponentLog logger = getLogger();
+ final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
+ .asControllerService(RecordReaderFactory.class);
+
+ final String partitionKeyField = context.getProperty(PARTITION_KEY).getValue();
+ List<Map<String, Object>> batch = new ArrayList<>();
+ int ceiling = context.getProperty(INSERT_BATCH_SIZE).asInteger();
+ boolean error = false;
+ try (final InputStream inStream = session.read(flowFile);
+ final RecordReader reader = recordParserFactory.createRecordReader(flowFile, inStream, getLogger())) {
+
+ RecordSchema schema = reader.getSchema();
+ Record record;
+
+ while ((record = reader.nextRecord()) != null) {
+ // Convert each Record to HashMap
+ Map<String, Object> contentMap = (Map<String, Object>) DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(schema));
+ if(contentMap.containsKey("id")) {
+ final Object idObj = contentMap.get("id");
+ final String idStr = (idObj == null) ? "" : String.valueOf(idObj);
+ if (idObj == null || StringUtils.isBlank(idStr)) {
+ // dont put null to id
+ contentMap.put("id", UUID.randomUUID().toString());
+ } else {
+ contentMap.put("id", idStr);
+ }
+ } else {
+ contentMap.put("id", UUID.randomUUID().toString());
+ }
+ if (!contentMap.containsKey(partitionKeyField)) {
+ logger.error(String.format("PutAzureCosmoDBRecord failed with missing partitionKeyField (%s)", partitionKeyField));
+ error = true;
+ break;
+ }
+ batch.add(contentMap);
+ if (batch.size() == ceiling) {
+ bulkInsert(batch);
+ batch = new ArrayList<>();
+ }
+ }
+ if (!error && batch.size() > 0) {
+ bulkInsert(batch);
+ }
+ } catch (SchemaNotFoundException | MalformedRecordException | IOException | CosmosException e) {
+ logger.error("PutAzureCosmoDBRecord failed with error: {}", new Object[]{e.getMessage()}, e);
+ error = true;
+ } finally {
+ if (!error) {
+ session.getProvenanceReporter().send(flowFile, getURI(context));
+ session.transfer(flowFile, REL_SUCCESS);
+ } else {
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ }
+ session.commit();
+ }
+
+ @Override
+ protected void doPostActionOnSchedule(final ProcessContext context) {
+ conflictHandlingStrategy = context.getProperty(CONFLICT_HANDLE_STRATEGY).getValue();
+ if (conflictHandlingStrategy == null)
+ conflictHandlingStrategy = IGNORE_CONFLICT.getValue();
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/cosmos/document/AzureCosmosDBClientService.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/cosmos/document/AzureCosmosDBClientService.java
new file mode 100644
index 0000000..c56abb0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/cosmos/document/AzureCosmosDBClientService.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.services.azure.cosmos.document;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import com.azure.cosmos.ConsistencyLevel;
+import com.azure.cosmos.CosmosClient;
+import com.azure.cosmos.CosmosClientBuilder;
+import com.azure.cosmos.CosmosException;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processors.azure.cosmos.document.AzureCosmosDBUtils;
+import org.apache.nifi.services.azure.cosmos.AzureCosmosDBConnectionService;
+import org.apache.nifi.util.StringUtils;
+
+@Tags({"azure", "cosmos", "document", "service"})
+@CapabilityDescription(
+ "Provides a controller service that configures a connection to Cosmos DB (Core SQL API) " +
+ " and provides access to that connection to other Cosmos DB-related components."
+)
+public class AzureCosmosDBClientService extends AbstractControllerService implements AzureCosmosDBConnectionService {
+ private String uri;
+ private String accessKey;
+ private String consistencyLevel;
+ private CosmosClient cosmosClient;
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ this.uri = context.getProperty(AzureCosmosDBUtils.URI).getValue();
+ this.accessKey = context.getProperty(AzureCosmosDBUtils.DB_ACCESS_KEY).getValue();
+ final ConsistencyLevel clevel;
+ final String selectedConsistency = context.getProperty(AzureCosmosDBUtils.CONSISTENCY).getValue();
+
+ switch(selectedConsistency) {
+ case AzureCosmosDBUtils.CONSISTENCY_STRONG:
+ clevel = ConsistencyLevel.STRONG;
+ break;
+ case AzureCosmosDBUtils.CONSISTENCY_CONSISTENT_PREFIX:
+ clevel = ConsistencyLevel.CONSISTENT_PREFIX;
+ break;
+ case AzureCosmosDBUtils.CONSISTENCY_SESSION:
+ clevel = ConsistencyLevel.SESSION;
+ break;
+ case AzureCosmosDBUtils.CONSISTENCY_BOUNDED_STALENESS:
+ clevel = ConsistencyLevel.BOUNDED_STALENESS;
+ break;
+ case AzureCosmosDBUtils.CONSISTENCY_EVENTUAL:
+ clevel = ConsistencyLevel.EVENTUAL;
+ break;
+ default:
+ clevel = ConsistencyLevel.SESSION;
+ }
+
+ if (this.cosmosClient != null) {
+ onStopped();
+ }
+ consistencyLevel = clevel.toString();
+ createCosmosClient(uri, accessKey, clevel);
+ }
+
+
+ @OnStopped
+ public final void onStopped() {
+ if (this.cosmosClient != null) {
+ try {
+ cosmosClient.close();
+ } catch(CosmosException e) {
+ getLogger().error("Closing cosmosClient Failed: " + e.getMessage(), e);
+ } finally {
+ this.cosmosClient = null;
+ }
+ }
+ }
+
+ protected void createCosmosClient(final String uri, final String accessKey, final ConsistencyLevel clevel){
+ this.cosmosClient = new CosmosClientBuilder()
+ .endpoint(uri)
+ .key(accessKey)
+ .consistencyLevel(clevel)
+ .buildClient();
+ }
+
+ static List<PropertyDescriptor> descriptors = new ArrayList<>();
+
+ static {
+ descriptors.add(AzureCosmosDBUtils.URI);
+ descriptors.add(AzureCosmosDBUtils.DB_ACCESS_KEY);
+ descriptors.add(AzureCosmosDBUtils.CONSISTENCY);
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return descriptors;
+ }
+
+ @Override
+ public String getURI() {
+ return this.uri;
+ }
+
+ @Override
+ public String getAccessKey() {
+ return this.accessKey;
+ }
+ @Override
+ public String getConsistencyLevel() {
+ return this.consistencyLevel;
+ }
+
+ @Override
+ public CosmosClient getCosmosClient() {
+ return this.cosmosClient;
+ }
+ public void setCosmosClient(CosmosClient client) {
+ this.cosmosClient = client;
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+ final List<ValidationResult> results = new ArrayList<>();
+
+ final String uri = validationContext.getProperty(AzureCosmosDBUtils.URI).getValue();
+ final String accessKey = validationContext.getProperty(AzureCosmosDBUtils.DB_ACCESS_KEY).getValue();
+
+ if (StringUtils.isBlank(uri) || StringUtils.isBlank(accessKey)) {
+ results.add(new ValidationResult.Builder()
+ .subject("AzureStorageCredentialsControllerService")
+ .valid(false)
+ .explanation(
+ "either " + AzureCosmosDBUtils.URI.getDisplayName()
+ + " or " + AzureCosmosDBUtils.DB_ACCESS_KEY.getDisplayName() + " is required")
+ .build());
+ }
+ return results;
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 4179771..5e63d4b 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -14,4 +14,5 @@
# limitations under the License.
org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService
org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerServiceLookup
-org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService
\ No newline at end of file
+org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService
+org.apache.nifi.services.azure.cosmos.document.AzureCosmosDBClientService
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 015596e..6456dd0 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -24,4 +24,5 @@ org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage
org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage
org.apache.nifi.processors.azure.storage.DeleteAzureDataLakeStorage
org.apache.nifi.processors.azure.storage.FetchAzureDataLakeStorage
-org.apache.nifi.processors.azure.storage.ListAzureDataLakeStorage
\ No newline at end of file
+org.apache.nifi.processors.azure.storage.ListAzureDataLakeStorage
+org.apache.nifi.processors.azure.cosmos.document.PutAzureCosmosDBRecord
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/ITAbstractAzureCosmosDBDocument.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/ITAbstractAzureCosmosDBDocument.java
new file mode 100644
index 0000000..06bc335
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/ITAbstractAzureCosmosDBDocument.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.cosmos.document;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.logging.Logger;
+
+import com.azure.cosmos.CosmosClient;
+import com.azure.cosmos.CosmosClientBuilder;
+import com.azure.cosmos.CosmosContainer;
+import com.azure.cosmos.CosmosDatabase;
+import com.azure.cosmos.CosmosException;
+import com.azure.cosmos.models.CosmosContainerProperties;
+import com.azure.cosmos.models.CosmosContainerResponse;
+import com.azure.cosmos.models.CosmosDatabaseResponse;
+import com.azure.cosmos.models.CosmosItemRequestOptions;
+import com.azure.cosmos.models.CosmosQueryRequestOptions;
+import com.azure.cosmos.models.PartitionKey;
+import com.azure.cosmos.util.CosmosPagedIterable;
+import com.fasterxml.jackson.databind.JsonNode;
+
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.services.azure.cosmos.document.AzureCosmosDBClientService;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+public abstract class ITAbstractAzureCosmosDBDocument {
+ static Logger logger = Logger.getLogger(ITAbstractAzureCosmosDBDocument.class.getName());
+
+ private static final Properties CONFIG;
+
+ private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
+ protected static final String TEST_COSMOS_DB_NAME = "nifi-test-db";
+ protected static final String TEST_COSMOS_CONTAINER_NAME = "nifi-test-container";
+ protected static final String TEST_COSMOS_PARTITION_KEY_FIELD_NAME = "category";
+ protected static CosmosClient client;
+ protected static CosmosDatabase cdb;
+ protected static CosmosContainer container;
+
+ static {
+ final FileInputStream fis;
+ CONFIG = new Properties();
+ try {
+ fis = new FileInputStream(CREDENTIALS_FILE);
+ try {
+ CONFIG.load(fis);
+ } catch (IOException e) {
+ fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
+ } finally {
+ FileUtils.closeQuietly(fis);
+ }
+ } catch (FileNotFoundException e) {
+ fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
+ }
+ }
+
+ protected static String getComosURI() {
+ return CONFIG.getProperty("cosmosURI");
+ }
+
+ protected static String getCosmosKey() {
+ return CONFIG.getProperty("cosmosKey");
+ }
+
+ protected TestRunner runner;
+
+ @BeforeClass
+ public static void createTestDBContainerIfNeeded() throws CosmosException {
+ final String testDBURI = getComosURI();
+ final String testDBContainer = getCosmosKey();
+
+ client = new CosmosClientBuilder()
+ .endpoint(testDBURI)
+ .key(testDBContainer)
+ .buildClient();
+
+ CosmosDatabaseResponse databaseResponse = client.createDatabase(TEST_COSMOS_DB_NAME);
+ cdb = client.getDatabase(databaseResponse.getProperties().getId());
+ CosmosContainerProperties containerProperties =
+ new CosmosContainerProperties(TEST_COSMOS_CONTAINER_NAME, "/"+TEST_COSMOS_PARTITION_KEY_FIELD_NAME);
+ CosmosContainerResponse containerResponse = cdb.createContainer(containerProperties);
+ container = cdb.getContainer(containerResponse.getProperties().getId());
+ assertNotNull(container);
+ }
+
+ @AfterClass
+ public static void dropTestDBAndContainer() throws CosmosException {
+ resetTestCosmosConnection();
+ if (container != null) {
+ try {
+ container.delete();
+ } catch(CosmosException e) {
+ logger.info(e.getMessage());
+ } finally {
+ container = null;
+ }
+ }
+ if (cdb != null) {
+ try {
+ cdb.delete();
+ } catch(CosmosException e) {
+ logger.info(e.getMessage());
+ } finally {
+ cdb = null;
+ }
+ }
+ if (client != null){
+ try {
+ client.close();
+ } catch(CosmosException e) {
+ logger.info(e.getMessage());
+ } finally {
+ client = null;
+ }
+ }
+ }
+
+ @Before
+ public void setUpCosmosIT() {
+ final String testDBURI = getComosURI();
+ final String testDBContainer = getCosmosKey();
+ runner = TestRunners.newTestRunner(getProcessorClass());
+ runner.setProperty(AzureCosmosDBUtils.URI, testDBURI);
+ runner.setProperty(AzureCosmosDBUtils.DB_ACCESS_KEY, testDBContainer);
+ runner.setProperty(AbstractAzureCosmosDBProcessor.DATABASE_NAME, TEST_COSMOS_DB_NAME);
+ runner.setProperty(AbstractAzureCosmosDBProcessor.CONTAINER_ID, TEST_COSMOS_CONTAINER_NAME);
+ runner.setProperty(AbstractAzureCosmosDBProcessor.PARTITION_KEY, TEST_COSMOS_PARTITION_KEY_FIELD_NAME);
+ runner.setIncomingConnection(false);
+ runner.setNonLoopConnection(false);
+ }
+
+ protected static void closeClient() {
+ try {
+ client.close();
+ } catch(CosmosException e){
+ logger.info(e.getMessage());
+ } finally {
+ client =null;
+ cdb = null;
+ container = null;
+ }
+ }
+
+ protected static void resetTestCosmosConnection() {
+ if (client != null) {
+ closeClient();
+ }
+ final String testDBURI = getComosURI();
+ final String testDBContainer = getCosmosKey();
+
+ client = new CosmosClientBuilder()
+ .endpoint(testDBURI)
+ .key(testDBContainer)
+ .buildClient();
+ cdb = client.getDatabase(TEST_COSMOS_DB_NAME);
+ container = cdb.getContainer(TEST_COSMOS_CONTAINER_NAME);
+ }
+
+ protected abstract Class<? extends Processor> getProcessorClass();
+
+ protected void configureCosmosConnectionControllerService() throws Exception {
+ runner.removeProperty(AzureCosmosDBUtils.URI);
+ runner.removeProperty(AzureCosmosDBUtils.DB_ACCESS_KEY);
+
+ AzureCosmosDBClientService service = new AzureCosmosDBClientService();
+ runner.addControllerService("connService", service);
+
+ runner.setProperty(service, AzureCosmosDBUtils.URI,getComosURI());
+ runner.setProperty(service, AzureCosmosDBUtils.DB_ACCESS_KEY, getCosmosKey());
+ // now, after enabling and setting the service, it should be valid
+ runner.enableControllerService(service);
+ runner.setProperty(AbstractAzureCosmosDBProcessor.CONNECTION_SERVICE, service.getIdentifier());
+ runner.assertValid();
+ }
+
+ protected void clearTestData() throws Exception {
+ logger.info("clearing test data");
+ CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions();
+
+ CosmosPagedIterable<JsonNode> response = container.queryItems(
+ "select * from c order by c._ts", queryOptions, JsonNode.class );
+
+ response.forEach(data -> {
+ if (data.get(TEST_COSMOS_PARTITION_KEY_FIELD_NAME) != null){
+ PartitionKey pkey = new PartitionKey(data.get(TEST_COSMOS_PARTITION_KEY_FIELD_NAME).asText());
+ container.deleteItem(data.get("id").asText(), pkey, new CosmosItemRequestOptions());
+ } else {
+ container.deleteItem(data.get("id").asText(), PartitionKey.NONE, new CosmosItemRequestOptions());
+ }
+ });
+ }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/ITPutAzureCosmosDBRecord.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/ITPutAzureCosmosDBRecord.java
new file mode 100644
index 0000000..242827c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/ITPutAzureCosmosDBRecord.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.cosmos.document;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.logging.Logger;
+
+import com.azure.cosmos.models.CosmosQueryRequestOptions;
+import com.azure.cosmos.util.CosmosPagedIterable;
+import com.fasterxml.jackson.databind.JsonNode;
+
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ITPutAzureCosmosDBRecord extends ITAbstractAzureCosmosDBDocument {
+ static Logger logger = Logger.getLogger(ITPutAzureCosmosDBRecord.class.getName());
+
+ @Override
+ protected Class<? extends Processor> getProcessorClass() {
+ return PutAzureCosmosDBRecord.class;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ resetTestCosmosConnection();
+ }
+
+ @After
+ public void cleanupTestCase() {
+ try{
+ clearTestData();
+ closeClient();
+ } catch(Exception e) {
+
+ }
+ }
+ private List<JsonNode> getDataFromTestDB() {
+ logger.info("getDataFromTestDB for test result validation");
+ CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions();
+ List<JsonNode> results = new ArrayList<>();
+
+ CosmosPagedIterable<JsonNode> response = container.queryItems(
+ "select * from c order by c._ts", queryOptions, JsonNode.class );
+
+ response.forEach(data -> {
+ results.add(data);
+ });
+ return results;
+ }
+
+ private MockRecordParser recordReader;
+
+ private void setupRecordReader() throws InitializationException {
+ recordReader = new MockRecordParser();
+ runner.addControllerService("reader", recordReader);
+ runner.enableControllerService(recordReader);
+ runner.setProperty(PutAzureCosmosDBRecord.RECORD_READER_FACTORY, "reader");
+ }
+
+ @Test
+ public void testOnTriggerWithNestedRecords() throws InitializationException {
+ setupRecordReader();
+ recordReader.addSchemaField("id", RecordFieldType.STRING);
+ recordReader.addSchemaField(TEST_COSMOS_PARTITION_KEY_FIELD_NAME, RecordFieldType.STRING);
+
+ final List<RecordField> personFields = new ArrayList<>();
+ final RecordField nameField = new RecordField("name", RecordFieldType.STRING.getDataType());
+ final RecordField ageField = new RecordField("age", RecordFieldType.INT.getDataType());
+ final RecordField sportField = new RecordField("sport", RecordFieldType.STRING.getDataType());
+ personFields.add(nameField);
+ personFields.add(ageField);
+ personFields.add(sportField);
+ final RecordSchema personSchema = new SimpleRecordSchema(personFields);
+ recordReader.addSchemaField("person", RecordFieldType.RECORD);
+
+ recordReader.addRecord("1", "A", new MapRecord(personSchema, new HashMap<String,Object>() {
+ private static final long serialVersionUID = -3185956498135742190L;
+ {
+ put("name", "John Doe");
+ put("age", 48);
+ put("sport", "Soccer");
+ }
+ }));
+ recordReader.addRecord("2", "B", new MapRecord(personSchema, new HashMap<String,Object>() {
+ private static final long serialVersionUID = 1L;
+ {
+ put("name", "Jane Doe");
+ put("age", 47);
+ put("sport", "Tennis");
+ }
+ }));
+ recordReader.addRecord("3", "A", new MapRecord(personSchema, new HashMap<String,Object>() {
+ private static final long serialVersionUID = -1329194249439570573L;
+ {
+ put("name", "Sally Doe");
+ put("age", 47);
+ put("sport", "Curling");
+ }
+ }));
+ recordReader.addRecord("4", "C", new MapRecord(personSchema, new HashMap<String,Object>() {
+ private static final long serialVersionUID = -1329194249439570574L;
+ {
+ put("name", "Jimmy Doe");
+ put("age", 14);
+ put("sport", null);
+ }
+ }));
+
+ runner.enqueue("");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutAzureCosmosDBRecord.REL_SUCCESS, 1);
+ assertEquals(4, getDataFromTestDB().size());
+ }
+
+ @Test
+ public void testOnTriggerWithFlatRecords() throws InitializationException {
+ setupRecordReader();
+ recordReader.addSchemaField("id", RecordFieldType.STRING);
+ recordReader.addSchemaField(TEST_COSMOS_PARTITION_KEY_FIELD_NAME, RecordFieldType.STRING);
+ recordReader.addSchemaField("name", RecordFieldType.STRING);
+ recordReader.addSchemaField("age", RecordFieldType.INT);
+ recordReader.addSchemaField("sport", RecordFieldType.STRING);
+
+ recordReader.addRecord("1", "A", "John Doe", 48, "Soccer");
+ recordReader.addRecord("2", "B","Jane Doe", 47, "Tennis");
+ recordReader.addRecord("3", "B", "Sally Doe", 47, "Curling");
+ recordReader.addRecord("4", "A", "Jimmy Doe", 14, null);
+ recordReader.addRecord("5", "C","Pizza Doe", 14, null);
+
+ runner.enqueue("");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutAzureCosmosDBRecord.REL_SUCCESS, 1);
+ assertEquals(5, getDataFromTestDB().size());
+ }
+
+
+
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/MockTestBase.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/MockTestBase.java
new file mode 100644
index 0000000..ca1414b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/MockTestBase.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.azure.cosmos.document;
+
+import static org.mockito.Mockito.mock;
+
+import java.util.Random;
+
+import com.azure.cosmos.ConsistencyLevel;
+import com.azure.cosmos.CosmosClient;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.services.azure.cosmos.document.AzureCosmosDBClientService;
+import org.apache.nifi.util.TestRunner;
+
+public class MockTestBase {
+
+ protected static final String MOCK_DB_NAME = "MOCK_DB_NAME";
+ protected static final String MOCK_CONTAINER_ID = "MOCK_CONTAINER_ID";
+ protected static final String MOCK_URI = "MOCK_URI";
+ protected static final String MOCK_DB_ACCESS_KEY = "MOCK_DB_ACCESS_KEY";
+ public static final String MOCK_QUERY = "select * from c";
+
+ public static final String MOCK_PARTITION_FIELD_NAME = "category";
+ protected TestRunner testRunner;
+
+ protected void setBasicMockProperties(boolean withConnectionService) throws InitializationException {
+ if (testRunner != null) {
+ testRunner.setProperty(AbstractAzureCosmosDBProcessor.DATABASE_NAME, MOCK_DB_NAME);
+ testRunner.setProperty(AbstractAzureCosmosDBProcessor.CONTAINER_ID,MOCK_CONTAINER_ID);
+ testRunner.setProperty(AbstractAzureCosmosDBProcessor.PARTITION_KEY,MOCK_PARTITION_FIELD_NAME);
+ if (withConnectionService) {
+ // setup connnection controller service
+ AzureCosmosDBClientService service = new MockConnectionService();
+ testRunner.addControllerService("connService", service);
+ testRunner.setProperty(service, AzureCosmosDBUtils.URI, MOCK_URI);
+ testRunner.setProperty(service, AzureCosmosDBUtils.DB_ACCESS_KEY, MOCK_DB_ACCESS_KEY);
+
+ // now, after enabling and setting the service, it should be valid
+ testRunner.enableControllerService(service);
+ testRunner.setProperty(AbstractAzureCosmosDBProcessor.CONNECTION_SERVICE, "connService");
+ }
+ }
+ }
+
+ private static Random random = new Random();
+ public static int getRandomInt(int min, int max){
+ return random.nextInt((max-min)+1) + min;
+ }
+
+ private class MockConnectionService extends AzureCosmosDBClientService {
+ @Override
+ protected void createCosmosClient(final String uri, final String accessKey, final ConsistencyLevel clevel){
+ // mock cosmos client
+ this.setCosmosClient(mock(CosmosClient.class));
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecordTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecordTest.java
new file mode 100644
index 0000000..4a6b8a8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecordTest.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.cosmos.document;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import com.azure.cosmos.ConsistencyLevel;
+import com.azure.cosmos.CosmosClient;
+import com.azure.cosmos.CosmosContainer;
+import com.azure.cosmos.CosmosException;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+
+import org.apache.avro.Schema;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.MockSchemaRegistry;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import net.minidev.json.JSONObject;
+public class PutAzureCosmosDBRecordTest extends MockTestBase {
+
+ private MockPutAzureCosmosDBRecord processor;
+ private MockRecordParser recordReader;
+
+ private void setupRecordReader() throws InitializationException {
+ recordReader = new MockRecordParser();
+ if (testRunner != null) {
+ testRunner.addControllerService("reader", recordReader);
+ testRunner.enableControllerService(recordReader);
+ testRunner.setProperty(PutAzureCosmosDBRecord.RECORD_READER_FACTORY, "reader");
+ }
+
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ processor = new MockPutAzureCosmosDBRecord();
+ testRunner = TestRunners.newTestRunner(processor);
+ testRunner.setIncomingConnection(false);
+ testRunner.setNonLoopConnection(false);
+ }
+
+ @Test
+ public void testPutCosmosRecordProcessorConfigValidity() throws Exception {
+ setBasicMockProperties(false);
+ testRunner.setProperty(AzureCosmosDBUtils.URI, MOCK_URI);
+ testRunner.assertNotValid();
+ testRunner.setProperty(AzureCosmosDBUtils.DB_ACCESS_KEY, MOCK_DB_ACCESS_KEY);
+
+ testRunner.assertNotValid();
+
+ setupRecordReader();
+ testRunner.assertValid();
+ processor.setCosmosClient(null);
+ processor.onScheduled(testRunner.getProcessContext());
+ assertNotNull(processor.getCosmosClient());
+ }
+
+ @Test
+ public void testPutCosmosRecordProcessorConfigValidityWithConnectionService() throws Exception {
+ setBasicMockProperties(true);
+ testRunner.assertNotValid();
+ // setup recordReader
+ setupRecordReader();
+ testRunner.assertValid();
+ processor.setCosmosClient(null);
+ processor.onScheduled(testRunner.getProcessContext());
+ assertNotNull(processor.getCosmosClient());
+ }
+
+ @Test
+ public void testOnTriggerWithFlatRecords() throws Exception {
+ setupRecordReader();
+ prepareMockTest();
+ recordReader.addSchemaField("id", RecordFieldType.STRING);
+ recordReader.addSchemaField(MOCK_PARTITION_FIELD_NAME, RecordFieldType.STRING);
+ recordReader.addSchemaField("name", RecordFieldType.STRING);
+ recordReader.addSchemaField("age", RecordFieldType.INT);
+ recordReader.addSchemaField("sport", RecordFieldType.STRING);
+
+ recordReader.addRecord("1", "A", "John Doe", 48, "Soccer");
+ recordReader.addRecord("2", "B","Jane Doe", 47, "Tennis");
+ recordReader.addRecord("3", "B", "Sally Doe", 47, "Curling");
+ recordReader.addRecord("4", "A", "Jimmy Doe", 14, null);
+ recordReader.addRecord("5", "C","Pizza Doe", 14, null);
+
+ testRunner.enqueue("");
+ testRunner.run();
+ testRunner.assertAllFlowFilesTransferred(PutAzureCosmosDBRecord.REL_SUCCESS, 1);
+ assertEquals(5, processor.getTestResults().size());
+ }
+
+ @Test
+ public void testOnTriggerWithNestedRecords() throws Exception {
+ setupRecordReader();
+ prepareMockTest();
+ recordReader.addSchemaField("id", RecordFieldType.STRING);
+ recordReader.addSchemaField(MOCK_PARTITION_FIELD_NAME, RecordFieldType.STRING);
+
+ final List<RecordField> personFields = new ArrayList<>();
+ final RecordField nameField = new RecordField("name", RecordFieldType.STRING.getDataType());
+ final RecordField ageField = new RecordField("age", RecordFieldType.INT.getDataType());
+ final RecordField sportField = new RecordField("sport", RecordFieldType.STRING.getDataType());
+ personFields.add(nameField);
+ personFields.add(ageField);
+ personFields.add(sportField);
+ final RecordSchema personSchema = new SimpleRecordSchema(personFields);
+ recordReader.addSchemaField("person", RecordFieldType.RECORD);
+
+ recordReader.addRecord("1", "A", new MapRecord(personSchema, new HashMap<String,Object>() {
+ private static final long serialVersionUID = -3185956498135742190L;
+ {
+ put("name", "John Doe");
+ put("age", 48);
+ put("sport", "Soccer");
+ }
+ }));
+ recordReader.addRecord("2", "B", new MapRecord(personSchema, new HashMap<String,Object>() {
+ private static final long serialVersionUID = 1L;
+ {
+ put("name", "Jane Doe");
+ put("age", 47);
+ put("sport", "Tennis");
+ }
+ }));
+ recordReader.addRecord("3", "A", new MapRecord(personSchema, new HashMap<String,Object>() {
+ private static final long serialVersionUID = -1329194249439570573L;
+ {
+ put("name", "Sally Doe");
+ put("age", 47);
+ put("sport", "Curling");
+ }
+ }));
+ recordReader.addRecord("4", "C", new MapRecord(personSchema, new HashMap<String,Object>() {
+ private static final long serialVersionUID = -1329194249439570574L;
+ {
+ put("name", "Jimmy Doe");
+ put("age", 14);
+ put("sport", null);
+ }
+ }));
+ testRunner.enqueue("");
+ testRunner.run();
+ testRunner.assertAllFlowFilesTransferred(PutAzureCosmosDBRecord.REL_SUCCESS, 1);
+ assertEquals(4, processor.getTestResults().size());
+ }
+
+ @Test
+ public void testArrayConversion() throws Exception {
+ Gson gson = new GsonBuilder().setPrettyPrinting().serializeNulls().create();
+ // schema creation for test
+ JsonObject schemaDef = new JsonObject();
+ schemaDef.addProperty("type", "record");
+ schemaDef.addProperty("name", "Test");
+ JsonArray schemaArray = new JsonArray();
+ JsonObject f1 = new JsonObject();
+ f1.addProperty("type", "string");
+ f1.addProperty("name", "id");
+ schemaArray.add(f1);
+ JsonObject f2 = new JsonObject();
+ f2.addProperty("type", "string");
+ f2.addProperty("name", "name");
+ schemaArray.add(f2);
+ JsonObject f3 = new JsonObject();
+ f3.addProperty("type", "string");
+ f3.addProperty("name", "sport");
+ schemaArray.add(f3);
+ JsonObject arrayDef = new JsonObject();
+ arrayDef.addProperty("type", "array");
+ arrayDef.addProperty("items", "string");
+ JsonObject f4 = new JsonObject();
+ f4.add("type", arrayDef);
+ f4.addProperty("name", "arrayTest");
+ schemaArray.add(f4);
+ schemaDef.add("fields", schemaArray);
+
+ // test data generation
+ JsonObject testData = new JsonObject();
+ testData.addProperty("id", UUID.randomUUID().toString());
+ testData.addProperty("name", "John Doe");
+ testData.addProperty("sport", "Soccer");
+ JsonArray jarray = new JsonArray();
+ jarray.add("a");
+ jarray.add("b");
+ jarray.add("c");
+ testData.add("arrayTest", jarray);
+
+ // setup registry and reader
+ MockSchemaRegistry registry = new MockSchemaRegistry();
+ RecordSchema rschema = AvroTypeUtil.createSchema(new Schema.Parser().parse(gson.toJson(schemaDef)));
+ registry.addSchema("test", rschema);
+ JsonTreeReader reader = new JsonTreeReader();
+ testRunner.addControllerService("registry", registry);
+ testRunner.addControllerService("reader", reader);
+ testRunner.setProperty(reader, SchemaAccessUtils.SCHEMA_REGISTRY, "registry");
+ testRunner.setProperty(PutAzureCosmosDBRecord.RECORD_READER_FACTORY, "reader");
+ testRunner.enableControllerService(registry);
+ testRunner.enableControllerService(reader);
+ prepareMockTest();
+ // override partiton key for this test case
+ testRunner.setProperty(PutAzureCosmosDBRecord.PARTITION_KEY, "sport");
+
+ Map<String, String> attrs = new HashMap<>();
+ attrs.put("schema.name", "test");
+
+ testRunner.enqueue(gson.toJson(testData), attrs);
+ testRunner.run();
+
+ testRunner.assertTransferCount(PutAzureCosmosDBRecord.REL_FAILURE, 0);
+ testRunner.assertTransferCount(PutAzureCosmosDBRecord.REL_SUCCESS, 1);
+ List<Map<String, Object>> backendData = processor.getTestResults();
+ assertEquals(1, backendData.size());
+ //validate array data
+ JSONObject arrayTestResult = new JSONObject();
+ arrayTestResult.putAll(backendData.get(0));
+ Object[] check = (Object []) arrayTestResult.get("arrayTest");
+ assertArrayEquals(new Object[]{"a", "b", "c"}, check);
+ }
+ private void prepareMockTest() throws Exception {
+ // this setup connection service and basic mock properties
+ setBasicMockProperties(true);
+ }
+}
+
+class MockPutAzureCosmosDBRecord extends PutAzureCosmosDBRecord {
+
+ static CosmosClient mockClient = mock(CosmosClient.class);
+ static CosmosContainer mockContainer = mock(CosmosContainer.class);
+ private List<Map<String, Object>> mockBackend = new ArrayList<>();
+
+ @Override
+ protected void createCosmosClient(final String uri, final String accessKey, final ConsistencyLevel clevel) {
+ this.setCosmosClient(mockClient);
+ }
+ @Override
+ protected void getCosmosDocumentContainer(final ProcessContext context) throws CosmosException {
+ this.setContainer(mockContainer);
+ }
+
+ @Override
+ protected void bulkInsert(List<Map<String, Object>> records ) throws CosmosException{
+ this.mockBackend.addAll(records);
+ }
+
+ public List<Map<String, Object>> getTestResults() {
+ return mockBackend;
+ }
+
+
+ public CosmosContainer getMockConainer() {
+ return mockContainer;
+ }
+
+
+
+
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/cosmos/document/TestAzureCosmosDBClientService.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/cosmos/document/TestAzureCosmosDBClientService.java
new file mode 100644
index 0000000..64e7cce
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/cosmos/document/TestAzureCosmosDBClientService.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.cosmos.document;
+
+import org.apache.nifi.processors.azure.cosmos.document.AzureCosmosDBUtils;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestAzureCosmosDBClientService {
+
+ private static final String MOCK_URI = "https://mockURI:443/";
+ private static final String DB_ACCESS_KEY = "mockDB_ACCESS_KEY";
+
+ private TestRunner runner;
+ private AzureCosmosDBClientService service;
+
+ @Before
+ public void setUp() throws InitializationException {
+ runner = TestRunners.newTestRunner(NoOpProcessor.class);
+ service = new AzureCosmosDBClientService();
+ runner.addControllerService("connService", service);
+ }
+
+ @Test
+ public void testValidWithURIandDBAccessKey() {
+ configureURI();
+ configureDBAccessKey();
+
+ runner.assertValid(service);
+ }
+
+ @Test
+ public void testNotValidBecauseURIMissing() {
+ configureDBAccessKey();
+
+ runner.assertNotValid(service);
+ }
+
+ @Test
+ public void testNotValidBecauseDBAccessKeyMissing() {
+ configureURI();
+
+ runner.assertNotValid(service);
+ }
+
+ private void configureURI() {
+ runner.setProperty(service, AzureCosmosDBUtils.URI, MOCK_URI);
+ }
+
+ private void configureDBAccessKey() {
+ runner.setProperty(service, AzureCosmosDBUtils.DB_ACCESS_KEY, DB_ACCESS_KEY);
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/pom.xml
index ff6af32..57dacf5 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/pom.xml
@@ -27,11 +27,38 @@
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-storage</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>${azure.core.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-xml</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-cosmos</artifactId>
+ <version>${azure-cosmos.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<!-- overriding jackson-core in azure-storage -->
<dependency>
@@ -40,6 +67,16 @@
<version>${jackson.version}</version>
</dependency>
<dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-xml</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>27.0.1-jre</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/cosmos/AzureCosmosDBConnectionService.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/cosmos/AzureCosmosDBConnectionService.java
new file mode 100644
index 0000000..387e1f2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/cosmos/AzureCosmosDBConnectionService.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.cosmos;
+
+import com.azure.cosmos.CosmosClient;
+
+import org.apache.nifi.controller.ControllerService;
+
+public interface AzureCosmosDBConnectionService extends ControllerService {
+
+ String getURI();
+
+ String getAccessKey();
+
+ String getConsistencyLevel();
+
+ CosmosClient getCosmosClient();
+
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/pom.xml
index 86897e5..e1bee31 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/pom.xml
@@ -27,8 +27,9 @@
<properties>
<azure-storage.version>8.4.0</azure-storage.version>
- <azure.core.version>1.5.0</azure.core.version>
+ <azure.core.version>1.6.0</azure.core.version>
<jackson.version>2.10.3</jackson.version>
+ <azure-cosmos.version>4.2.0</azure-cosmos.version>
</properties>
<modules>