Posted to commits@nifi.apache.org by jo...@apache.org on 2018/11/21 15:46:17 UTC

[1/2] nifi git commit: NIFI-4731: BQ Processors and GCP library update.

Repository: nifi
Updated Branches:
  refs/heads/master f6b171d5f -> 03ef64654


NIFI-4731: BQ Processors and GCP library update.

Signed-off-by: joewitt <jo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/444caf8a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/444caf8a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/444caf8a

Branch: refs/heads/master
Commit: 444caf8a788c8c0a21c45558bb16cd8ee4b42872
Parents: f6b171d
Author: Daniel Jimenez <dj...@synack.com>
Authored: Sun Apr 29 10:01:34 2018 +0200
Committer: joewitt <jo...@apache.org>
Committed: Wed Nov 21 10:27:41 2018 -0500

----------------------------------------------------------------------
 .../nifi-gcp-bundle/nifi-gcp-processors/pom.xml |  10 +-
 .../processors/gcp/AbstractGCPProcessor.java    |   3 +-
 .../gcp/bigquery/AbstractBigQueryProcessor.java | 122 +++++++++
 .../gcp/bigquery/BigQueryAttributes.java        |  80 ++++++
 .../nifi/processors/gcp/bigquery/BqUtils.java   |  84 ++++++
 .../gcp/bigquery/PutBigQueryBatch.java          | 269 +++++++++++++++++++
 .../gcp/storage/AbstractGCSProcessor.java       |  36 +--
 .../processors/gcp/storage/ListGCSBucket.java   |  11 +-
 .../org.apache.nifi.processor.Processor         |   3 +-
 .../processors/gcp/bigquery/AbstractBQTest.java |  96 +++++++
 .../gcp/bigquery/AbstractBigQueryIT.java        |  79 ++++++
 .../gcp/bigquery/PutBigQueryBatchIT.java        | 137 ++++++++++
 .../gcp/bigquery/PutBigQueryBatchTest.java      | 153 +++++++++++
 .../gcp/storage/ListGCSBucketTest.java          |   1 -
 .../nifi-gcp-services-api/pom.xml               |   1 +
 15 files changed, 1060 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
index 669cbc6..24c60bf 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
@@ -62,7 +62,7 @@
         </dependency>
         <dependency>
             <groupId>com.google.cloud</groupId>
-            <artifactId>google-cloud-storage</artifactId>
+            <artifactId>google-cloud-core</artifactId>
             <exclusions>
                 <exclusion>
                     <groupId>com.google.code.findbugs</groupId>
@@ -72,6 +72,14 @@
         </dependency>
         <dependency>
             <groupId>com.google.cloud</groupId>
+            <artifactId>google-cloud-storage</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.cloud</groupId>
+            <artifactId>google-cloud-bigquery</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.cloud</groupId>
             <artifactId>google-cloud-pubsub</artifactId>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
index 2c178ff..e95bf73 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
@@ -43,7 +43,8 @@ public abstract class AbstractGCPProcessor<
             .Builder().name("gcp-project-id")
             .displayName("Project ID")
             .description("Google Cloud Project ID")
-            .required(true)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 

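With Project ID now optional and scoped to the variable registry, a flow can resolve it through Expression Language instead of hard-coding it per processor. A minimal sketch using the mock test framework; the "gcp.project" registry key, its value, and the class name are illustrative, not part of this commit:

import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
import org.apache.nifi.processors.gcp.storage.ListGCSBucket;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class ProjectIdFromRegistrySketch {
    public static void main(String[] args) {
        TestRunner runner = TestRunners.newTestRunner(ListGCSBucket.class);
        // Simulate a variable registry entry; in a live flow this would come
        // from the registry itself (illustrative key and value).
        runner.setVariable("gcp.project", "my-gcp-project");
        runner.setProperty(AbstractGCPProcessor.PROJECT_ID, "${gcp.project}");
    }
}
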
http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
new file mode 100644
index 0000000..b52a552
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.common.collect.ImmutableList;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
+import org.apache.nifi.util.StringUtils;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Base class for creating processors that connect to the GCP BigQuery service.
+ */
+public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<BigQuery, BigQueryOptions> {
+    static final int BUFFER_SIZE = 65536;
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder().name("success")
+                    .description("FlowFiles are routed to this relationship after a successful Google BigQuery operation.")
+                    .build();
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description("FlowFiles are routed to this relationship if the Google BigQuery operation fails.")
+                    .build();
+
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
+
+    public static final PropertyDescriptor DATASET = new PropertyDescriptor
+            .Builder().name(BigQueryAttributes.DATASET_ATTR)
+            .displayName("Dataset")
+            .description(BigQueryAttributes.DATASET_DESC)
+            .required(true)
+            .defaultValue("${" + BigQueryAttributes.DATASET_ATTR + "}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor
+            .Builder().name(BigQueryAttributes.TABLE_NAME_ATTR)
+            .displayName("Table Name")
+            .description(BigQueryAttributes.TABLE_NAME_DESC)
+            .required(true)
+            .defaultValue("${" + BigQueryAttributes.TABLE_NAME_ATTR + "}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor TABLE_SCHEMA = new PropertyDescriptor
+            .Builder().name(BigQueryAttributes.TABLE_SCHEMA_ATTR)
+            .displayName("Table Schema")
+            .description(BigQueryAttributes.TABLE_SCHEMA_DESC)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor
+            .Builder().name(BigQueryAttributes.JOB_READ_TIMEOUT_ATTR)
+            .displayName("Read Timeout")
+            .description(BigQueryAttributes.JOB_READ_TIMEOUT_DESC)
+            .required(true)
+            .defaultValue("5 minutes")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return ImmutableList.<PropertyDescriptor>builder()
+                .addAll(super.getSupportedPropertyDescriptors())
+                .build();
+    }
+
+    @Override
+    protected BigQueryOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials) {
+        final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
+        final Integer retryCount = Integer.valueOf(context.getProperty(RETRY_COUNT).getValue());
+
+        BigQueryOptions.Builder builder = BigQueryOptions.newBuilder().setCredentials(credentials);
+
+        if (!StringUtils.isBlank(projectId)) {
+            builder.setProjectId(projectId);
+        }
+
+        return builder
+                .setRetrySettings(RetrySettings.newBuilder().setMaxAttempts(retryCount).build())
+                .build();
+    }
+}
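
For orientation, a sketch of the client construction that getServiceOptions() above performs; credential acquisition is elided and the project ID, retry count, and class name are illustrative:

import com.google.api.gax.retrying.RetrySettings;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;

public class BigQueryOptionsSketch {
    public static BigQuery connect(GoogleCredentials credentials) {
        // Credentials plus bounded retries; the project ID is only set when
        // the (now optional) Project ID property is non-blank.
        BigQueryOptions options = BigQueryOptions.newBuilder()
                .setCredentials(credentials)
                .setProjectId("my-gcp-project")
                .setRetrySettings(RetrySettings.newBuilder().setMaxAttempts(6).build())
                .build();
        return options.getService();
    }
}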

http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
new file mode 100644
index 0000000..380f701
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
+
+/**
+ * Attributes associated with the BigQuery processors
+ */
+public class BigQueryAttributes {
+    private BigQueryAttributes() {}
+
+    public static final PropertyDescriptor SERVICE_ACCOUNT_JSON_FILE = CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE;
+
+    public static final String DATASET_ATTR = "bq.dataset";
+    public static final String DATASET_DESC = "BigQuery dataset";
+
+    public static final String TABLE_NAME_ATTR = "bq.table.name";
+    public static final String TABLE_NAME_DESC = "BigQuery table name";
+
+    public static final String TABLE_SCHEMA_ATTR = "bq.table.schema";
+    public static final String TABLE_SCHEMA_DESC = "BigQuery table schema";
+
+    public static final String CREATE_DISPOSITION_ATTR = "bq.load.create_disposition";
+    public static final String CREATE_DISPOSITION_DESC = "Options for table creation";
+
+    public static final String JOB_ERROR_MSG_ATTR = "bq.error.message";
+    public static final String JOB_ERROR_MSG_DESC = "Load job error message";
+
+    public static final String JOB_ERROR_REASON_ATTR = "bq.error.reason";
+    public static final String JOB_ERROR_REASON_DESC = "Load job error reason";
+
+    public static final String JOB_ERROR_LOCATION_ATTR = "bq.error.location";
+    public static final String JOB_ERROR_LOCATION_DESC = "Load job error location";
+
+    public static final String JOB_READ_TIMEOUT_ATTR = "bq.readtimeout";
+    public static final String JOB_READ_TIMEOUT_DESC = "Maximum wait time for a load job to complete";
+
+
+    // Batch Attributes
+    public static final String SOURCE_TYPE_ATTR = "bq.load.type";
+    public static final String SOURCE_TYPE_DESC = "Data type of the file to be loaded";
+
+    public static final String IGNORE_UNKNOWN_ATTR = "bq.load.ignore_unknown";
+    public static final String IGNORE_UNKNOWN_DESC = "Ignore fields not in table schema";
+
+    public static final String WRITE_DISPOSITION_ATTR = "bq.load.write_disposition";
+    public static final String WRITE_DISPOSITION_DESC = "Options for writing to table";
+
+    public static final String MAX_BADRECORDS_ATTR = "bq.load.max_badrecords";
+    public static final String MAX_BADRECORDS_DESC = "Number of erroneous records to ignore before generating an error";
+
+    public static final String JOB_CREATE_TIME_ATTR = "bq.job.stat.creation_time";
+    public static final String JOB_CREATE_TIME_DESC = "Time load job created";
+
+    public static final String JOB_END_TIME_ATTR = "bq.job.stat.end_time";
+    public static final String JOB_END_TIME_DESC = "Time load job ended";
+
+    public static final String JOB_START_TIME_ATTR = "bq.job.stat.start_time";
+    public static final String JOB_START_TIME_DESC = "Time load job started";
+
+    public static final String JOB_LINK_ATTR = "bq.job.link";
+    public static final String JOB_LINK_DESC = "API Link to load job";
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java
new file mode 100644
index 0000000..f7f5d66
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.LegacySQLTypeName;
+import com.google.cloud.bigquery.Schema;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Utility methods for building a BigQuery Schema from a JSON schema string.
+ */
+public class BqUtils {
+    private final static Type gsonSchemaType = new TypeToken<List<Map>>() {
+    }.getType();
+
+    public static Field mapToField(Map fMap) {
+        String typeStr = fMap.get("type").toString();
+        String nameStr = fMap.get("name").toString();
+        String modeStr = fMap.get("mode").toString();
+        LegacySQLTypeName type = null;
+
+        if (typeStr.equals("BOOLEAN")) {
+            type = LegacySQLTypeName.BOOLEAN;
+        } else if (typeStr.equals("STRING")) {
+            type = LegacySQLTypeName.STRING;
+        } else if (typeStr.equals("BYTES")) {
+            type = LegacySQLTypeName.BYTES;
+        } else if (typeStr.equals("INTEGER")) {
+            type = LegacySQLTypeName.INTEGER;
+        } else if (typeStr.equals("FLOAT")) {
+            type = LegacySQLTypeName.FLOAT;
+        } else if (typeStr.equals("TIMESTAMP") || typeStr.equals("DATE")
+                || typeStr.equals("TIME") || typeStr.equals("DATETIME")) {
+            type = LegacySQLTypeName.TIMESTAMP;
+        } else if (typeStr.equals("RECORD")) {
+            type = LegacySQLTypeName.RECORD;
+        }
+
+        return Field.newBuilder(nameStr, type).setMode(Field.Mode.valueOf(modeStr)).build();
+    }
+
+    public static List<Field> listToFields(List<Map> m_fields) {
+        List<Field> fields = new ArrayList(m_fields.size());
+        for (Map m : m_fields) {
+            fields.add(mapToField(m));
+        }
+
+        return fields;
+    }
+
+    public static Schema schemaFromString(String schemaStr) {
+        if (schemaStr == null) {
+            return null;
+        } else {
+            Gson gson = new Gson();
+            List<Map> fields = gson.fromJson(schemaStr, gsonSchemaType);
+            return Schema.of(BqUtils.listToFields(fields));
+        }
+    }
+
+}
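
A short usage sketch for the schema parser above; the JSON layout mirrors the TABLE_SCHEMA_STRING used in PutBigQueryBatchIT later in this commit, and the field names and class name are illustrative:

import com.google.cloud.bigquery.Schema;
import org.apache.nifi.processors.gcp.bigquery.BqUtils;

public class BqUtilsSketch {
    public static void main(String[] args) {
        // Each object needs "name", "type", and "mode"; mapToField() reads all three.
        String schemaJson = "["
                + "{\"name\":\"field_1\",\"type\":\"STRING\",\"mode\":\"REQUIRED\"},"
                + "{\"name\":\"field_2\",\"type\":\"INTEGER\",\"mode\":\"NULLABLE\"}"
                + "]";
        Schema schema = BqUtils.schemaFromString(schemaJson);
        // A null input yields a null Schema, which the write configuration
        // passes through as "no schema supplied".
        System.out.println(schema.getFields());
    }
}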

http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
new file mode 100644
index 0000000..99c7f2a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import com.google.cloud.RetryOption;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.FormatOptions;
+import com.google.cloud.bigquery.Job;
+import com.google.cloud.bigquery.JobInfo;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.TableDataWriteChannel;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.WriteChannelConfiguration;
+import com.google.common.collect.ImmutableList;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.storage.DeleteGCSObject;
+import org.apache.nifi.processors.gcp.storage.PutGCSObject;
+import org.apache.nifi.util.StringUtils;
+import org.threeten.bp.Duration;
+import org.threeten.bp.temporal.ChronoUnit;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A processor for batch loading data into a Google BigQuery table
+ */
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"google", "google cloud", "bq", "bigquery"})
+@CapabilityDescription("Batch loads flow files to a Google BigQuery table.")
+@SeeAlso({PutGCSObject.class, DeleteGCSObject.class})
+
+@WritesAttributes({
+        @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, description = BigQueryAttributes.IGNORE_UNKNOWN_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, description = BigQueryAttributes.MAX_BADRECORDS_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC)
+})
+
+public class PutBigQueryBatch extends AbstractBigQueryProcessor {
+
+    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor
+            .Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
+            .displayName("Load file type")
+            .description(BigQueryAttributes.SOURCE_TYPE_DESC)
+            .required(true)
+            .allowableValues(FormatOptions.json().getType(), FormatOptions.avro().getType(), FormatOptions.csv().getType())
+            .defaultValue(FormatOptions.avro().getType())
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.IGNORE_UNKNOWN_ATTR)
+            .displayName("Ignore Unknown Values")
+            .description(BigQueryAttributes.IGNORE_UNKNOWN_DESC)
+            .required(true)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
+    public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.CREATE_DISPOSITION_ATTR)
+            .displayName("Create Disposition")
+            .description(BigQueryAttributes.CREATE_DISPOSITION_DESC)
+            .required(true)
+            .allowableValues(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), JobInfo.CreateDisposition.CREATE_NEVER.name())
+            .defaultValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name())
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor WRITE_DISPOSITION = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.WRITE_DISPOSITION_ATTR)
+            .displayName("Write Disposition")
+            .description(BigQueryAttributes.WRITE_DISPOSITION_DESC)
+            .required(true)
+            .allowableValues(JobInfo.WriteDisposition.WRITE_EMPTY.name(), JobInfo.WriteDisposition.WRITE_APPEND.name(), JobInfo.WriteDisposition.WRITE_TRUNCATE.name())
+            .defaultValue(JobInfo.WriteDisposition.WRITE_EMPTY.name())
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MAXBAD_RECORDS = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.MAX_BADRECORDS_ATTR)
+            .displayName("Max Bad Records")
+            .description(BigQueryAttributes.MAX_BADRECORDS_DESC)
+            .required(true)
+            .defaultValue("0")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+
+    private Schema schemaCache = null;
+
+    public PutBigQueryBatch() {
+
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return ImmutableList.<PropertyDescriptor>builder()
+                .addAll(super.getSupportedPropertyDescriptors())
+                .add(DATASET)
+                .add(TABLE_NAME)
+                .add(TABLE_SCHEMA)
+                .add(SOURCE_TYPE)
+                .add(CREATE_DISPOSITION)
+                .add(WRITE_DISPOSITION)
+                .add(MAXBAD_RECORDS)
+                .add(IGNORE_UNKNOWN)
+                .build();
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                .expressionLanguageSupported(true)
+                .dynamic(true)
+                .build();
+    }
+
+    @Override
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        super.onScheduled(context);
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final Map<String, String> attributes = new HashMap<>();
+
+        final BigQuery bq = getCloudService();
+
+        final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
+        final String dataset = context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue();
+        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+        final TableId tableId;
+        if (StringUtils.isEmpty(projectId)) {
+            tableId = TableId.of(dataset, tableName);
+        } else {
+            tableId = TableId.of(projectId, dataset, tableName);
+        }
+
+        final String fileType = context.getProperty(SOURCE_TYPE).getValue();
+
+        String schemaString = context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions().getValue();
+        Schema schema = BqUtils.schemaFromString(schemaString);
+
+        WriteChannelConfiguration writeChannelConfiguration =
+                WriteChannelConfiguration.newBuilder(tableId)
+                        .setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue()))
+                        .setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue()))
+                        .setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).asBoolean())
+                        .setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger())
+                        .setSchema(schema)
+                        .setFormatOptions(FormatOptions.of(fileType))
+                        .build();
+
+        TableDataWriteChannel writer = bq.writer(writeChannelConfiguration);
+
+        try {
+            session.read(flowFile, rawIn -> {
+                ReadableByteChannel readableByteChannel = Channels.newChannel(rawIn);
+                ByteBuffer byteBuffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
+                while (readableByteChannel.read(byteBuffer) >= 0) {
+                    byteBuffer.flip();
+                    writer.write(byteBuffer);
+                    byteBuffer.clear();
+                }
+            });
+
+            writer.close();
+
+            Job job = writer.getJob();
+            PropertyValue property = context.getProperty(READ_TIMEOUT);
+            Long timePeriod = property.evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS);
+            Duration duration = Duration.of(timePeriod, ChronoUnit.SECONDS);
+            job = job.waitFor(RetryOption.totalTimeout(duration));
+
+            if (job != null) {
+                attributes.put(BigQueryAttributes.JOB_CREATE_TIME_ATTR, Long.toString(job.getStatistics().getCreationTime()));
+                attributes.put(BigQueryAttributes.JOB_END_TIME_ATTR, Long.toString(job.getStatistics().getEndTime()));
+                attributes.put(BigQueryAttributes.JOB_START_TIME_ATTR, Long.toString(job.getStatistics().getStartTime()));
+                attributes.put(BigQueryAttributes.JOB_LINK_ATTR, job.getSelfLink());
+
+                boolean jobError = (job.getStatus().getError() != null);
+
+                if (jobError) {
+                    attributes.put(BigQueryAttributes.JOB_ERROR_MSG_ATTR, job.getStatus().getError().getMessage());
+                    attributes.put(BigQueryAttributes.JOB_ERROR_REASON_ATTR, job.getStatus().getError().getReason());
+                    attributes.put(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, job.getStatus().getError().getLocation());
+                } else {
+                    // in case it got looped back from error
+                    flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_MSG_ATTR);
+                    flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_REASON_ATTR);
+                    flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_LOCATION_ATTR);
+                }
+
+                if (!attributes.isEmpty()) {
+                    flowFile = session.putAllAttributes(flowFile, attributes);
+                }
+
+                if (jobError) {
+                    flowFile = session.penalize(flowFile);
+                    session.transfer(flowFile, REL_FAILURE);
+                } else {
+                    session.transfer(flowFile, REL_SUCCESS);
+                }
+            }
+
+        } catch (Exception ex) {
+            getLogger().log(LogLevel.ERROR, ex.getMessage(), ex);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+}
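
Because DATASET and TABLE_NAME default to ${bq.dataset} and ${bq.table.name}, an upstream processor can steer each FlowFile to its own destination table by setting those attributes. A sketch of that pattern with the mock framework; credentials setup is omitted, and the attribute values and class name are illustrative:

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.processors.gcp.bigquery.BigQueryAttributes;
import org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class PerFlowFileTableSketch {
    public static void main(String[] args) {
        TestRunner runner = TestRunners.newTestRunner(PutBigQueryBatch.class);
        // Dataset and Table Name stay at their defaults, so each FlowFile
        // resolves its own destination from these attributes.
        Map<String, String> attrs = new HashMap<>();
        attrs.put(BigQueryAttributes.DATASET_ATTR, "analytics");
        attrs.put(BigQueryAttributes.TABLE_NAME_ATTR, "events");
        runner.enqueue("{\"field_1\":\"value\"}\n".getBytes(StandardCharsets.UTF_8), attrs);
    }
}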

http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
index 4363115..7c63631 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
@@ -16,12 +16,15 @@
  */
 package org.apache.nifi.processors.gcp.storage;
 
-import com.google.api.gax.retrying.RetrySettings;
-import com.google.auth.oauth2.GoogleCredentials;
-import com.google.cloud.http.HttpTransportOptions;
-import com.google.cloud.storage.Storage;
-import com.google.cloud.storage.StorageOptions;
-import com.google.common.collect.ImmutableList;
+import java.net.Proxy;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
@@ -31,14 +34,12 @@ import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
 import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
 import org.apache.nifi.proxy.ProxyConfiguration;
 
-import java.net.Proxy;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.http.HttpTransportOptions;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import com.google.common.collect.ImmutableList;
 
 /**
  * Base class for creating processors which connect to Google Cloud Storage.
@@ -86,12 +87,11 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
 
     @Override
     protected StorageOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials) {
-        final String projectId = context.getProperty(PROJECT_ID).getValue();
+        final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
         final Integer retryCount = context.getProperty(RETRY_COUNT).asInteger();
 
         StorageOptions.Builder storageOptionsBuilder = StorageOptions.newBuilder()
                 .setCredentials(credentials)
-                .setProjectId(projectId)
                 .setRetrySettings(RetrySettings.newBuilder()
                         .setMaxAttempts(retryCount)
                         .build());
@@ -113,6 +113,10 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
             return ProxyConfiguration.DIRECT_CONFIGURATION;
         });
 
+        if (!projectId.isEmpty()) {
+            storageOptionsBuilder.setProjectId(projectId);
+        }
+
         final ProxyAwareTransportFactory transportFactory = new ProxyAwareTransportFactory(proxyConfiguration);
         storageOptionsBuilder.setTransportOptions(HttpTransportOptions.newBuilder().setHttpTransportFactory(transportFactory).build());
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
index e018814..01293cf 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
@@ -150,8 +150,8 @@ public class ListGCSBucket extends AbstractGCSProcessor {
             .displayName("Bucket")
             .description(BUCKET_DESC)
             .required(true)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
             .build();
 
     public static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder()
@@ -159,7 +159,8 @@ public class ListGCSBucket extends AbstractGCSProcessor {
             .displayName("Prefix")
             .description("The prefix used to filter the object list. In most cases, it should end with a forward slash ('/').")
             .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
             .build();
 
     public static final PropertyDescriptor USE_GENERATIONS = new PropertyDescriptor.Builder()
@@ -242,9 +243,9 @@ public class ListGCSBucket extends AbstractGCSProcessor {
 
         final long startNanos = System.nanoTime();
 
-        final String bucket = context.getProperty(BUCKET).getValue();
+        final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
 
-        final String prefix = context.getProperty(PREFIX).getValue();
+        final String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
 
         final boolean useGenerations = context.getProperty(USE_GENERATIONS).asBoolean();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 249d19e..4ff9dfb 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -17,4 +17,5 @@ org.apache.nifi.processors.gcp.storage.FetchGCSObject
 org.apache.nifi.processors.gcp.storage.DeleteGCSObject
 org.apache.nifi.processors.gcp.storage.ListGCSBucket
 org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub
-org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub
\ No newline at end of file
+org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub
+org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBQTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBQTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBQTest.java
new file mode 100644
index 0000000..e424a3a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBQTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.bigquery;
+
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
+import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+
+/**
+ * Base class for BigQuery Unit Tests. Provides a framework for creating a TestRunner instance with always-required credentials.
+ */
+public abstract class AbstractBQTest {
+    private static final String PROJECT_ID = System.getProperty("test.gcp.project.id", "nifi-test-gcp-project");
+    private static final Integer RETRIES = 9;
+
+    static final String DATASET = RemoteBigQueryHelper.generateDatasetName();
+
+    @Before
+    public void setup() throws Exception {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    public static TestRunner buildNewRunner(Processor processor) throws Exception {
+        final GCPCredentialsService credentialsService = new GCPCredentialsControllerService();
+
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+        runner.addControllerService("gcpCredentialsControllerService", credentialsService);
+        runner.enableControllerService(credentialsService);
+
+        runner.setProperty(AbstractBigQueryProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, "gcpCredentialsControllerService");
+        runner.setProperty(AbstractBigQueryProcessor.PROJECT_ID, PROJECT_ID);
+        runner.setProperty(AbstractBigQueryProcessor.RETRY_COUNT, String.valueOf(RETRIES));
+
+        runner.assertValid(credentialsService);
+
+        return runner;
+    }
+
+    public abstract AbstractBigQueryProcessor getProcessor();
+
+    protected abstract void addRequiredPropertiesToRunner(TestRunner runner);
+
+    @Mock
+    protected BigQuery bq;
+
+    @Test
+    public void testBigQueryOptionsConfiguration() throws Exception {
+        reset(bq);
+        final TestRunner runner = buildNewRunner(getProcessor());
+
+        final AbstractBigQueryProcessor processor = getProcessor();
+        final GoogleCredentials mockCredentials = mock(GoogleCredentials.class);
+
+        final BigQueryOptions options = processor.getServiceOptions(runner.getProcessContext(),
+                mockCredentials);
+
+        assertEquals("Project IDs should match",
+                PROJECT_ID, options.getProjectId());
+
+        assertEquals("Retry counts should match",
+                RETRIES.intValue(), options.getRetrySettings().getMaxAttempts());
+
+        assertSame("Credentials should be configured correctly",
+                mockCredentials, options.getCredentials());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryIT.java
new file mode 100644
index 0000000..52327d4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryIT.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.Dataset;
+import com.google.cloud.bigquery.DatasetInfo;
+import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processors.gcp.GCPIntegrationTests;
+import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertNull;
+
+@Category(GCPIntegrationTests.class)
+public abstract class AbstractBigQueryIT {
+
+    static final String CONTROLLER_SERVICE = "GCPCredentialsService";
+    protected static BigQuery bigquery;
+    protected static Dataset dataset;
+    protected static TestRunner runner;
+
+    @BeforeClass
+    public static void beforeClass() {
+        dataset = null;
+        BigQueryOptions bigQueryOptions = BigQueryOptions.newBuilder()
+                .build();
+        bigquery = bigQueryOptions.getService();
+
+        DatasetInfo datasetInfo = DatasetInfo.newBuilder(RemoteBigQueryHelper.generateDatasetName()).build();
+        dataset = bigquery.create(datasetInfo);
+    }
+
+    @AfterClass
+    public static void afterClass() {
+        bigquery.delete(dataset.getDatasetId(), BigQuery.DatasetDeleteOption.deleteContents());
+    }
+
+    protected static void validateNoServiceExceptionAttribute(FlowFile flowFile) {
+        assertNull(flowFile.getAttribute(BigQueryAttributes.JOB_ERROR_MSG_ATTR));
+        assertNull(flowFile.getAttribute(BigQueryAttributes.JOB_ERROR_REASON_ATTR));
+        assertNull(flowFile.getAttribute(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR));
+    }
+
+    protected TestRunner setCredentialsControllerService(TestRunner runner) throws InitializationException {
+        final Map<String, String> propertiesMap = new HashMap<>();
+        final GCPCredentialsControllerService credentialsControllerService = new GCPCredentialsControllerService();
+
+        runner.addControllerService(CONTROLLER_SERVICE, credentialsControllerService, propertiesMap);
+        runner.enableControllerService(credentialsControllerService);
+        runner.assertValid(credentialsControllerService);
+
+        return runner;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java
new file mode 100644
index 0000000..8686213
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import com.google.cloud.bigquery.FormatOptions;
+import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+public class PutBigQueryBatchIT extends AbstractBigQueryIT {
+
+    private static final String TABLE_SCHEMA_STRING = "[\n" +
+            "  {\n" +
+            "    \"description\": \"field 1\",\n" +
+            "    \"mode\": \"REQUIRED\",\n" +
+            "    \"name\": \"field_1\",\n" +
+            "    \"type\": \"STRING\"\n" +
+            "  },\n" +
+            "  {\n" +
+            "    \"description\": \"field 2\",\n" +
+            "    \"mode\": \"REQUIRED\",\n" +
+            "    \"name\": \"field_2\",\n" +
+            "    \"type\": \"STRING\"\n" +
+            "  },\n" +
+            "  {\n" +
+            "    \"description\": \"field 3\",\n" +
+            "    \"mode\": \"NULLABLE\",\n" +
+            "    \"name\": \"field_3\",\n" +
+            "    \"type\": \"STRING\"\n" +
+            "  }\n" +
+            "]";
+
+    @Before
+    public void setup() {
+        runner = TestRunners.newTestRunner(PutBigQueryBatch.class);
+    }
+
+    @Test
+    public void PutBigQueryBatchSmallPayloadTest() throws Exception {
+        String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
+        runner = setCredentialsControllerService(runner);
+        runner.setProperty(AbstractGCPProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
+        runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
+        runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, methodName);
+        runner.setProperty(BigQueryAttributes.SOURCE_TYPE_ATTR, FormatOptions.json().getType());
+        runner.setProperty(BigQueryAttributes.TABLE_SCHEMA_ATTR, TABLE_SCHEMA_STRING);
+
+        String str = "{\"field_1\":\"Daniel is great\",\"field_2\":\"Daniel is great\"}\r\n";
+
+        runner.enqueue(new ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8)));
+        runner.run(1);
+        for (MockFlowFile flowFile : runner.getFlowFilesForRelationship(AbstractBigQueryProcessor.REL_SUCCESS)) {
+            validateNoServiceExceptionAttribute(flowFile);
+        }
+        runner.assertAllFlowFilesTransferred(AbstractBigQueryProcessor.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void PutBigQueryBatchBadRecordTest() throws Exception {
+        String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
+        runner = setCredentialsControllerService(runner);
+        runner.setProperty(AbstractGCPProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
+        runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
+        runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, methodName);
+        runner.setProperty(BigQueryAttributes.SOURCE_TYPE_ATTR, FormatOptions.json().getType());
+        runner.setProperty(BigQueryAttributes.TABLE_SCHEMA_ATTR, TABLE_SCHEMA_STRING);
+
+        String str = "{\"field_1\":\"Daniel is great\"}\r\n";
+
+        runner.enqueue(new ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8)));
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(AbstractBigQueryProcessor.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void PutBigQueryBatchLargePayloadTest() throws InitializationException, IOException {
+        String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
+        runner = setCredentialsControllerService(runner);
+        runner.setProperty(AbstractGCPProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
+        runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
+        runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, methodName);
+        runner.setProperty(BigQueryAttributes.SOURCE_TYPE_ATTR, FormatOptions.json().getType());
+        runner.setProperty(BigQueryAttributes.TABLE_SCHEMA_ATTR, TABLE_SCHEMA_STRING);
+
+        // Allow one bad record to deal with the extra line break.
+        runner.setProperty(BigQueryAttributes.MAX_BADRECORDS_ATTR, String.valueOf(1));
+
+        String str = "{\"field_1\":\"Daniel is great\",\"field_2\":\"Here's to the crazy ones. The misfits. The rebels. The troublemakers." +
+                " The round pegs in the square holes. The ones who see things differently. They're not fond of rules. And they have no respect" +
+                " for the status quo. You can quote them, disagree with them, glorify or vilify them. About the only thing you can't do is ignore" +
+                " them. Because they change things. They push the human race forward. And while some may see them as the crazy ones, we see genius." +
+                " Because the people who are crazy enough to think they can change the world, are the ones who do.\"}\n";
+        Path tempFile = Files.createTempFile(methodName, "");
+        try (BufferedWriter writer = Files.newBufferedWriter(tempFile)) {
+
+            for (int i = 0; i < 2; i++) {
+                for (int ii = 0; ii < 1_000_000; ii++) {
+                    writer.write(str);
+                }
+                writer.flush();
+            }
+            writer.flush();
+        }
+
+        runner.enqueue(tempFile);
+        runner.run(1);
+        for (MockFlowFile flowFile : runner.getFlowFilesForRelationship(AbstractBigQueryProcessor.REL_SUCCESS)) {
+            validateNoServiceExceptionAttribute(flowFile);
+        }
+        runner.assertAllFlowFilesTransferred(AbstractBigQueryProcessor.REL_SUCCESS, 1);
+    }
+}
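
On the success path the processor also populates the bq.job.stat.* attributes documented further down in this commit; an illustrative extra assertion for these integration tests (a sketch, not part of the commit) could be:

    MockFlowFile flowFile = runner.getFlowFilesForRelationship(AbstractBigQueryProcessor.REL_SUCCESS).get(0);
    flowFile.assertAttributeExists(BigQueryAttributes.JOB_CREATE_TIME_ATTR); // "bq.job.stat.creation_time"
    flowFile.assertAttributeExists(BigQueryAttributes.JOB_END_TIME_ATTR);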

http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java
new file mode 100644
index 0000000..7ec5aa9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.bigquery;
+
+import com.google.cloud.RetryOption;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryException;
+import com.google.cloud.bigquery.FormatOptions;
+import com.google.cloud.bigquery.Job;
+import com.google.cloud.bigquery.JobInfo;
+import com.google.cloud.bigquery.JobStatistics;
+import com.google.cloud.bigquery.JobStatus;
+import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.TableDataWriteChannel;
+import com.google.cloud.bigquery.WriteChannelConfiguration;
+import org.apache.nifi.util.TestRunner;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link PutBigQueryBatch}.
+ */
+public class PutBigQueryBatchTest extends AbstractBQTest {
+    private static final String TABLENAME = "test_table";
+    private static final String TABLE_SCHEMA = "[{ \"mode\": \"NULLABLE\", \"name\": \"data\", \"type\": \"STRING\" }]";
+    private static final String SOURCE_TYPE = FormatOptions.json().getType();
+    private static final String CREATE_DISPOSITION = JobInfo.CreateDisposition.CREATE_IF_NEEDED.name();
+    private static final String WRITE_DISPOSITION = JobInfo.WriteDisposition.WRITE_EMPTY.name();
+    private static final String MAXBAD_RECORDS = "0";
+    private static final String IGNORE_UNKNOWN = "true";
+    private static final String READ_TIMEOUT = "5 minutes";
+
+    @Mock
+    BigQuery bq;
+
+    @Mock
+    Table table;
+
+    @Mock
+    Job job;
+
+    @Mock
+    JobStatus jobStatus;
+
+    @Mock
+    JobStatistics stats;
+
+    @Mock
+    TableDataWriteChannel tableDataWriteChannel;
+
+    @Before
+    public void setup() throws Exception {
+        super.setup();
+        reset(bq);
+        reset(table);
+        reset(job);
+        reset(jobStatus);
+        reset(stats);
+    }
+
+    @Override
+    public AbstractBigQueryProcessor getProcessor() {
+        return new PutBigQueryBatch() {
+            @Override
+            protected BigQuery getCloudService() {
+                return bq;
+            }
+        };
+    }
+
+    @Override
+    protected void addRequiredPropertiesToRunner(TestRunner runner) {
+        runner.setProperty(PutBigQueryBatch.DATASET, DATASET);
+        runner.setProperty(PutBigQueryBatch.TABLE_NAME, TABLENAME);
+        runner.setProperty(PutBigQueryBatch.TABLE_SCHEMA, TABLE_SCHEMA);
+        runner.setProperty(PutBigQueryBatch.SOURCE_TYPE, SOURCE_TYPE);
+        runner.setProperty(PutBigQueryBatch.CREATE_DISPOSITION, CREATE_DISPOSITION);
+        runner.setProperty(PutBigQueryBatch.WRITE_DISPOSITION, WRITE_DISPOSITION);
+        runner.setProperty(PutBigQueryBatch.MAXBAD_RECORDS, MAXBAD_RECORDS);
+        runner.setProperty(PutBigQueryBatch.IGNORE_UNKNOWN, IGNORE_UNKNOWN);
+        runner.setProperty(PutBigQueryBatch.READ_TIMEOUT, READ_TIMEOUT);
+    }
+
+    @Test
+    public void testSuccessfulLoad() throws Exception {
+        when(table.exists()).thenReturn(Boolean.TRUE);
+        when(bq.create(ArgumentMatchers.isA(JobInfo.class))).thenReturn(job);
+        when(bq.writer(ArgumentMatchers.isA(WriteChannelConfiguration.class))).thenReturn(tableDataWriteChannel);
+        when(tableDataWriteChannel.getJob()).thenReturn(job);
+        when(job.waitFor(ArgumentMatchers.isA(RetryOption.class))).thenReturn(job);
+        when(job.getStatus()).thenReturn(jobStatus);
+        when(job.getStatistics()).thenReturn(stats);
+
+        when(stats.getCreationTime()).thenReturn(0L);
+        when(stats.getStartTime()).thenReturn(1L);
+        when(stats.getEndTime()).thenReturn(2L);
+
+        final TestRunner runner = buildNewRunner(getProcessor());
+        addRequiredPropertiesToRunner(runner);
+        runner.assertValid();
+
+        runner.enqueue("{ \"data\": \"datavalue\" }");
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutBigQueryBatch.REL_SUCCESS);
+    }
+
+
+    @Test
+    public void testFailedLoad() throws Exception {
+        when(table.exists()).thenReturn(Boolean.TRUE);
+        when(bq.create(ArgumentMatchers.isA(JobInfo.class))).thenReturn(job);
+        when(bq.writer(ArgumentMatchers.isA(WriteChannelConfiguration.class))).thenReturn(tableDataWriteChannel);
+        when(tableDataWriteChannel.getJob()).thenReturn(job);
+        when(job.waitFor(ArgumentMatchers.isA(RetryOption.class))).thenThrow(BigQueryException.class);
+        when(job.getStatus()).thenReturn(jobStatus);
+        when(job.getStatistics()).thenReturn(stats);
+
+        when(stats.getCreationTime()).thenReturn(0L);
+        when(stats.getStartTime()).thenReturn(1L);
+        when(stats.getEndTime()).thenReturn(2L);
+
+        final TestRunner runner = buildNewRunner(getProcessor());
+        addRequiredPropertiesToRunner(runner);
+        runner.assertValid();
+
+        runner.enqueue("{ \"data\": \"datavalue\" }");
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutBigQueryBatch.REL_FAILURE);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
index eca81bb..e17cf4b 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.processors.gcp.storage;
 
-
 import com.google.api.gax.paging.Page;
 import com.google.cloud.storage.Acl;
 import com.google.cloud.storage.Blob;

http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
index b8bd9f7..658350e 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
@@ -33,6 +33,7 @@
         <dependency>
             <groupId>com.google.auth</groupId>
             <artifactId>google-auth-library-oauth2-http</artifactId>
+            <version>0.9.0</version>
             <exclusions>
                 <exclusion>
                     <groupId>com.google.code.findbugs</groupId>


[2/2] nifi git commit: NIFI-4731 This closes #3019. This closes #2682. This closes #2420. NIFI-4933 BigQuery PR Review

Posted by jo...@apache.org.
NIFI-4731 This closes #3019.  This closes #2682.  This closes #2420.
NIFI-4933 BigQuery PR Review

Signed-off-by: joewitt <jo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/03ef6465
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/03ef6465
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/03ef6465

Branch: refs/heads/master
Commit: 03ef6465478eea09704a2b6eb9be16f5001007b9
Parents: 444caf8
Author: Pierre Villard <pi...@gmail.com>
Authored: Tue Sep 18 23:19:03 2018 +0200
Committer: joewitt <jo...@apache.org>
Committed: Wed Nov 21 10:28:40 2018 -0500

----------------------------------------------------------------------
 .../processors/gcp/AbstractGCPProcessor.java    |  47 ++-
 .../gcp/bigquery/AbstractBigQueryProcessor.java |  62 +++-
 .../gcp/bigquery/BigQueryAttributes.java        | 101 ++++--
 .../processors/gcp/bigquery/BigQueryUtils.java  |  84 +++++
 .../nifi/processors/gcp/bigquery/BqUtils.java   |  84 -----
 .../gcp/bigquery/PutBigQueryBatch.java          | 308 ++++++++++++-------
 .../gcp/pubsub/AbstractGCPubSubProcessor.java   |  21 ++
 .../gcp/storage/AbstractGCSProcessor.java       |  27 +-
 .../nifi-gcp-services-api/pom.xml               |   2 +-
 nifi-nar-bundles/nifi-gcp-bundle/pom.xml        |   5 +-
 10 files changed, 479 insertions(+), 262 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/03ef6465/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
index e95bf73..0c360d1 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
@@ -19,16 +19,20 @@ package org.apache.nifi.processors.gcp;
 import com.google.auth.oauth2.GoogleCredentials;
 import com.google.cloud.Service;
 import com.google.cloud.ServiceOptions;
+import com.google.cloud.TransportOptions;
+import com.google.cloud.http.HttpTransportOptions;
 import com.google.common.collect.ImmutableList;
+
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
 import org.apache.nifi.proxy.ProxyConfiguration;
 
+import java.net.Proxy;
 import java.util.List;
 
 /**
@@ -65,7 +69,7 @@ public abstract class AbstractGCPProcessor<
                     "-Djdk.http.auth.tunneling.disabledSchemes=\n" +
                     "-Djdk.http.auth.proxying.disabledSchemes=")
             .required(false)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
@@ -74,14 +78,14 @@ public abstract class AbstractGCPProcessor<
             .displayName("Proxy port")
             .description("Proxy port number")
             .required(false)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
             .build();
 
     public static final PropertyDescriptor HTTP_PROXY_USERNAME = new PropertyDescriptor
             .Builder().name("gcp-proxy-user-name")
-            .displayName("Http Proxy Username")
-            .description("Http Proxy Username")
+            .displayName("HTTP Proxy Username")
+            .description("HTTP Proxy Username")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .required(false)
@@ -89,8 +93,8 @@ public abstract class AbstractGCPProcessor<
 
     public static final PropertyDescriptor HTTP_PROXY_PASSWORD = new PropertyDescriptor
             .Builder().name("gcp-proxy-user-password")
-            .displayName("Http Proxy Password")
-            .description("Http Proxy Password")
+            .displayName("HTTP Proxy Password")
+            .description("HTTP Proxy Password")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .required(false)
@@ -160,4 +164,31 @@ public abstract class AbstractGCPProcessor<
      * @see <a href="http://googlecloudplatform.github.io/google-cloud-java/0.8.0/apidocs/com/google/cloud/ServiceOptions.html">ServiceOptions</a>
      */
     protected abstract CloudServiceOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials);
+
+    /**
+     * Builds the Transport Options containing the proxy configuration
+     * @param context Context to get properties
+     * @return Transport options object with proxy configuration
+     */
+    protected TransportOptions getTransportOptions(ProcessContext context) {
+        final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context, () -> {
+            final String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue();
+            final Integer proxyPort = context.getProperty(PROXY_PORT).evaluateAttributeExpressions().asInteger();
+            if (proxyHost != null && proxyPort != null && proxyPort > 0) {
+                final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
+                final String proxyUser = context.getProperty(HTTP_PROXY_USERNAME).evaluateAttributeExpressions().getValue();
+                final String proxyPassword = context.getProperty(HTTP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
+                componentProxyConfig.setProxyType(Proxy.Type.HTTP);
+                componentProxyConfig.setProxyServerHost(proxyHost);
+                componentProxyConfig.setProxyServerPort(proxyPort);
+                componentProxyConfig.setProxyUserName(proxyUser);
+                componentProxyConfig.setProxyUserPassword(proxyPassword);
+                return componentProxyConfig;
+            }
+            return ProxyConfiguration.DIRECT_CONFIGURATION;
+        });
+
+        final ProxyAwareTransportFactory transportFactory = new ProxyAwareTransportFactory(proxyConfiguration);
+        return HttpTransportOptions.newBuilder().setHttpTransportFactory(transportFactory).build();
+    }
 }
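
The getTransportOptions(context) hook above builds a proxy-aware HTTP transport from the processor's proxy properties. A minimal sketch of how a concrete subclass might wire it into its service options (the StorageOptions usage is illustrative only, not part of this diff):

    @Override
    protected StorageOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials) {
        // Combine the credentials with the shared proxy-aware transport
        return StorageOptions.newBuilder()
                .setCredentials(credentials)
                .setTransportOptions(getTransportOptions(context))
                .build();
    }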

http://git-wip-us.apache.org/repos/asf/nifi/blob/03ef6465/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
index b52a552..c249e7e 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
@@ -22,15 +22,21 @@ import com.google.auth.oauth2.GoogleCredentials;
 import com.google.cloud.bigquery.BigQuery;
 import com.google.cloud.bigquery.BigQueryOptions;
 import com.google.common.collect.ImmutableList;
+
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
+import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
+import org.apache.nifi.proxy.ProxyConfiguration;
 import org.apache.nifi.util.StringUtils;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -40,7 +46,9 @@ import java.util.Set;
  * Base class for creating processors that connect to the GCP BigQuery service
  */
 public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<BigQuery, BigQueryOptions> {
+
     static final int BUFFER_SIZE = 65536;
+
     public static final Relationship REL_SUCCESS =
             new Relationship.Builder().name("success")
                     .description("FlowFiles are routed to this relationship after a successful Google BigQuery operation.")
@@ -53,8 +61,8 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
     public static final Set<Relationship> relationships = Collections.unmodifiableSet(
             new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
 
-    public static final PropertyDescriptor DATASET = new PropertyDescriptor
-            .Builder().name(BigQueryAttributes.DATASET_ATTR)
+    public static final PropertyDescriptor DATASET = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.DATASET_ATTR)
             .displayName("Dataset")
             .description(BigQueryAttributes.DATASET_DESC)
             .required(true)
@@ -63,8 +71,8 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
             .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
             .build();
 
-    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor
-            .Builder().name(BigQueryAttributes.TABLE_NAME_ATTR)
+    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.TABLE_NAME_ATTR)
             .displayName("Table Name")
             .description(BigQueryAttributes.TABLE_NAME_DESC)
             .required(true)
@@ -73,22 +81,22 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
             .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
             .build();
 
-    public static final PropertyDescriptor TABLE_SCHEMA = new PropertyDescriptor
-            .Builder().name(BigQueryAttributes.TABLE_SCHEMA_ATTR)
+    public static final PropertyDescriptor TABLE_SCHEMA = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.TABLE_SCHEMA_ATTR)
             .displayName("Table Schema")
             .description(BigQueryAttributes.TABLE_SCHEMA_DESC)
             .required(false)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
-    public static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor
-            .Builder().name(BigQueryAttributes.JOB_READ_TIMEOUT_ATTR)
+    public static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.JOB_READ_TIMEOUT_ATTR)
             .displayName("Read Timeout")
             .description(BigQueryAttributes.JOB_READ_TIMEOUT_DESC)
             .required(true)
             .defaultValue("5 minutes")
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .build();
 
@@ -101,6 +109,10 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return ImmutableList.<PropertyDescriptor>builder()
                 .addAll(super.getSupportedPropertyDescriptors())
+                .add(DATASET)
+                .add(TABLE_NAME)
+                .add(TABLE_SCHEMA)
+                .add(READ_TIMEOUT)
                 .build();
     }
 
@@ -109,14 +121,40 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
         final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
         final Integer retryCount = Integer.valueOf(context.getProperty(RETRY_COUNT).getValue());
 
-        BigQueryOptions.Builder builder = BigQueryOptions.newBuilder().setCredentials(credentials);
+        final BigQueryOptions.Builder builder = BigQueryOptions.newBuilder();
 
         if (!StringUtils.isBlank(projectId)) {
             builder.setProjectId(projectId);
         }
 
-        return builder
+        return builder.setCredentials(credentials)
                 .setRetrySettings(RetrySettings.newBuilder().setMaxAttempts(retryCount).build())
+                .setTransportOptions(getTransportOptions(context))
                 .build();
     }
+
+    @Override
+    protected final Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final Collection<ValidationResult> results = super.customValidate(validationContext);
+        ProxyConfiguration.validateProxySpec(validationContext, results, ProxyAwareTransportFactory.PROXY_SPECS);
+
+        final boolean projectId = validationContext.getProperty(PROJECT_ID).isSet();
+        if (!projectId) {
+            results.add(new ValidationResult.Builder()
+                    .subject(PROJECT_ID.getName())
+                    .valid(false)
+                    .explanation("The Project ID must be set for this processor.")
+                    .build());
+        }
+
+        customValidate(validationContext, results);
+        return results;
+    }
+
+    /**
+     * If a sub-class needs to implement any custom validation, it should override this method and add its validation results to the given collection.
+     */
+    protected void customValidate(ValidationContext validationContext, Collection<ValidationResult> results) {
+    }
+
 }
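
The customValidate(ValidationContext, Collection<ValidationResult>) hook above lets sub-classes contribute their own checks without re-implementing the proxy and Project ID validation. A hedged sketch of an override (the schema check is illustrative only, not part of this commit):

    @Override
    protected void customValidate(ValidationContext validationContext, Collection<ValidationResult> results) {
        // Illustrative check: a load job may need a schema to create the destination table
        if (!validationContext.getProperty(TABLE_SCHEMA).isSet()) {
            results.add(new ValidationResult.Builder()
                    .subject(TABLE_SCHEMA.getName())
                    .valid(false)
                    .explanation("Table Schema should be set so the load job can create the destination table.")
                    .build());
        }
    }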

http://git-wip-us.apache.org/repos/asf/nifi/blob/03ef6465/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
index 380f701..842a176 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
@@ -17,9 +17,12 @@
 
 package org.apache.nifi.processors.gcp.bigquery;
 
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
 
+import com.google.cloud.bigquery.JobInfo;
+
 /**
  * Attributes associated with the BigQuery processors
  */
@@ -28,44 +31,75 @@ public class BigQueryAttributes {
 
     public static final PropertyDescriptor SERVICE_ACCOUNT_JSON_FILE = CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE;
 
+    // Properties
+    public static final String SOURCE_TYPE_ATTR = "bq.load.type";
+    public static final String SOURCE_TYPE_DESC = "Data type of the file to be loaded. Possible values: AVRO, "
+            + "NEWLINE_DELIMITED_JSON, CSV.";
+
+    public static final String IGNORE_UNKNOWN_ATTR = "bq.load.ignore_unknown";
+    public static final String IGNORE_UNKNOWN_DESC = "Sets whether BigQuery should allow extra values that are not represented "
+            + "in the table schema. If true, the extra values are ignored. If false, records with extra columns are treated as "
+            + "bad records, and if there are too many bad records, an invalid error is returned in the job result. By default "
+            + "unknown values are not allowed.";
+
+    public static final String WRITE_DISPOSITION_ATTR = "bq.load.write_disposition";
+    public static final String WRITE_DISPOSITION_DESC = "Sets the action that should occur if the destination table already exists.";
+
+    public static final String MAX_BADRECORDS_ATTR = "bq.load.max_badrecords";
+    public static final String MAX_BADRECORDS_DESC = "Sets the maximum number of bad records that BigQuery can ignore when running "
+            + "the job. If the number of bad records exceeds this value, an invalid error is returned in the job result. By default "
+            + "no bad record is ignored.";
+
     public static final String DATASET_ATTR = "bq.dataset";
-    public static final String DATASET_DESC = "BigQuery dataset";
+    public static final String DATASET_DESC = "BigQuery dataset name (note: the dataset must already exist in GCP)";
 
     public static final String TABLE_NAME_ATTR = "bq.table.name";
     public static final String TABLE_NAME_DESC = "BigQuery table name";
 
     public static final String TABLE_SCHEMA_ATTR = "bq.table.schema";
-    public static final String TABLE_SCHEMA_DESC = "BigQuery table name";
+    public static final String TABLE_SCHEMA_DESC = "BigQuery schema in JSON format";
 
     public static final String CREATE_DISPOSITION_ATTR = "bq.load.create_disposition";
-    public static final String CREATE_DISPOSITION_DESC = "Options for table creation";
+    public static final String CREATE_DISPOSITION_DESC = "Sets whether the job is allowed to create new tables";
 
-    public static final String JOB_ERROR_MSG_ATTR = "bq.error.message";
-    public static final String JOB_ERROR_MSG_DESC = "Load job error message";
+    public static final String JOB_READ_TIMEOUT_ATTR = "bq.readtimeout";
+    public static final String JOB_READ_TIMEOUT_DESC = "Load Job Time Out";
 
-    public static final String JOB_ERROR_REASON_ATTR = "bq.error.reason";
-    public static final String JOB_ERROR_REASON_DESC = "Load job error reason";
+    public static final String CSV_ALLOW_JAGGED_ROWS_ATTR = "bq.csv.allow.jagged.rows";
+    public static final String CSV_ALLOW_JAGGED_ROWS_DESC = "Sets whether BigQuery should accept rows that are missing "
+            + "trailing optional columns. If true, BigQuery treats missing trailing columns as null values. If false, "
+            + "records with missing trailing columns are treated as bad records, and if there are too many bad records, "
+            + "an invalid error is returned in the job result. By default, rows with missing trailing columns are "
+            + "considered bad records.";
 
-    public static final String JOB_ERROR_LOCATION_ATTR = "bq.error.location";
-    public static final String JOB_ERROR_LOCATION_DESC = "Load job error location";
+    public static final String CSV_ALLOW_QUOTED_NEW_LINES_ATTR = "bq.csv.allow.quoted.new.lines";
+    public static final String CSV_ALLOW_QUOTED_NEW_LINES_DESC = "Sets whether BigQuery should allow quoted data sections "
+            + "that contain newline characters in a CSV file. By default quoted newline are not allowed.";
 
-    public static final String JOB_READ_TIMEOUT_ATTR = "bq.readtimeout";
-    public static final String JOB_READ_TIMEOUT_DESC = "Load Job Time Out";
+    public static final String CSV_CHARSET_ATTR = "bq.csv.charset";
+    public static final String CSV_CHARSET_DESC = "Sets the character encoding of the data.";
 
+    public static final String CSV_FIELD_DELIMITER_ATTR = "bq.csv.delimiter";
+    public static final String CSV_FIELD_DELIMITER_DESC = "Sets the separator for fields in a CSV file. BigQuery converts "
+            + "the string to ISO-8859-1 encoding, and then uses the first byte of the encoded string to split the data in its "
+            + "raw, binary state. BigQuery also supports the escape sequence \"\t\" to specify a tab separator. The default "
+            + "value is a comma (',').";
 
-    // Batch Attributes
-    public static final String SOURCE_TYPE_ATTR = "bq.load.type";
-    public static final String SOURCE_TYPE_DESC = "Data type of the file to be loaded";
+    public static final String CSV_QUOTE_ATTR = "bq.csv.quote";
+    public static final String CSV_QUOTE_DESC = "Sets the value that is used to quote data sections in a CSV file. BigQuery "
+            + "converts the string to ISO-8859-1 encoding, and then uses the first byte of the encoded string to split the "
+            + "data in its raw, binary state. The default value is a double-quote ('\"'). If your data does not contain quoted "
+            + "sections, set the property value to an empty string. If your data contains quoted newline characters, you must "
+            + "also set the Allow Quoted New Lines property to true.";
 
-    public static final String IGNORE_UNKNOWN_ATTR = "bq.load.ignore_unknown";
-    public static final String IGNORE_UNKNOWN_DESC = "Ignore fields not in table schema";
+    public static final String CSV_SKIP_LEADING_ROWS_ATTR = "bq.csv.skip.leading.rows";
+    public static final String CSV_SKIP_LEADING_ROWS_DESC = "Sets the number of rows at the top of a CSV file that BigQuery "
+            + "will skip when reading the data. The default value is 0. This property is useful if you have header rows in the "
+            + "file that should be skipped.";
 
-    public static final String WRITE_DISPOSITION_ATTR = "bq.load.write_disposition";
-    public static final String WRITE_DISPOSITION_DESC = "Options for writing to table";
 
-    public static final String MAX_BADRECORDS_ATTR = "bq.load.max_badrecords";
-    public static final String MAX_BADRECORDS_DESC = "Number of erroneous records to ignore before generating an error";
 
+    // Batch Attributes
     public static final String JOB_CREATE_TIME_ATTR = "bq.job.stat.creation_time";
     public static final String JOB_CREATE_TIME_DESC = "Time of load job creation";
 
@@ -77,4 +111,31 @@ public class BigQueryAttributes {
 
     public static final String JOB_LINK_ATTR = "bq.job.link";
     public static final String JOB_LINK_DESC = "API Link to load job";
+
+    public static final String JOB_NB_RECORDS_ATTR = "bq.records.count";
+    public static final String JOB_NB_RECORDS_DESC = "Number of records successfully inserted";
+
+    public static final String JOB_ERROR_MSG_ATTR = "bq.error.message";
+    public static final String JOB_ERROR_MSG_DESC = "Load job error message";
+
+    public static final String JOB_ERROR_REASON_ATTR = "bq.error.reason";
+    public static final String JOB_ERROR_REASON_DESC = "Load job error reason";
+
+    public static final String JOB_ERROR_LOCATION_ATTR = "bq.error.location";
+    public static final String JOB_ERROR_LOCATION_DESC = "Load job error location";
+
+
+    // Allowable values
+    public static final AllowableValue CREATE_IF_NEEDED = new AllowableValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(),
+            JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), "Configures the job to create the table if it does not exist.");
+    public static final AllowableValue CREATE_NEVER = new AllowableValue(JobInfo.CreateDisposition.CREATE_NEVER.name(),
+            JobInfo.CreateDisposition.CREATE_NEVER.name(), "Configures the job to fail with a not-found error if the table does not exist.");
+
+    public static final AllowableValue WRITE_EMPTY = new AllowableValue(JobInfo.WriteDisposition.WRITE_EMPTY.name(),
+            JobInfo.WriteDisposition.WRITE_EMPTY.name(), "Configures the job to fail with a duplicate error if the table already exists.");
+    public static final AllowableValue WRITE_APPEND = new AllowableValue(JobInfo.WriteDisposition.WRITE_APPEND.name(),
+            JobInfo.WriteDisposition.WRITE_APPEND.name(), "Configures the job to append data to the table if it already exists.");
+    public static final AllowableValue WRITE_TRUNCATE = new AllowableValue(JobInfo.WriteDisposition.WRITE_TRUNCATE.name(),
+            JobInfo.WriteDisposition.WRITE_TRUNCATE.name(), "Configures the job to overwrite the table data if the table already exists.");
+
 }
\ No newline at end of file
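
For reference, the Table Schema property documented above expects a JSON array of field objects with mode, name and type keys, as parsed by BigQueryUtils.schemaFromString below. A minimal illustrative value:

    [
      { "mode": "REQUIRED", "name": "id",   "type": "INTEGER" },
      { "mode": "NULLABLE", "name": "data", "type": "STRING" }
    ]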

http://git-wip-us.apache.org/repos/asf/nifi/blob/03ef6465/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryUtils.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryUtils.java
new file mode 100644
index 0000000..3139ffd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.LegacySQLTypeName;
+import com.google.cloud.bigquery.Schema;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
+
+/**
+ * Util class for schema manipulation
+ */
+public class BigQueryUtils {
+
+    private final static Type gsonSchemaType = new TypeToken<List<Map>>() { }.getType();
+
+    public static Field mapToField(Map fMap) {
+        String typeStr = fMap.get("type").toString();
+        String nameStr = fMap.get("name").toString();
+        String modeStr = fMap.get("mode").toString();
+        LegacySQLTypeName type = null;
+
+        if (typeStr.equals("BOOLEAN")) {
+            type = LegacySQLTypeName.BOOLEAN;
+        } else if (typeStr.equals("STRING")) {
+            type = LegacySQLTypeName.STRING;
+        } else if (typeStr.equals("BYTES")) {
+            type = LegacySQLTypeName.BYTES;
+        } else if (typeStr.equals("INTEGER")) {
+            type = LegacySQLTypeName.INTEGER;
+        } else if (typeStr.equals("FLOAT")) {
+            type = LegacySQLTypeName.FLOAT;
+        } else if (typeStr.equals("TIMESTAMP") || typeStr.equals("DATE")
+                || typeStr.equals("TIME") || typeStr.equals("DATETIME")) {
+            type = LegacySQLTypeName.TIMESTAMP;
+        } else if (typeStr.equals("RECORD")) {
+            type = LegacySQLTypeName.RECORD;
+        }
+
+        return Field.newBuilder(nameStr, type).setMode(Field.Mode.valueOf(modeStr)).build();
+    }
+
+    public static List<Field> listToFields(List<Map> m_fields) {
+        List<Field> fields = new ArrayList(m_fields.size());
+        for (Map m : m_fields) {
+            fields.add(mapToField(m));
+        }
+
+        return fields;
+    }
+
+    public static Schema schemaFromString(String schemaStr) {
+        if (schemaStr == null) {
+            return null;
+        } else {
+            Gson gson = new Gson();
+            List<Map> fields = gson.fromJson(schemaStr, gsonSchemaType);
+            return Schema.of(BigQueryUtils.listToFields(fields));
+        }
+    }
+
+}
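
Illustrative usage of the renamed utility (the old BqUtils is deleted below): turning a one-field JSON schema string into a com.google.cloud.bigquery.Schema.

    // Assumes the JSON shape parsed by mapToField above: mode, name and type per field
    Schema schema = BigQueryUtils.schemaFromString(
            "[{ \"mode\": \"NULLABLE\", \"name\": \"data\", \"type\": \"STRING\" }]");
    // schema now contains a single nullable STRING field named "data"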

http://git-wip-us.apache.org/repos/asf/nifi/blob/03ef6465/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java
deleted file mode 100644
index f7f5d66..0000000
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.gcp.bigquery;
-
-import com.google.cloud.bigquery.Field;
-import com.google.cloud.bigquery.LegacySQLTypeName;
-import com.google.cloud.bigquery.Schema;
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-
-/**
- *
- */
-public class BqUtils {
-    private final static Type gsonSchemaType = new TypeToken<List<Map>>() {
-    }.getType();
-
-    public static Field mapToField(Map fMap) {
-        String typeStr = fMap.get("type").toString();
-        String nameStr = fMap.get("name").toString();
-        String modeStr = fMap.get("mode").toString();
-        LegacySQLTypeName type = null;
-
-        if (typeStr.equals("BOOLEAN")) {
-            type = LegacySQLTypeName.BOOLEAN;
-        } else if (typeStr.equals("STRING")) {
-            type = LegacySQLTypeName.STRING;
-        } else if (typeStr.equals("BYTES")) {
-            type = LegacySQLTypeName.BYTES;
-        } else if (typeStr.equals("INTEGER")) {
-            type = LegacySQLTypeName.INTEGER;
-        } else if (typeStr.equals("FLOAT")) {
-            type = LegacySQLTypeName.FLOAT;
-        } else if (typeStr.equals("TIMESTAMP") || typeStr.equals("DATE")
-                || typeStr.equals("TIME") || typeStr.equals("DATETIME")) {
-            type = LegacySQLTypeName.TIMESTAMP;
-        } else if (typeStr.equals("RECORD")) {
-            type = LegacySQLTypeName.RECORD;
-        }
-
-        return Field.newBuilder(nameStr, type).setMode(Field.Mode.valueOf(modeStr)).build();
-    }
-
-    public static List<Field> listToFields(List<Map> m_fields) {
-        List<Field> fields = new ArrayList(m_fields.size());
-        for (Map m : m_fields) {
-            fields.add(mapToField(m));
-        }
-
-        return fields;
-    }
-
-    public static Schema schemaFromString(String schemaStr) {
-        if (schemaStr == null) {
-            return null;
-        } else {
-            Gson gson = new Gson();
-            List<Map> fields = gson.fromJson(schemaStr, gsonSchemaType);
-            return Schema.of(BqUtils.listToFields(fields));
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/03ef6465/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
index 99c7f2a..5068ab5 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
@@ -18,15 +18,16 @@
 package org.apache.nifi.processors.gcp.bigquery;
 
 import com.google.cloud.RetryOption;
-import com.google.cloud.bigquery.BigQuery;
 import com.google.cloud.bigquery.FormatOptions;
 import com.google.cloud.bigquery.Job;
 import com.google.cloud.bigquery.JobInfo;
+import com.google.cloud.bigquery.JobStatistics.LoadStatistics;
 import com.google.cloud.bigquery.Schema;
 import com.google.cloud.bigquery.TableDataWriteChannel;
 import com.google.cloud.bigquery.TableId;
 import com.google.cloud.bigquery.WriteChannelConfiguration;
 import com.google.common.collect.ImmutableList;
+
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -35,7 +36,10 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.LogLevel;
 import org.apache.nifi.processor.ProcessContext;
@@ -51,6 +55,7 @@ import org.threeten.bp.temporal.ChronoUnit;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -59,40 +64,58 @@ import java.util.concurrent.TimeUnit;
 /**
  * A processor for batch loading data into a Google BigQuery table
  */
-
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @Tags({"google", "google cloud", "bq", "bigquery"})
-@CapabilityDescription("Batch loads flow files to a Google BigQuery table.")
+@CapabilityDescription("Batch loads flow files content to a Google BigQuery table.")
 @SeeAlso({PutGCSObject.class, DeleteGCSObject.class})
-
 @WritesAttributes({
-        @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, description = BigQueryAttributes.IGNORE_UNKNOWN_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, description = BigQueryAttributes.MAX_BADRECORDS_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC)
+    @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, description = BigQueryAttributes.IGNORE_UNKNOWN_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, description = BigQueryAttributes.MAX_BADRECORDS_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
 })
-
 public class PutBigQueryBatch extends AbstractBigQueryProcessor {
 
+    private static final List<String> TYPES = Arrays.asList(FormatOptions.json().getType(), FormatOptions.csv().getType(), FormatOptions.avro().getType());
+
+    private static final Validator FORMAT_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+            final ValidationResult.Builder builder = new ValidationResult.Builder();
+            builder.subject(subject).input(input);
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+                return builder.valid(true).explanation("Contains Expression Language").build();
+            }
+
+            if(TYPES.contains(input.toUpperCase())) {
+                builder.valid(true);
+            } else {
+                builder.valid(false).explanation("Load File Type must be one of the following options: " + StringUtils.join(TYPES, ", "));
+            }
+
+            return builder.build();
+        }
+    };
+
     public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor
             .Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
             .displayName("Load file type")
             .description(BigQueryAttributes.SOURCE_TYPE_DESC)
             .required(true)
-            .allowableValues(FormatOptions.json().getType(), FormatOptions.avro().getType(), FormatOptions.csv().getType())
-            .defaultValue(FormatOptions.avro().getType())
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .addValidator(FORMAT_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
 
     public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder()
@@ -102,7 +125,7 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
             .required(true)
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .allowableValues("true", "false")
-            .defaultValue("true")
+            .defaultValue("false")
             .build();
 
     public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder()
@@ -110,8 +133,8 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
             .displayName("Create Disposition")
             .description(BigQueryAttributes.CREATE_DISPOSITION_DESC)
             .required(true)
-            .allowableValues(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), JobInfo.CreateDisposition.CREATE_NEVER.name())
-            .defaultValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name())
+            .allowableValues(BigQueryAttributes.CREATE_IF_NEEDED, BigQueryAttributes.CREATE_NEVER)
+            .defaultValue(BigQueryAttributes.CREATE_IF_NEEDED.getValue())
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
@@ -120,8 +143,8 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
             .displayName("Write Disposition")
             .description(BigQueryAttributes.WRITE_DISPOSITION_DESC)
             .required(true)
-            .allowableValues(JobInfo.WriteDisposition.WRITE_EMPTY.name(), JobInfo.WriteDisposition.WRITE_APPEND.name(), JobInfo.WriteDisposition.WRITE_TRUNCATE.name())
-            .defaultValue(JobInfo.WriteDisposition.WRITE_EMPTY.name())
+            .allowableValues(BigQueryAttributes.WRITE_EMPTY, BigQueryAttributes.WRITE_APPEND, BigQueryAttributes.WRITE_TRUNCATE)
+            .defaultValue(BigQueryAttributes.WRITE_EMPTY.getValue())
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
@@ -134,34 +157,78 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
             .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
             .build();
 
-    private Schema schemaCache = null;
+    public static final PropertyDescriptor CSV_ALLOW_JAGGED_ROWS = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.CSV_ALLOW_JAGGED_ROWS_ATTR)
+            .displayName("CSV Input - Allow Jagged Rows")
+            .description(BigQueryAttributes.CSV_ALLOW_JAGGED_ROWS_DESC)
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    public static final PropertyDescriptor CSV_ALLOW_QUOTED_NEW_LINES = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.CSV_ALLOW_QUOTED_NEW_LINES_ATTR)
+            .displayName("CSV Input - Allow Quoted New Lines")
+            .description(BigQueryAttributes.CSV_ALLOW_QUOTED_NEW_LINES_DESC)
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
 
-    public PutBigQueryBatch() {
+    public static final PropertyDescriptor CSV_CHARSET = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.CSV_CHARSET_ATTR)
+            .displayName("CSV Input - Character Set")
+            .description(BigQueryAttributes.CSV_CHARSET_DESC)
+            .required(true)
+            .allowableValues("UTF-8", "ISO-8859-1")
+            .defaultValue("UTF-8")
+            .build();
 
-    }
+    public static final PropertyDescriptor CSV_FIELD_DELIMITER = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.CSV_FIELD_DELIMITER_ATTR)
+            .displayName("CSV Input - Field Delimiter")
+            .description(BigQueryAttributes.CSV_FIELD_DELIMITER_DESC)
+            .required(true)
+            .defaultValue(",")
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor CSV_QUOTE = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.CSV_QUOTE_ATTR)
+            .displayName("CSV Input - Quote")
+            .description(BigQueryAttributes.CSV_QUOTE_DESC)
+            .required(true)
+            .defaultValue("\"")
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor CSV_SKIP_LEADING_ROWS = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.CSV_SKIP_LEADING_ROWS_ATTR)
+            .displayName("CSV Input - Skip Leading Rows")
+            .description(BigQueryAttributes.CSV_SKIP_LEADING_ROWS_DESC)
+            .required(true)
+            .defaultValue("0")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
 
     @Override
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return ImmutableList.<PropertyDescriptor>builder()
                 .addAll(super.getSupportedPropertyDescriptors())
-                .add(DATASET)
-                .add(TABLE_NAME)
-                .add(TABLE_SCHEMA)
                 .add(SOURCE_TYPE)
                 .add(CREATE_DISPOSITION)
                 .add(WRITE_DISPOSITION)
                 .add(MAXBAD_RECORDS)
                 .add(IGNORE_UNKNOWN)
-                .build();
-    }
-
-    @Override
-    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
-        return new PropertyDescriptor.Builder()
-                .name(propertyDescriptorName)
-                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-                .expressionLanguageSupported(true)
-                .dynamic(true)
+                .add(CSV_ALLOW_JAGGED_ROWS)
+                .add(CSV_ALLOW_QUOTED_NEW_LINES)
+                .add(CSV_CHARSET)
+                .add(CSV_FIELD_DELIMITER)
+                .add(CSV_QUOTE)
+                .add(CSV_SKIP_LEADING_ROWS)
                 .build();
     }
 
@@ -178,13 +245,10 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
             return;
         }
 
-        final Map<String, String> attributes = new HashMap<>();
-
-        final BigQuery bq = getCloudService();
-
         final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
         final String dataset = context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue();
         final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String type = context.getProperty(SOURCE_TYPE).evaluateAttributeExpressions(flowFile).getValue();
 
         final TableId tableId;
         if (StringUtils.isEmpty(projectId)) {
@@ -193,70 +257,93 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
             tableId = TableId.of(projectId, dataset, tableName);
         }
 
-        final String fileType = context.getProperty(SOURCE_TYPE).getValue();
+        try {
 
-        String schemaString = context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions().getValue();
-        Schema schema = BqUtils.schemaFromString(schemaString);
+            FormatOptions formatOption;
 
-        WriteChannelConfiguration writeChannelConfiguration =
-                WriteChannelConfiguration.newBuilder(tableId)
-                        .setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue()))
-                        .setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue()))
-                        .setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).asBoolean())
-                        .setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger())
-                        .setSchema(schema)
-                        .setFormatOptions(FormatOptions.of(fileType))
+            if (type.equals(FormatOptions.csv().getType())) {
+                formatOption = FormatOptions.csv().toBuilder()
+                        .setAllowJaggedRows(context.getProperty(CSV_ALLOW_JAGGED_ROWS).asBoolean())
+                        .setAllowQuotedNewLines(context.getProperty(CSV_ALLOW_QUOTED_NEW_LINES).asBoolean())
+                        .setEncoding(context.getProperty(CSV_CHARSET).getValue())
+                        .setFieldDelimiter(context.getProperty(CSV_FIELD_DELIMITER).evaluateAttributeExpressions(flowFile).getValue())
+                        .setQuote(context.getProperty(CSV_QUOTE).evaluateAttributeExpressions(flowFile).getValue())
+                        .setSkipLeadingRows(context.getProperty(CSV_SKIP_LEADING_ROWS).evaluateAttributeExpressions(flowFile).asInteger())
                         .build();
+            } else {
+                formatOption = FormatOptions.of(type);
+            }
 
-        TableDataWriteChannel writer = bq.writer(writeChannelConfiguration);
-
-        try {
-            session.read(flowFile, rawIn -> {
-                ReadableByteChannel readableByteChannel = Channels.newChannel(rawIn);
-                ByteBuffer byteBuffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
-                while (readableByteChannel.read(byteBuffer) >= 0) {
-                    byteBuffer.flip();
-                    writer.write(byteBuffer);
-                    byteBuffer.clear();
-                }
-            });
-
-            writer.close();
-
-            Job job = writer.getJob();
-            PropertyValue property = context.getProperty(READ_TIMEOUT);
-            Long timePeriod = property.evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS);
-            Duration duration = Duration.of(timePeriod, ChronoUnit.SECONDS);
-            job = job.waitFor(RetryOption.totalTimeout(duration));
-
-            if (job != null) {
-                attributes.put(BigQueryAttributes.JOB_CREATE_TIME_ATTR, Long.toString(job.getStatistics().getCreationTime()));
-                attributes.put(BigQueryAttributes.JOB_END_TIME_ATTR, Long.toString(job.getStatistics().getEndTime()));
-                attributes.put(BigQueryAttributes.JOB_START_TIME_ATTR, Long.toString(job.getStatistics().getStartTime()));
-                attributes.put(BigQueryAttributes.JOB_LINK_ATTR, job.getSelfLink());
-
-                boolean jobError = (job.getStatus().getError() != null);
-
-                if (jobError) {
-                    attributes.put(BigQueryAttributes.JOB_ERROR_MSG_ATTR, job.getStatus().getError().getMessage());
-                    attributes.put(BigQueryAttributes.JOB_ERROR_REASON_ATTR, job.getStatus().getError().getReason());
-                    attributes.put(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, job.getStatus().getError().getLocation());
-                } else {
-                    // in case it got looped back from error
-                    flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_MSG_ATTR);
-                    flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_REASON_ATTR);
-                    flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_LOCATION_ATTR);
-                }
-
-                if (!attributes.isEmpty()) {
-                    flowFile = session.putAllAttributes(flowFile, attributes);
-                }
-
-                if (jobError) {
-                    flowFile = session.penalize(flowFile);
-                    session.transfer(flowFile, REL_FAILURE);
-                } else {
-                    session.transfer(flowFile, REL_SUCCESS);
+            final Schema schema = BigQueryUtils.schemaFromString(context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions(flowFile).getValue());
+            final WriteChannelConfiguration writeChannelConfiguration =
+                    WriteChannelConfiguration.newBuilder(tableId)
+                    .setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue()))
+                    .setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue()))
+                    .setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).asBoolean())
+                    .setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger())
+                    .setSchema(schema)
+                    .setFormatOptions(formatOption)
+                    .build();
+
+            try (TableDataWriteChannel writer = getCloudService().writer(writeChannelConfiguration)) {
+
+                session.read(flowFile, rawIn -> {
+                    ReadableByteChannel readableByteChannel = Channels.newChannel(rawIn);
+                    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
+                    while (readableByteChannel.read(byteBuffer) >= 0) {
+                        byteBuffer.flip();
+                        writer.write(byteBuffer);
+                        byteBuffer.clear();
+                    }
+                });
+
+                // writer must be closed to get the job
+                writer.close();
+
+                Job job = writer.getJob();
+                Long timePeriod = context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.SECONDS);
+                Duration waitFor = Duration.of(timePeriod, ChronoUnit.SECONDS);
+                job = job.waitFor(RetryOption.totalTimeout(waitFor));
+
+                if (job != null) {
+                    final Map<String, String> attributes = new HashMap<>();
+
+                    attributes.put(BigQueryAttributes.JOB_CREATE_TIME_ATTR, Long.toString(job.getStatistics().getCreationTime()));
+                    attributes.put(BigQueryAttributes.JOB_END_TIME_ATTR, Long.toString(job.getStatistics().getEndTime()));
+                    attributes.put(BigQueryAttributes.JOB_START_TIME_ATTR, Long.toString(job.getStatistics().getStartTime()));
+                    attributes.put(BigQueryAttributes.JOB_LINK_ATTR, job.getSelfLink());
+
+                    boolean jobError = (job.getStatus().getError() != null);
+
+                    if (jobError) {
+                        attributes.put(BigQueryAttributes.JOB_ERROR_MSG_ATTR, job.getStatus().getError().getMessage());
+                        attributes.put(BigQueryAttributes.JOB_ERROR_REASON_ATTR, job.getStatus().getError().getReason());
+                        attributes.put(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, job.getStatus().getError().getLocation());
+                    } else {
+                        // clear any stale error attributes in case the flowfile was looped back from a previous failure
+                        flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_MSG_ATTR);
+                        flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_REASON_ATTR);
+                        flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_LOCATION_ATTR);
+
+                        // add the number of records successfully added
+                        if (job.getStatistics() instanceof LoadStatistics) {
+                            final LoadStatistics stats = (LoadStatistics) job.getStatistics();
+                            attributes.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(stats.getOutputRows()));
+                        }
+                    }
+
+                    if (!attributes.isEmpty()) {
+                        flowFile = session.putAllAttributes(flowFile, attributes);
+                    }
+
+                    if (jobError) {
+                        getLogger().log(LogLevel.WARN, job.getStatus().getError().getMessage());
+                        flowFile = session.penalize(flowFile);
+                        session.transfer(flowFile, REL_FAILURE);
+                    } else {
+                        session.getProvenanceReporter().send(flowFile, job.getSelfLink(), job.getStatistics().getEndTime() - job.getStatistics().getStartTime());
+                        session.transfer(flowFile, REL_SUCCESS);
+                    }
                 }
             }
 
@@ -266,4 +353,5 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
             session.transfer(flowFile, REL_FAILURE);
         }
     }
-}
+
+}
\ No newline at end of file
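
For reference, the CSV handling and load-job flow added above boil down to the
following standalone google-cloud-bigquery usage. This is a minimal sketch, not
the processor itself: the project/dataset/table names, the schema, the sample
data, and the 300-second timeout are illustrative placeholders.

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    import org.threeten.bp.Duration;

    import com.google.cloud.RetryOption;
    import com.google.cloud.bigquery.BigQuery;
    import com.google.cloud.bigquery.BigQueryOptions;
    import com.google.cloud.bigquery.CsvOptions;
    import com.google.cloud.bigquery.Field;
    import com.google.cloud.bigquery.FormatOptions;
    import com.google.cloud.bigquery.Job;
    import com.google.cloud.bigquery.JobInfo;
    import com.google.cloud.bigquery.LegacySQLTypeName;
    import com.google.cloud.bigquery.Schema;
    import com.google.cloud.bigquery.TableDataWriteChannel;
    import com.google.cloud.bigquery.TableId;
    import com.google.cloud.bigquery.WriteChannelConfiguration;

    public class BigQueryCsvLoadSketch {
        public static void main(String[] args) throws Exception {
            final BigQuery bq = BigQueryOptions.getDefaultInstance().getService();

            // Mirrors the new "CSV Input - *" properties; values are examples only.
            final CsvOptions csvOptions = FormatOptions.csv().toBuilder()
                    .setAllowJaggedRows(false)
                    .setAllowQuotedNewLines(false)
                    .setEncoding("UTF-8")
                    .setFieldDelimiter(",")
                    .setQuote("\"")
                    .setSkipLeadingRows(1) // skip the header row of the sample data
                    .build();

            final WriteChannelConfiguration conf = WriteChannelConfiguration
                    .newBuilder(TableId.of("my-project", "my_dataset", "my_table"))
                    .setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED)
                    .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
                    .setSchema(Schema.of(
                            Field.of("name", LegacySQLTypeName.STRING),
                            Field.of("age", LegacySQLTypeName.INTEGER)))
                    .setFormatOptions(csvOptions)
                    .build();

            final TableDataWriteChannel writer = bq.writer(conf);
            writer.write(ByteBuffer.wrap("name,age\nalice,30\nbob,25\n"
                    .getBytes(StandardCharsets.UTF_8)));
            writer.close(); // as in the processor, the Job only exists once the channel is closed

            final Job job = writer.getJob()
                    .waitFor(RetryOption.totalTimeout(Duration.ofSeconds(300)));
            if (job != null && job.getStatus().getError() != null) {
                System.err.println("Load failed: " + job.getStatus().getError().getMessage());
            }
        }
    }

Note that waitFor() blocks, which is why the processor exposes the timeout as
the READ_TIMEOUT property and evaluates it against the incoming flowfile.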

http://git-wip-us.apache.org/repos/asf/nifi/blob/03ef6465/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
index 31dc0a7..8930a27 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
@@ -18,13 +18,17 @@ package org.apache.nifi.processors.gcp.pubsub;
 
 import com.google.auth.oauth2.GoogleCredentials;
 import com.google.cloud.ServiceOptions;
+
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
@@ -62,4 +66,21 @@ public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor {
     protected ServiceOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials) {
         return null;
     }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final Collection<ValidationResult> results = super.customValidate(validationContext);
+
+        final boolean projectId = validationContext.getProperty(PROJECT_ID).isSet();
+        if (!projectId) {
+            results.add(new ValidationResult.Builder()
+                    .subject(PROJECT_ID.getName())
+                    .valid(false)
+                    .explanation("The Project ID must be set for this processor.")
+                    .build());
+        }
+
+        return results;
+    }
+
 }
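
The practical effect of the new customValidate() is that any Pub/Sub processor
is now invalid until a Project ID is supplied. A hypothetical nifi-mock sketch
(PublishGCPubSub is one concrete subclass; its other required properties are
elided here):

    import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
    import org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    public class ProjectIdValidationSketch {
        public static void main(String[] args) {
            final TestRunner runner = TestRunners.newTestRunner(PublishGCPubSub.class);
            // No Project ID set, so the new validation rule fires.
            runner.assertNotValid();
            runner.setProperty(AbstractGCPProcessor.PROJECT_ID, "my-project");
            // The processor still needs its remaining required properties
            // (credentials, topic, ...) before it becomes valid.
        }
    }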

http://git-wip-us.apache.org/repos/asf/nifi/blob/03ef6465/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
index 7c63631..fba984e 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
@@ -16,8 +16,6 @@
  */
 package org.apache.nifi.processors.gcp.storage;
 
-import java.net.Proxy;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -36,7 +34,6 @@ import org.apache.nifi.proxy.ProxyConfiguration;
 
 import com.google.api.gax.retrying.RetrySettings;
 import com.google.auth.oauth2.GoogleCredentials;
-import com.google.cloud.http.HttpTransportOptions;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageOptions;
 import com.google.common.collect.ImmutableList;
@@ -73,7 +70,7 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
 
     @Override
     protected final Collection<ValidationResult> customValidate(ValidationContext validationContext) {
-        final Collection<ValidationResult> results = new ArrayList<>();
+        final Collection<ValidationResult> results = super.customValidate(validationContext);
         ProxyConfiguration.validateProxySpec(validationContext, results, ProxyAwareTransportFactory.PROXY_SPECS);
         customValidate(validationContext, results);
         return results;
@@ -96,30 +93,10 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
                         .setMaxAttempts(retryCount)
                         .build());
 
-        final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context, () -> {
-            final String proxyHost = context.getProperty(PROXY_HOST).getValue();
-            final Integer proxyPort = context.getProperty(PROXY_PORT).asInteger();
-            if (proxyHost != null && proxyPort != null && proxyPort > 0) {
-                final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
-                final String proxyUser = context.getProperty(HTTP_PROXY_USERNAME).evaluateAttributeExpressions().getValue();
-                final String proxyPassword = context.getProperty(HTTP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
-                componentProxyConfig.setProxyType(Proxy.Type.HTTP);
-                componentProxyConfig.setProxyServerHost(proxyHost);
-                componentProxyConfig.setProxyServerPort(proxyPort);
-                componentProxyConfig.setProxyUserName(proxyUser);
-                componentProxyConfig.setProxyUserPassword(proxyPassword);
-                return componentProxyConfig;
-            }
-            return ProxyConfiguration.DIRECT_CONFIGURATION;
-        });
-
         if (!projectId.isEmpty()) {
             storageOptionsBuilder.setProjectId(projectId);
         }
 
-        final ProxyAwareTransportFactory transportFactory = new ProxyAwareTransportFactory(proxyConfiguration);
-        storageOptionsBuilder.setTransportOptions(HttpTransportOptions.newBuilder().setHttpTransportFactory(transportFactory).build());
-
-        return  storageOptionsBuilder.build();
+        return  storageOptionsBuilder.setTransportOptions(getTransportOptions(context)).build();
     }
 }
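
The proxy wiring deleted above is not gone: the processor now delegates to a
shared getTransportOptions(context) helper. A hedged reconstruction of what
that helper plausibly does, inferred from the removed block (the actual shared
method may differ in detail; the imports it needs are the ones this file no
longer uses, i.e. java.net.Proxy and com.google.cloud.http.HttpTransportOptions):

    protected HttpTransportOptions getTransportOptions(ProcessContext context) {
        final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context, () -> {
            final String proxyHost = context.getProperty(PROXY_HOST).getValue();
            final Integer proxyPort = context.getProperty(PROXY_PORT).asInteger();
            if (proxyHost != null && proxyPort != null && proxyPort > 0) {
                // Fall back to the component-level proxy properties.
                final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
                componentProxyConfig.setProxyType(Proxy.Type.HTTP);
                componentProxyConfig.setProxyServerHost(proxyHost);
                componentProxyConfig.setProxyServerPort(proxyPort);
                componentProxyConfig.setProxyUserName(
                        context.getProperty(HTTP_PROXY_USERNAME).evaluateAttributeExpressions().getValue());
                componentProxyConfig.setProxyUserPassword(
                        context.getProperty(HTTP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue());
                return componentProxyConfig;
            }
            return ProxyConfiguration.DIRECT_CONFIGURATION;
        });
        return HttpTransportOptions.newBuilder()
                .setHttpTransportFactory(new ProxyAwareTransportFactory(proxyConfiguration))
                .build();
    }

Centralizing this keeps the per-service Options builders free of duplicated
proxy logic as more Google Cloud clients (Storage, BigQuery, Pub/Sub) are added.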

http://git-wip-us.apache.org/repos/asf/nifi/blob/03ef6465/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
index 658350e..98aa1ee 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
@@ -33,7 +33,7 @@
         <dependency>
             <groupId>com.google.auth</groupId>
             <artifactId>google-auth-library-oauth2-http</artifactId>
-            <version>0.9.0</version>
+            <version>0.12.0</version>
             <exclusions>
                 <exclusion>
                     <groupId>com.google.code.findbugs</groupId>

http://git-wip-us.apache.org/repos/asf/nifi/blob/03ef6465/nifi-nar-bundles/nifi-gcp-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/pom.xml
index bb459a8..c1300a4 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/pom.xml
@@ -27,14 +27,15 @@
     <packaging>pom</packaging>
 
     <properties>
-        <google.cloud.sdk.version>0.47.0-alpha</google.cloud.sdk.version>
+        <google.cloud.sdk.version>0.71.0-alpha</google.cloud.sdk.version>
     </properties>
 
     <dependencyManagement>
         <dependencies>
             <dependency>
+            	<!-- https://github.com/GoogleCloudPlatform/google-cloud-java/tree/master/google-cloud-bom -->
                 <groupId>com.google.cloud</groupId>
-                <artifactId>google-cloud</artifactId>
+                <artifactId>google-cloud-bom</artifactId>
                 <version>${google.cloud.sdk.version}</version>
                 <type>pom</type>
                 <scope>import</scope>