You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2019/10/03 09:37:24 UTC

[nifi] branch master updated: NIFI-6159 - Add BigQuery processor using the Streaming API

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new b12a9ad  NIFI-6159 - Add BigQuery processor using the Streaming API
b12a9ad is described below

commit b12a9ad446773f9f043dfef10327691bf3963a07
Author: Pierre Villard <pi...@gmail.com>
AuthorDate: Wed Mar 27 16:52:47 2019 +0100

    NIFI-6159 - Add BigQuery processor using the Streaming API
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #3394.
---
 .../nifi-gcp-bundle/nifi-gcp-processors/pom.xml    |  20 ++
 .../gcp/bigquery/AbstractBigQueryProcessor.java    |  73 ++++----
 .../gcp/bigquery/BigQueryAttributes.java           |  15 +-
 .../processors/gcp/bigquery/PutBigQueryBatch.java  | 120 ++++++------
 .../gcp/bigquery/PutBigQueryStreaming.java         | 201 +++++++++++++++++++++
 .../services/org.apache.nifi.processor.Processor   |   3 +-
 .../gcp/bigquery/AbstractBigQueryIT.java           |  52 ++++--
 .../gcp/bigquery/PutBigQueryBatchIT.java           |  22 ++-
 .../gcp/bigquery/PutBigQueryStreamingIT.java       | 184 +++++++++++++++++++
 .../resources/bigquery/streaming-bad-data.json     |  35 ++++
 .../resources/bigquery/streaming-correct-data.json |  36 ++++
 nifi-nar-bundles/nifi-gcp-bundle/pom.xml           |   2 +-
 12 files changed, 628 insertions(+), 135 deletions(-)

diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
index 5d21ef3..2455891 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
@@ -40,6 +40,18 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-registry-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-processor-utils</artifactId>
             <version>1.10.0-SNAPSHOT</version>
         </dependency>
@@ -88,6 +100,12 @@
             <version>1.10.0-SNAPSHOT</version>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-services</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
@@ -100,6 +118,8 @@
                         <exclude>src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker</exclude>
                         <exclude>src/test/resources/mock-gcp-service-account.json</exclude>
                         <exclude>src/test/resources/mock-gcp-application-default-credentials.json</exclude>
+                        <exclude>src/test/resources/bigquery/streaming-bad-data.json</exclude>
+                        <exclude>src/test/resources/bigquery/streaming-correct-data.json</exclude>
                     </excludes>
                 </configuration>
             </plugin>
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
index 3751060..b2dc43a 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
@@ -1,12 +1,12 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,11 +17,13 @@
 
 package org.apache.nifi.processors.gcp.bigquery;
 
-import com.google.api.gax.retrying.RetrySettings;
-import com.google.auth.oauth2.GoogleCredentials;
-import com.google.cloud.bigquery.BigQuery;
-import com.google.cloud.bigquery.BigQueryOptions;
-import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
@@ -35,13 +37,11 @@ import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
 import org.apache.nifi.proxy.ProxyConfiguration;
 import org.apache.nifi.util.StringUtils;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.common.collect.ImmutableList;
 
 /**
  * Base class for creating processors that connect to GCP BiqQuery service
@@ -50,14 +50,12 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
 
     static final int BUFFER_SIZE = 65536;
 
-    public static final Relationship REL_SUCCESS =
-            new Relationship.Builder().name("success")
-                    .description("FlowFiles are routed to this relationship after a successful Google BigQuery operation.")
-                    .build();
-    public static final Relationship REL_FAILURE =
-            new Relationship.Builder().name("failure")
-                    .description("FlowFiles are routed to this relationship if the Google BigQuery operation fails.")
-                    .build();
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+            .description("FlowFiles are routed to this relationship after a successful Google BigQuery operation.")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+            .description("FlowFiles are routed to this relationship if the Google BigQuery operation fails.")
+            .build();
 
     public static final Set<Relationship> relationships = Collections.unmodifiableSet(
             new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
@@ -82,23 +80,14 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
             .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
             .build();
 
-    public static final PropertyDescriptor TABLE_SCHEMA = new PropertyDescriptor.Builder()
-            .name(BigQueryAttributes.TABLE_SCHEMA_ATTR)
-            .displayName("Table Schema")
-            .description(BigQueryAttributes.TABLE_SCHEMA_DESC)
-            .required(false)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder()
-            .name(BigQueryAttributes.JOB_READ_TIMEOUT_ATTR)
-            .displayName("Read Timeout")
-            .description(BigQueryAttributes.JOB_READ_TIMEOUT_DESC)
+    public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.IGNORE_UNKNOWN_ATTR)
+            .displayName("Ignore Unknown Values")
+            .description(BigQueryAttributes.IGNORE_UNKNOWN_DESC)
             .required(true)
-            .defaultValue("5 minutes")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("false")
             .build();
 
     @Override
@@ -108,12 +97,11 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
 
     @Override
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return ImmutableList.<PropertyDescriptor>builder()
+        return ImmutableList.<PropertyDescriptor> builder()
                 .addAll(super.getSupportedPropertyDescriptors())
                 .add(DATASET)
                 .add(TABLE_NAME)
-                .add(TABLE_SCHEMA)
-                .add(READ_TIMEOUT)
+                .add(IGNORE_UNKNOWN)
                 .build();
     }
 
@@ -153,7 +141,8 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
     }
 
     /**
-     * If sub-classes needs to implement any custom validation, override this method then add validation result to the results.
+     * If sub-classes need to implement any custom validation, they should override this method
+     * and add their validation results to the results collection.
      */
     protected void customValidate(ValidationContext validationContext, Collection<ValidationResult> results) {
     }
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
index 81978eb..4a379c0 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
@@ -1,12 +1,12 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -27,7 +27,8 @@ import com.google.cloud.bigquery.JobInfo;
  * Attributes associated with the BigQuery processors
  */
 public class BigQueryAttributes {
-    private BigQueryAttributes() {}
+    private BigQueryAttributes() {
+    }
 
     public static final PropertyDescriptor SERVICE_ACCOUNT_JSON_FILE = CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE;
 
@@ -102,7 +103,12 @@ public class BigQueryAttributes {
             + "can interpret logical types into their corresponding types (such as TIMESTAMP) instead of only using their raw "
             + "types (such as INTEGER).";
 
+    public static final String RECORD_READER_ATTR = "bq.record.reader";
+    public static final String RECORD_READER_DESC = "Specifies the Controller Service to use for parsing incoming data.";
 
+    public static final String SKIP_INVALID_ROWS_ATTR = "bq.skip.invalid.rows";
+    public static final String SKIP_INVALID_ROWS_DESC = "Sets whether to insert all valid rows of a request, even if invalid "
+            + "rows exist. If not set the entire insert request will fail if it contains an invalid row.";
 
     // Batch Attributes
     public static final String JOB_CREATE_TIME_ATTR = "bq.job.stat.creation_time";
@@ -129,7 +135,6 @@ public class BigQueryAttributes {
     public static final String JOB_ERROR_LOCATION_ATTR = "bq.error.location";
     public static final String JOB_ERROR_LOCATION_DESC = "Load job error location";
 
-
     // Allowable values
     public static final AllowableValue CREATE_IF_NEEDED = new AllowableValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(),
             JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), "Configures the job to create the table if it does not exist.");
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
index 5446c20..cd3cb09 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
@@ -1,12 +1,12 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,16 +17,14 @@
 
 package org.apache.nifi.processors.gcp.bigquery;
 
-import com.google.cloud.RetryOption;
-import com.google.cloud.bigquery.FormatOptions;
-import com.google.cloud.bigquery.Job;
-import com.google.cloud.bigquery.JobInfo;
-import com.google.cloud.bigquery.JobStatistics.LoadStatistics;
-import com.google.cloud.bigquery.Schema;
-import com.google.cloud.bigquery.TableDataWriteChannel;
-import com.google.cloud.bigquery.TableId;
-import com.google.cloud.bigquery.WriteChannelConfiguration;
-import com.google.common.collect.ImmutableList;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -52,39 +50,33 @@ import org.apache.nifi.util.StringUtils;
 import org.threeten.bp.Duration;
 import org.threeten.bp.temporal.ChronoUnit;
 
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import com.google.cloud.RetryOption;
+import com.google.cloud.bigquery.FormatOptions;
+import com.google.cloud.bigquery.Job;
+import com.google.cloud.bigquery.JobInfo;
+import com.google.cloud.bigquery.JobStatistics.LoadStatistics;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.TableDataWriteChannel;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.WriteChannelConfiguration;
+import com.google.common.collect.ImmutableList;
 
 /**
  * A processor for batch loading data into a Google BigQuery table
  */
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-@Tags({"google", "google cloud", "bq", "bigquery"})
+@Tags({ "google", "google cloud", "bq", "bigquery" })
 @CapabilityDescription("Batch loads flow files content to a Google BigQuery table.")
-@SeeAlso({PutGCSObject.class, DeleteGCSObject.class})
+@SeeAlso({ PutGCSObject.class, DeleteGCSObject.class })
 @WritesAttributes({
-    @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, description = BigQueryAttributes.IGNORE_UNKNOWN_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, description = BigQueryAttributes.MAX_BADRECORDS_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
 })
 public class PutBigQueryBatch extends AbstractBigQueryProcessor {
 
@@ -99,7 +91,7 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
                 return builder.valid(true).explanation("Contains Expression Language").build();
             }
 
-            if(TYPES.contains(input.toUpperCase())) {
+            if (TYPES.contains(input.toUpperCase())) {
                 builder.valid(true);
             } else {
                 builder.valid(false).explanation("Load File Type must be one of the following options: " + StringUtils.join(TYPES, ", "));
@@ -109,23 +101,31 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
         }
     };
 
-    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor
-            .Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
-            .displayName("Load file type")
-            .description(BigQueryAttributes.SOURCE_TYPE_DESC)
+    public static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.JOB_READ_TIMEOUT_ATTR)
+            .displayName("Read Timeout")
+            .description(BigQueryAttributes.JOB_READ_TIMEOUT_DESC)
             .required(true)
-            .addValidator(FORMAT_VALIDATOR)
+            .defaultValue("5 minutes")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor TABLE_SCHEMA = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.TABLE_SCHEMA_ATTR)
+            .displayName("Table Schema")
+            .description(BigQueryAttributes.TABLE_SCHEMA_DESC)
+            .required(false)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
-    public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder()
-            .name(BigQueryAttributes.IGNORE_UNKNOWN_ATTR)
-            .displayName("Ignore Unknown Values")
-            .description(BigQueryAttributes.IGNORE_UNKNOWN_DESC)
+    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
+            .displayName("Load file type")
+            .description(BigQueryAttributes.SOURCE_TYPE_DESC)
             .required(true)
-            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
-            .allowableValues("true", "false")
-            .defaultValue("false")
+            .addValidator(FORMAT_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
 
     public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder()
@@ -225,13 +225,14 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
 
     @Override
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return ImmutableList.<PropertyDescriptor>builder()
+        return ImmutableList.<PropertyDescriptor> builder()
                 .addAll(super.getSupportedPropertyDescriptors())
+                .add(TABLE_SCHEMA)
+                .add(READ_TIMEOUT)
                 .add(SOURCE_TYPE)
                 .add(CREATE_DISPOSITION)
                 .add(WRITE_DISPOSITION)
                 .add(MAXBAD_RECORDS)
-                .add(IGNORE_UNKNOWN)
                 .add(CSV_ALLOW_JAGGED_ROWS)
                 .add(CSV_ALLOW_QUOTED_NEW_LINES)
                 .add(CSV_CHARSET)
@@ -271,7 +272,7 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
 
             FormatOptions formatOption;
 
-            if(type.equals(FormatOptions.csv().getType())) {
+            if (type.equals(FormatOptions.csv().getType())) {
                 formatOption = FormatOptions.csv().toBuilder()
                         .setAllowJaggedRows(context.getProperty(CSV_ALLOW_JAGGED_ROWS).asBoolean())
                         .setAllowQuotedNewLines(context.getProperty(CSV_ALLOW_QUOTED_NEW_LINES).asBoolean())
@@ -285,18 +286,17 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
             }
 
             final Schema schema = BigQueryUtils.schemaFromString(context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions(flowFile).getValue());
-            final WriteChannelConfiguration writeChannelConfiguration =
-                    WriteChannelConfiguration.newBuilder(tableId)
+            final WriteChannelConfiguration writeChannelConfiguration = WriteChannelConfiguration.newBuilder(tableId)
                     .setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue()))
                     .setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue()))
-                    .setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).asBoolean())
+                    .setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).evaluateAttributeExpressions(flowFile).asBoolean())
                     .setUseAvroLogicalTypes(context.getProperty(AVRO_USE_LOGICAL_TYPES).asBoolean())
                     .setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger())
                     .setSchema(schema)
                     .setFormatOptions(formatOption)
                     .build();
 
-            try ( TableDataWriteChannel writer = getCloudService().writer(writeChannelConfiguration) ) {
+            try (TableDataWriteChannel writer = getCloudService().writer(writeChannelConfiguration)) {
 
                 session.read(flowFile, rawIn -> {
                     ReadableByteChannel readableByteChannel = Channels.newChannel(rawIn);
@@ -337,7 +337,7 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
                         flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_LOCATION_ATTR);
 
                         // add the number of records successfully added
-                        if(job.getStatistics() instanceof LoadStatistics) {
+                        if (job.getStatistics() instanceof LoadStatistics) {
                             final LoadStatistics stats = (LoadStatistics) job.getStatistics();
                             attributes.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(stats.getOutputRows()));
                         }
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java
new file mode 100644
index 0000000..98457a3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.StringUtils;
+
+import com.google.cloud.bigquery.BigQueryError;
+import com.google.cloud.bigquery.InsertAllRequest;
+import com.google.cloud.bigquery.InsertAllResponse;
+import com.google.cloud.bigquery.TableId;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * A processor for streaming data into a Google BigQuery table. It uses the BigQuery
+ * streaming insert API to insert data. This provides the lowest-latency insert path into BigQuery,
+ * and therefore is the default method when the input is unbounded. BigQuery will make a strong
+ * effort to ensure no duplicates when using this path; however, there are some scenarios in which
+ * BigQuery is unable to make this guarantee (see
+ * https://cloud.google.com/bigquery/streaming-data-into-bigquery). A query can be run over the
+ * output table to periodically clean these rare duplicates. Alternatively, using the Batch insert
+ * method does guarantee no duplicates, though the latency for the insert into BigQuery will be much
+ * higher.
+ */
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({ "google", "google cloud", "bq", "gcp", "bigquery", "record" })
+@CapabilityDescription("Load data into Google BigQuery table using the streaming API. This processor "
+        + "is not intended to load large flow files as it will load the full content into memory. If "
+        + "you need to insert large flow files, consider using PutBigQueryBatch instead.")
+@SeeAlso({ PutBigQueryBatch.class })
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+@WritesAttributes({
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
+})
+public class PutBigQueryStreaming extends AbstractBigQueryProcessor {
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.RECORD_READER_ATTR)
+            .displayName("Record Reader")
+            .description(BigQueryAttributes.RECORD_READER_DESC)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor SKIP_INVALID_ROWS = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.SKIP_INVALID_ROWS_ATTR)
+            .displayName("Skip Invalid Rows")
+            .description(BigQueryAttributes.SKIP_INVALID_ROWS_DESC)
+            .required(true)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("false")
+            .build();
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return ImmutableList.<PropertyDescriptor> builder()
+                .addAll(super.getSupportedPropertyDescriptors())
+                .add(RECORD_READER)
+                .add(SKIP_INVALID_ROWS)
+                .build();
+    }
+
+    @Override
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        super.onScheduled(context);
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
+        final String dataset = context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue();
+        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+        final TableId tableId;
+        if (StringUtils.isEmpty(projectId)) {
+            tableId = TableId.of(dataset, tableName);
+        } else {
+            tableId = TableId.of(projectId, dataset, tableName);
+        }
+
+        try {
+
+            InsertAllRequest.Builder request = InsertAllRequest.newBuilder(tableId);
+            int nbrecord = 0;
+
+            try (final InputStream in = session.read(flowFile)) {
+                final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+                try (final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());) {
+                    Record currentRecord;
+                    while ((currentRecord = reader.nextRecord()) != null) {
+                        request.addRow(convertMapRecord(currentRecord.toMap()));
+                        nbrecord++;
+                    }
+                }
+            }
+
+            request.setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).evaluateAttributeExpressions(flowFile).asBoolean());
+            request.setSkipInvalidRows(context.getProperty(SKIP_INVALID_ROWS).evaluateAttributeExpressions(flowFile).asBoolean());
+
+            InsertAllResponse response = getCloudService().insertAll(request.build());
+
+            final Map<String, String> attributes = new HashMap<>();
+
+            if (response.hasErrors()) {
+                getLogger().log(LogLevel.WARN, "Failed to insert {} of {} records into BigQuery {} table.", new Object[] { response.getInsertErrors().size(), nbrecord, tableName });
+                if (getLogger().isDebugEnabled()) {
+                    for (long index : response.getInsertErrors().keySet()) {
+                        for (BigQueryError e : response.getInsertErrors().get(index)) {
+                            getLogger().log(LogLevel.DEBUG, "Failed to insert record #{}: {}", new Object[] { index, e.getMessage() });
+                        }
+                    }
+                }
+
+                attributes.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(nbrecord - response.getInsertErrors().size()));
+
+                flowFile = session.penalize(flowFile);
+                flowFile = session.putAllAttributes(flowFile, attributes);
+                session.transfer(flowFile, REL_FAILURE);
+            } else {
+                attributes.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(nbrecord));
+                flowFile = session.putAllAttributes(flowFile, attributes);
+                session.transfer(flowFile, REL_SUCCESS);
+            }
+
+        } catch (Exception ex) {
+            getLogger().log(LogLevel.ERROR, ex.getMessage(), ex);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+
+    private Map<String, Object> convertMapRecord(Map<String, Object> map) {
+        Map<String, Object> result = new HashMap<String, Object>();
+        for (String key : map.keySet()) {
+            Object obj = map.get(key);
+            if (obj instanceof MapRecord) {
+                result.put(key, convertMapRecord(((MapRecord) obj).toMap()));
+            } else if (obj instanceof Object[]
+                    && ((Object[]) obj).length > 0
+                    && ((Object[]) obj)[0] instanceof MapRecord) {
+                List<Map<String, Object>> lmapr = new ArrayList<Map<String, Object>>();
+                for (Object mapr : ((Object[]) obj)) {
+                    lmapr.add(convertMapRecord(((MapRecord) mapr).toMap()));
+                }
+                result.put(key, lmapr);
+            } else {
+                result.put(key, obj);
+            }
+        }
+        return result;
+    }
+
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 4ff9dfb..9d26958 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -18,4 +18,5 @@ org.apache.nifi.processors.gcp.storage.DeleteGCSObject
 org.apache.nifi.processors.gcp.storage.ListGCSBucket
 org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub
 org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub
-org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch
\ No newline at end of file
+org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch
+org.apache.nifi.processors.gcp.bigquery.PutBigQueryStreaming
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryIT.java
index 52327d4..cba730f 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryIT.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryIT.java
@@ -1,12 +1,12 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,13 +17,18 @@
 
 package org.apache.nifi.processors.gcp.bigquery;
 
-import com.google.cloud.bigquery.BigQuery;
-import com.google.cloud.bigquery.BigQueryOptions;
-import com.google.cloud.bigquery.Dataset;
-import com.google.cloud.bigquery.DatasetInfo;
-import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processors.gcp.GCPIntegrationTests;
+import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
+import org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
+import org.apache.nifi.processors.gcp.credentials.factory.CredentialsFactory;
 import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.TestRunner;
@@ -31,24 +36,37 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
 
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertNull;
+import com.google.auth.Credentials;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.Dataset;
+import com.google.cloud.bigquery.DatasetInfo;
+import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
 
 @Category(GCPIntegrationTests.class)
 public abstract class AbstractBigQueryIT {
 
-    static final String CONTROLLER_SERVICE = "GCPCredentialsService";
+    protected static final String CONTROLLER_SERVICE = "GCPCredentialsService";
+    protected static final String PROJECT_ID = System.getProperty("test.gcp.project.id", "nifi");
+    protected static final String SERVICE_ACCOUNT_JSON = System.getProperty("test.gcp.service.account", "/path/to/service/account.json");
+
     protected static BigQuery bigquery;
     protected static Dataset dataset;
     protected static TestRunner runner;
 
+    private static final CredentialsFactory credentialsProviderFactory = new CredentialsFactory();
+
     @BeforeClass
-    public static void beforeClass() {
-        dataset = null;
+    public static void beforeClass() throws IOException {
+        final Map<PropertyDescriptor, String> propertiesMap = new HashMap<>();
+        propertiesMap.put(CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE, SERVICE_ACCOUNT_JSON);
+        Credentials credentials = credentialsProviderFactory.getGoogleCredentials(propertiesMap, new ProxyAwareTransportFactory(null));
+
         BigQueryOptions bigQueryOptions = BigQueryOptions.newBuilder()
+                .setProjectId(PROJECT_ID)
+                .setCredentials(credentials)
                 .build();
+
         bigquery = bigQueryOptions.getService();
 
         DatasetInfo datasetInfo = DatasetInfo.newBuilder(RemoteBigQueryHelper.generateDatasetName()).build();
@@ -67,9 +85,11 @@ public abstract class AbstractBigQueryIT {
     }
 
     protected TestRunner setCredentialsControllerService(TestRunner runner) throws InitializationException {
-        final Map<String, String> propertiesMap = new HashMap<>();
         final GCPCredentialsControllerService credentialsControllerService = new GCPCredentialsControllerService();
 
+        final Map<String, String> propertiesMap = new HashMap<>();
+        propertiesMap.put(CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE.getName(), SERVICE_ACCOUNT_JSON);
+
         runner.addControllerService(CONTROLLER_SERVICE, credentialsControllerService, propertiesMap);
         runner.enableControllerService(credentialsControllerService);
         runner.assertValid(credentialsControllerService);
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java
index 8686213..bd56340 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java
@@ -1,12 +1,12 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,7 +17,13 @@
 
 package org.apache.nifi.processors.gcp.bigquery;
 
-import com.google.cloud.bigquery.FormatOptions;
+import java.io.BufferedWriter;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
 import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockFlowFile;
@@ -25,12 +31,7 @@ import org.apache.nifi.util.TestRunners;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.BufferedWriter;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
+import com.google.cloud.bigquery.FormatOptions;
 
 public class PutBigQueryBatchIT extends AbstractBigQueryIT {
 
@@ -58,6 +59,7 @@ public class PutBigQueryBatchIT extends AbstractBigQueryIT {
     @Before
     public void setup() {
         runner = TestRunners.newTestRunner(PutBigQueryBatch.class);
+        runner.setProperty(AbstractBigQueryProcessor.PROJECT_ID, PROJECT_ID);
     }
 
     @Test
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamingIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamingIT.java
new file mode 100644
index 0000000..6bed8be
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamingIT.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.file.Paths;
+import java.util.Iterator;
+
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.Field.Mode;
+import com.google.cloud.bigquery.FieldValueList;
+import com.google.cloud.bigquery.LegacySQLTypeName;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.StandardTableDefinition;
+import com.google.cloud.bigquery.TableDefinition;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
+import com.google.cloud.bigquery.TableResult;
+
+public class PutBigQueryStreamingIT extends AbstractBigQueryIT {
+
+    private Schema schema;
+
+    @Before
+    public void setup() throws InitializationException {
+        runner = TestRunners.newTestRunner(PutBigQueryStreaming.class);
+        runner = setCredentialsControllerService(runner);
+        runner.setProperty(AbstractGCPProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
+        runner.setProperty(AbstractGCPProcessor.PROJECT_ID, PROJECT_ID);
+    }
+
+    private void createTable(String tableName) {
+        TableId tableId = TableId.of(dataset.getDatasetId().getDataset(), tableName);
+
+        // Table field definition
+        Field id = Field.newBuilder("id", LegacySQLTypeName.INTEGER).setMode(Mode.REQUIRED).build();
+        Field name = Field.newBuilder("name", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
+        Field alias = Field.newBuilder("alias", LegacySQLTypeName.STRING).setMode(Mode.REPEATED).build();
+
+        Field zip = Field.newBuilder("zip", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
+        Field city = Field.newBuilder("city", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
+        Field addresses = Field.newBuilder("addresses", LegacySQLTypeName.RECORD, zip, city).setMode(Mode.REPEATED).build();
+
+        Field position = Field.newBuilder("position", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
+        Field company = Field.newBuilder("company", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
+        Field job = Field.newBuilder("job", LegacySQLTypeName.RECORD, position, company).setMode(Mode.NULLABLE).build();
+
+        // Table schema definition
+        schema = Schema.of(id, name, alias, addresses, job);
+        TableDefinition tableDefinition = StandardTableDefinition.of(schema);
+        TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();
+
+        // create table
+        bigquery.create(tableInfo);
+    }
+
+    private void deleteTable(String tableName) {
+        TableId tableId = TableId.of(dataset.getDatasetId().getDataset(), tableName);
+        bigquery.delete(tableId);
+    }
+
+    @Test
+    public void PutBigQueryStreamingNoError() throws Exception {
+        String tableName = Thread.currentThread().getStackTrace()[1].getMethodName();
+        createTable(tableName);
+
+        runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
+        runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);
+
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        runner.addControllerService("reader", jsonReader);
+        runner.enableControllerService(jsonReader);
+
+        runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
+
+        runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-correct-data.json"));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutBigQueryStreaming.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(PutBigQueryStreaming.REL_SUCCESS).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "2");
+
+        TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
+        Iterator<FieldValueList> iterator = result.getValues().iterator();
+
+        FieldValueList firstElt = iterator.next();
+        FieldValueList sndElt = iterator.next();
+        assertTrue(firstElt.get("name").getStringValue().endsWith("Doe"));
+        assertTrue(sndElt.get("name").getStringValue().endsWith("Doe"));
+
+        FieldValueList john;
+        FieldValueList jane;
+        john = firstElt.get("name").getStringValue().equals("John Doe") ? firstElt : sndElt;
+        jane = firstElt.get("name").getStringValue().equals("Jane Doe") ? firstElt : sndElt;
+
+        assertEquals(jane.get("job").getRecordValue().get(0).getStringValue(), "Director");
+        assertTrue(john.get("alias").getRepeatedValue().size() == 2);
+        assertTrue(john.get("addresses").getRepeatedValue().get(0).getRecordValue().get(0).getStringValue().endsWith("000"));
+
+        deleteTable(tableName);
+    }
+
+    @Test
+    public void PutBigQueryStreamingFullError() throws Exception {
+        String tableName = Thread.currentThread().getStackTrace()[1].getMethodName();
+        createTable(tableName);
+
+        runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
+        runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);
+
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        runner.addControllerService("reader", jsonReader);
+        runner.enableControllerService(jsonReader);
+
+        runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
+
+        runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-bad-data.json"));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutBigQueryStreaming.REL_FAILURE, 1);
+        runner.getFlowFilesForRelationship(PutBigQueryStreaming.REL_FAILURE).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "0");
+
+        TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
+        assertFalse(result.getValues().iterator().hasNext());
+
+        deleteTable(tableName);
+    }
+
+    @Test
+    public void PutBigQueryStreamingPartialError() throws Exception {
+        String tableName = Thread.currentThread().getStackTrace()[1].getMethodName();
+        createTable(tableName);
+
+        runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
+        runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);
+
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        runner.addControllerService("reader", jsonReader);
+        runner.enableControllerService(jsonReader);
+
+        runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
+        runner.setProperty(BigQueryAttributes.SKIP_INVALID_ROWS_ATTR, "true");
+
+        runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-bad-data.json"));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutBigQueryStreaming.REL_FAILURE, 1);
+        runner.getFlowFilesForRelationship(PutBigQueryStreaming.REL_FAILURE).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "1");
+
+        TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
+        Iterator<FieldValueList> iterator = result.getValues().iterator();
+
+        FieldValueList firstElt = iterator.next();
+        assertFalse(iterator.hasNext());
+        assertEquals(firstElt.get("name").getStringValue(), "Jane Doe");
+
+        deleteTable(tableName);
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-bad-data.json b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-bad-data.json
new file mode 100644
index 0000000..1a7ebba
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-bad-data.json
@@ -0,0 +1,35 @@
+[
+    {
+      "name": "John Doe",
+      "alias": ["john", "jd"],
+      "addresses": [
+        {
+          "zip": "1000",
+          "city": "NiFi"
+        },
+        {
+          "zip": "2000",
+          "city": "Bar"
+        }
+      ],
+      "job": {
+        "position": "Manager",
+        "company": "ASF"
+      }
+    },
+    {
+      "id": 2,
+      "name": "Jane Doe",
+      "alias": ["jane"],
+      "addresses": [
+        {
+          "zip": "1000",
+          "city": "NiFi"
+        }
+      ],
+      "job": {
+        "position": "Director",
+        "company": "ASF"
+      }
+    }
+  ]
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data.json b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data.json
new file mode 100644
index 0000000..994037f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data.json
@@ -0,0 +1,36 @@
+[
+    {
+      "id": 1,
+      "name": "John Doe",
+      "alias": ["john", "jd"],
+      "addresses": [
+        {
+          "zip": "1000",
+          "city": "NiFi"
+        },
+        {
+          "zip": "2000",
+          "city": "Bar"
+        }
+      ],
+      "job": {
+        "position": "Manager",
+        "company": "ASF"
+      }
+    },
+    {
+      "id": 2,
+      "name": "Jane Doe",
+      "alias": [],
+      "addresses": [
+        {
+          "zip": "1000",
+          "city": "NiFi"
+        }
+      ],
+      "job": {
+        "position": "Director",
+        "company": "ASF"
+      }
+    }
+  ]
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/pom.xml
index 2e8a65a..2f88a31 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/pom.xml
@@ -27,7 +27,7 @@
     <packaging>pom</packaging>
 
     <properties>
-        <google.cloud.sdk.version>0.101.0-alpha</google.cloud.sdk.version>
+        <google.cloud.sdk.version>0.107.0-alpha</google.cloud.sdk.version>
     </properties>
 
     <dependencyManagement>