You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2018/11/21 15:46:17 UTC
[1/2] nifi git commit: NIFI-4731: BQ Processors and GCP library
update.
Repository: nifi
Updated Branches:
refs/heads/master f6b171d5f -> 03ef64654
NIFI-4731: BQ Processors and GCP library update.
Signed-off-by: joewitt <jo...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/444caf8a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/444caf8a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/444caf8a
Branch: refs/heads/master
Commit: 444caf8a788c8c0a21c45558bb16cd8ee4b42872
Parents: f6b171d
Author: Daniel Jimenez <dj...@synack.com>
Authored: Sun Apr 29 10:01:34 2018 +0200
Committer: joewitt <jo...@apache.org>
Committed: Wed Nov 21 10:27:41 2018 -0500
----------------------------------------------------------------------
.../nifi-gcp-bundle/nifi-gcp-processors/pom.xml | 10 +-
.../processors/gcp/AbstractGCPProcessor.java | 3 +-
.../gcp/bigquery/AbstractBigQueryProcessor.java | 122 +++++++++
.../gcp/bigquery/BigQueryAttributes.java | 80 ++++++
.../nifi/processors/gcp/bigquery/BqUtils.java | 84 ++++++
.../gcp/bigquery/PutBigQueryBatch.java | 269 +++++++++++++++++++
.../gcp/storage/AbstractGCSProcessor.java | 36 +--
.../processors/gcp/storage/ListGCSBucket.java | 11 +-
.../org.apache.nifi.processor.Processor | 3 +-
.../processors/gcp/bigquery/AbstractBQTest.java | 96 +++++++
.../gcp/bigquery/AbstractBigQueryIT.java | 79 ++++++
.../gcp/bigquery/PutBigQueryBatchIT.java | 137 ++++++++++
.../gcp/bigquery/PutBigQueryBatchTest.java | 153 +++++++++++
.../gcp/storage/ListGCSBucketTest.java | 1 -
.../nifi-gcp-services-api/pom.xml | 1 +
15 files changed, 1060 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
index 669cbc6..24c60bf 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
@@ -62,7 +62,7 @@
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
- <artifactId>google-cloud-storage</artifactId>
+ <artifactId>google-cloud-core</artifactId>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
@@ -72,6 +72,14 @@
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
+ <artifactId>google-cloud-storage</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.cloud</groupId>
+ <artifactId>google-cloud-bigquery</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
index 2c178ff..e95bf73 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
@@ -43,7 +43,8 @@ public abstract class AbstractGCPProcessor<
.Builder().name("gcp-project-id")
.displayName("Project ID")
.description("Google Cloud Project ID")
- .required(true)
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
new file mode 100644
index 0000000..b52a552
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.common.collect.ImmutableList;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
+import org.apache.nifi.util.StringUtils;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Base class for processors that connect to the Google Cloud BigQuery service.
+ *
+ * <p>Declares the {@code success}/{@code failure} relationships and the dataset,
+ * table name, table schema and read timeout property descriptors, and builds the
+ * {@link BigQueryOptions} from the processor configuration.
+ */
+public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<BigQuery, BigQueryOptions> {
+
+    /** Buffer size in bytes (64 KiB); package-private so processors in this package can share it. */
+    static final int BUFFER_SIZE = 65536;
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles are routed to this relationship after a successful Google BigQuery operation.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles are routed to this relationship if the Google BigQuery operation fails.")
+            .build();
+
+    /** Unmodifiable set holding {@link #REL_SUCCESS} and {@link #REL_FAILURE}. */
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
+
+    /** Target dataset; defaults to the value of the FlowFile attribute of the same name. */
+    public static final PropertyDescriptor DATASET = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.DATASET_ATTR)
+            .displayName("Dataset")
+            .description(BigQueryAttributes.DATASET_DESC)
+            .required(true)
+            .defaultValue("${" + BigQueryAttributes.DATASET_ATTR + "}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    /** Target table; defaults to the value of the FlowFile attribute of the same name. */
+    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.TABLE_NAME_ATTR)
+            .displayName("Table Name")
+            .description(BigQueryAttributes.TABLE_NAME_DESC)
+            .required(true)
+            .defaultValue("${" + BigQueryAttributes.TABLE_NAME_ATTR + "}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    /** Optional table schema; has no default value. */
+    public static final PropertyDescriptor TABLE_SCHEMA = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.TABLE_SCHEMA_ATTR)
+            .displayName("Table Schema")
+            .description(BigQueryAttributes.TABLE_SCHEMA_DESC)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /** Time period used as the job timeout; defaults to five minutes. */
+    public static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.JOB_READ_TIMEOUT_ATTR)
+            .displayName("Read Timeout")
+            .description(BigQueryAttributes.JOB_READ_TIMEOUT_DESC)
+            .required(true)
+            .defaultValue("5 minutes")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    /**
+     * @return the shared, unmodifiable {@link #relationships} set
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    /**
+     * Returns an immutable copy of the parent's descriptors. This base class adds
+     * none of its own; the descriptors declared above are left for subclasses to add.
+     *
+     * @return immutable list of the inherited property descriptors
+     */
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return ImmutableList.copyOf(super.getSupportedPropertyDescriptors());
+    }
+
+    /**
+     * Builds the BigQuery client options from the processor configuration.
+     *
+     * @param context     process context supplying the project id and retry count properties
+     * @param credentials credentials to set on the options
+     * @return options carrying the credentials, the retry settings and, when one is
+     *         configured, the project id
+     */
+    @Override
+    protected BigQueryOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials) {
+        final String configuredProject = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
+        final Integer maxAttempts = Integer.valueOf(context.getProperty(RETRY_COUNT).getValue());
+
+        final BigQueryOptions.Builder optionsBuilder = BigQueryOptions.newBuilder().setCredentials(credentials);
+
+        // The project id is optional: it is only set when a non-blank value is configured.
+        if (!StringUtils.isBlank(configuredProject)) {
+            optionsBuilder.setProjectId(configuredProject);
+        }
+
+        final RetrySettings retrySettings = RetrySettings.newBuilder().setMaxAttempts(maxAttempts).build();
+        optionsBuilder.setRetrySettings(retrySettings);
+        return optionsBuilder.build();
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
new file mode 100644
index 0000000..380f701
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
+
+/**
+ * Names and descriptions of the FlowFile attributes and processor properties used
+ * by the BigQuery processors.
+ *
+ * <p>Each {@code *_ATTR} constant is paired with a {@code *_DESC} constant holding
+ * its human-readable description. This class only holds constants and cannot be
+ * instantiated.
+ */
+public class BigQueryAttributes {
+    private BigQueryAttributes() {}
+
+    /** Alias of the shared service-account JSON file credential property. */
+    public static final PropertyDescriptor SERVICE_ACCOUNT_JSON_FILE = CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE;
+
+    // Target table
+    public static final String DATASET_ATTR = "bq.dataset";
+    public static final String DATASET_DESC = "BigQuery dataset";
+
+    public static final String TABLE_NAME_ATTR = "bq.table.name";
+    public static final String TABLE_NAME_DESC = "BigQuery table name";
+
+    public static final String TABLE_SCHEMA_ATTR = "bq.table.schema";
+    // Previously a copy of TABLE_NAME_DESC; the schema is supplied as a JSON document.
+    public static final String TABLE_SCHEMA_DESC = "BigQuery schema in JSON format";
+
+    public static final String CREATE_DISPOSITION_ATTR = "bq.load.create_disposition";
+    public static final String CREATE_DISPOSITION_DESC = "Options for table creation";
+
+    // Load job errors
+    public static final String JOB_ERROR_MSG_ATTR = "bq.error.message";
+    public static final String JOB_ERROR_MSG_DESC = "Load job error message";
+
+    public static final String JOB_ERROR_REASON_ATTR = "bq.error.reason";
+    public static final String JOB_ERROR_REASON_DESC = "Load job error reason";
+
+    public static final String JOB_ERROR_LOCATION_ATTR = "bq.error.location";
+    public static final String JOB_ERROR_LOCATION_DESC = "Load job error location";
+
+    public static final String JOB_READ_TIMEOUT_ATTR = "bq.readtimeout";
+    public static final String JOB_READ_TIMEOUT_DESC = "Load Job Time Out";
+
+
+    // Batch Attributes
+    public static final String SOURCE_TYPE_ATTR = "bq.load.type";
+    public static final String SOURCE_TYPE_DESC = "Data type of the file to be loaded";
+
+    public static final String IGNORE_UNKNOWN_ATTR = "bq.load.ignore_unknown";
+    public static final String IGNORE_UNKNOWN_DESC = "Ignore fields not in table schema";
+
+    public static final String WRITE_DISPOSITION_ATTR = "bq.load.write_disposition";
+    public static final String WRITE_DISPOSITION_DESC = "Options for writing to table";
+
+    public static final String MAX_BADRECORDS_ATTR = "bq.load.max_badrecords";
+    public static final String MAX_BADRECORDS_DESC = "Number of erroneous records to ignore before generating an error";
+
+    // Load job statistics
+    public static final String JOB_CREATE_TIME_ATTR = "bq.job.stat.creation_time";
+    public static final String JOB_CREATE_TIME_DESC = "Time load job creation";
+
+    public static final String JOB_END_TIME_ATTR = "bq.job.stat.end_time";
+    public static final String JOB_END_TIME_DESC = "Time load job ended";
+
+    public static final String JOB_START_TIME_ATTR = "bq.job.stat.start_time";
+    public static final String JOB_START_TIME_DESC = "Time load job started";
+
+    public static final String JOB_LINK_ATTR = "bq.job.link";
+    public static final String JOB_LINK_DESC = "API Link to load job";
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java
new file mode 100644
index 0000000..f7f5d66
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.LegacySQLTypeName;
+import com.google.cloud.bigquery.Schema;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Utility methods that convert a JSON description of a table schema into the
+ * BigQuery client's {@link Schema} and {@link Field} types.
+ */
+public class BqUtils {
+    /** Gson target type for a schema document: a JSON array of field objects. */
+    private final static Type gsonSchemaType = new TypeToken<List<Map>>() {
+    }.getType();
+
+    /**
+     * Converts one parsed schema entry into a {@link Field}.
+     *
+     * @param fMap parsed JSON object; must contain {@code name} and {@code type},
+     *             and may contain {@code mode}
+     * @return the field with the mapped legacy SQL type and mode
+     * @throws IllegalArgumentException if {@code name} or {@code type} is missing,
+     *             the type is not one of the supported names, or the mode is not a
+     *             {@link Field.Mode} constant
+     */
+    public static Field mapToField(Map fMap) {
+        final Object typeValue = fMap.get("type");
+        final Object nameValue = fMap.get("name");
+        if (typeValue == null || nameValue == null) {
+            throw new IllegalArgumentException("Schema field must define both \"name\" and \"type\": " + fMap);
+        }
+        final String typeStr = typeValue.toString();
+        final String nameStr = nameValue.toString();
+
+        // "mode" is optional in a BigQuery schema; an absent mode means NULLABLE.
+        final Object modeValue = fMap.get("mode");
+        final Field.Mode mode = (modeValue == null) ? Field.Mode.NULLABLE : Field.Mode.valueOf(modeValue.toString());
+
+        final LegacySQLTypeName type;
+        switch (typeStr) {
+            case "BOOLEAN":
+                type = LegacySQLTypeName.BOOLEAN;
+                break;
+            case "STRING":
+                type = LegacySQLTypeName.STRING;
+                break;
+            case "BYTES":
+                type = LegacySQLTypeName.BYTES;
+                break;
+            case "INTEGER":
+                type = LegacySQLTypeName.INTEGER;
+                break;
+            case "FLOAT":
+                type = LegacySQLTypeName.FLOAT;
+                break;
+            // NOTE(review): DATE, TIME and DATETIME are all mapped to TIMESTAMP, as in
+            // the original mapping - confirm this is intended.
+            case "TIMESTAMP":
+            case "DATE":
+            case "TIME":
+            case "DATETIME":
+                type = LegacySQLTypeName.TIMESTAMP;
+                break;
+            case "RECORD":
+                type = LegacySQLTypeName.RECORD;
+                break;
+            default:
+                // Fail with a descriptive message instead of handing a null type to the builder.
+                throw new IllegalArgumentException(
+                        "Unsupported type \"" + typeStr + "\" for schema field \"" + nameStr + "\"");
+        }
+
+        return Field.newBuilder(nameStr, type).setMode(mode).build();
+    }
+
+    /**
+     * Converts every parsed schema entry into a {@link Field}, preserving order.
+     *
+     * @param m_fields parsed schema entries
+     * @return a new list with one field per entry
+     */
+    public static List<Field> listToFields(List<Map> m_fields) {
+        List<Field> fields = new ArrayList<>(m_fields.size());
+        for (Map m : m_fields) {
+            fields.add(mapToField(m));
+        }
+
+        return fields;
+    }
+
+    /**
+     * Parses a JSON array of field definitions into a {@link Schema}.
+     *
+     * @param schemaStr JSON schema text, or {@code null}
+     * @return the parsed schema, or {@code null} when {@code schemaStr} is
+     *         {@code null} or Gson yields no field list for it
+     */
+    public static Schema schemaFromString(String schemaStr) {
+        if (schemaStr == null) {
+            return null;
+        }
+
+        List<Map> fields = new Gson().fromJson(schemaStr, gsonSchemaType);
+        if (fields == null) {
+            // Gson returns null for an empty document or a JSON null; treat it as "no schema".
+            return null;
+        }
+        return Schema.of(BqUtils.listToFields(fields));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
new file mode 100644
index 0000000..99c7f2a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import com.google.cloud.RetryOption;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.FormatOptions;
+import com.google.cloud.bigquery.Job;
+import com.google.cloud.bigquery.JobInfo;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.TableDataWriteChannel;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.WriteChannelConfiguration;
+import com.google.common.collect.ImmutableList;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.storage.DeleteGCSObject;
+import org.apache.nifi.processors.gcp.storage.PutGCSObject;
+import org.apache.nifi.util.StringUtils;
+import org.threeten.bp.Duration;
+import org.threeten.bp.temporal.ChronoUnit;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A processor for batch loading data into a Google BigQuery table.
+ *
+ * <p>The content of each incoming FlowFile is written to a
+ * {@link TableDataWriteChannel}. After the channel is closed, the load job is
+ * obtained from it and awaited for up to the configured read timeout. Job
+ * statistics, and error details when the job reports an error, are written to the
+ * FlowFile as attributes before it is routed to success or failure.
+ */
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"google", "google cloud", "bq", "bigquery"})
+@CapabilityDescription("Batch loads flow files to a Google BigQuery table.")
+@SeeAlso({PutGCSObject.class, DeleteGCSObject.class})
+
+@WritesAttributes({
+    @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, description = BigQueryAttributes.IGNORE_UNKNOWN_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, description = BigQueryAttributes.MAX_BADRECORDS_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC)
+})
+
+public class PutBigQueryBatch extends AbstractBigQueryProcessor {
+
+    /** Format of the FlowFile content: JSON, Avro (default) or CSV. */
+    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor
+            .Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
+            .displayName("Load file type")
+            .description(BigQueryAttributes.SOURCE_TYPE_DESC)
+            .required(true)
+            .allowableValues(FormatOptions.json().getType(), FormatOptions.avro().getType(), FormatOptions.csv().getType())
+            .defaultValue(FormatOptions.avro().getType())
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /** Whether values not described by the table schema are ignored; defaults to true. */
+    public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.IGNORE_UNKNOWN_ATTR)
+            .displayName("Ignore Unknown Values")
+            .description(BigQueryAttributes.IGNORE_UNKNOWN_DESC)
+            .required(true)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
+    /** Table creation behaviour; defaults to CREATE_IF_NEEDED. */
+    public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.CREATE_DISPOSITION_ATTR)
+            .displayName("Create Disposition")
+            .description(BigQueryAttributes.CREATE_DISPOSITION_DESC)
+            .required(true)
+            .allowableValues(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), JobInfo.CreateDisposition.CREATE_NEVER.name())
+            .defaultValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name())
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /** Table write behaviour; defaults to WRITE_EMPTY. */
+    public static final PropertyDescriptor WRITE_DISPOSITION = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.WRITE_DISPOSITION_ATTR)
+            .displayName("Write Disposition")
+            .description(BigQueryAttributes.WRITE_DISPOSITION_DESC)
+            .required(true)
+            .allowableValues(JobInfo.WriteDisposition.WRITE_EMPTY.name(), JobInfo.WriteDisposition.WRITE_APPEND.name(), JobInfo.WriteDisposition.WRITE_TRUNCATE.name())
+            .defaultValue(JobInfo.WriteDisposition.WRITE_EMPTY.name())
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /** Maximum number of bad records passed to the load configuration; defaults to 0. */
+    public static final PropertyDescriptor MAXBAD_RECORDS = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.MAX_BADRECORDS_ATTR)
+            .displayName("Max Bad Records")
+            .description(BigQueryAttributes.MAX_BADRECORDS_DESC)
+            .required(true)
+            .defaultValue("0")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+
+    public PutBigQueryBatch() {
+
+    }
+
+    /**
+     * @return the inherited descriptors followed by the table, load and timeout
+     *         properties of this processor
+     */
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return ImmutableList.<PropertyDescriptor>builder()
+                .addAll(super.getSupportedPropertyDescriptors())
+                .add(DATASET)
+                .add(TABLE_NAME)
+                .add(TABLE_SCHEMA)
+                .add(SOURCE_TYPE)
+                .add(CREATE_DISPOSITION)
+                .add(WRITE_DISPOSITION)
+                .add(MAXBAD_RECORDS)
+                .add(IGNORE_UNKNOWN)
+                // onTrigger reads READ_TIMEOUT, so it has to be a supported property.
+                .add(READ_TIMEOUT)
+                .build();
+    }
+
+    /**
+     * Accepts any non-empty dynamic property.
+     *
+     * @param propertyDescriptorName name of the dynamic property
+     * @return a dynamic descriptor with expression language enabled
+     */
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                .expressionLanguageSupported(true)
+                .dynamic(true)
+                .build();
+    }
+
+    /** Delegates to the parent's scheduling hook. */
+    @Override
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        super.onScheduled(context);
+    }
+
+    /**
+     * Loads the content of one FlowFile into the configured table and routes the
+     * FlowFile to {@code success} or {@code failure}.
+     *
+     * <p>The FlowFile is penalized and routed to {@code failure} when the job
+     * reports an error, when the job cannot be found after waiting, or when any
+     * exception is thrown while writing or waiting.
+     *
+     * @param context process context supplying the property values
+     * @param session session used to read and transfer the FlowFile
+     * @throws ProcessException declared by the processor contract
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final Map<String, String> attributes = new HashMap<>();
+
+        final BigQuery bq = getCloudService();
+
+        final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
+        final String dataset = context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue();
+        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+        // The project id is optional; without it the table id carries only dataset and table.
+        final TableId tableId;
+        if (StringUtils.isEmpty(projectId)) {
+            tableId = TableId.of(dataset, tableName);
+        } else {
+            tableId = TableId.of(projectId, dataset, tableName);
+        }
+
+        final String fileType = context.getProperty(SOURCE_TYPE).getValue();
+
+        String schemaString = context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions().getValue();
+        Schema schema = BqUtils.schemaFromString(schemaString);
+
+        WriteChannelConfiguration writeChannelConfiguration =
+                WriteChannelConfiguration.newBuilder(tableId)
+                        .setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue()))
+                        .setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue()))
+                        .setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).asBoolean())
+                        .setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger())
+                        .setSchema(schema)
+                        .setFormatOptions(FormatOptions.of(fileType))
+                        .build();
+
+        TableDataWriteChannel writer = bq.writer(writeChannelConfiguration);
+
+        try {
+            session.read(flowFile, rawIn -> {
+                ReadableByteChannel readableByteChannel = Channels.newChannel(rawIn);
+                ByteBuffer byteBuffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
+                while (readableByteChannel.read(byteBuffer) >= 0) {
+                    byteBuffer.flip();
+                    // A channel write may consume only part of the buffer, so drain it
+                    // completely before clearing to avoid dropping bytes.
+                    while (byteBuffer.hasRemaining()) {
+                        writer.write(byteBuffer);
+                    }
+                    byteBuffer.clear();
+                }
+            });
+
+            writer.close();
+
+            Job job = writer.getJob();
+            PropertyValue property = context.getProperty(READ_TIMEOUT);
+            Long timePeriod = property.evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS);
+            Duration duration = Duration.of(timePeriod, ChronoUnit.SECONDS);
+            job = job.waitFor(RetryOption.totalTimeout(duration));
+
+            if (job == null) {
+                // No job came back from waitFor(); the FlowFile must still be transferred
+                // somewhere, so treat this as a failure.
+                getLogger().log(LogLevel.ERROR, "BigQuery load job could not be found after waiting for its completion");
+                flowFile = session.penalize(flowFile);
+                session.transfer(flowFile, REL_FAILURE);
+                return;
+            }
+
+            attributes.put(BigQueryAttributes.JOB_CREATE_TIME_ATTR, Long.toString(job.getStatistics().getCreationTime()));
+            attributes.put(BigQueryAttributes.JOB_END_TIME_ATTR, Long.toString(job.getStatistics().getEndTime()));
+            attributes.put(BigQueryAttributes.JOB_START_TIME_ATTR, Long.toString(job.getStatistics().getStartTime()));
+            attributes.put(BigQueryAttributes.JOB_LINK_ATTR, job.getSelfLink());
+
+            boolean jobError = (job.getStatus().getError() != null);
+
+            if (jobError) {
+                attributes.put(BigQueryAttributes.JOB_ERROR_MSG_ATTR, job.getStatus().getError().getMessage());
+                attributes.put(BigQueryAttributes.JOB_ERROR_REASON_ATTR, job.getStatus().getError().getReason());
+                attributes.put(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, job.getStatus().getError().getLocation());
+            } else {
+                // in case it got looped back from error
+                flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_MSG_ATTR);
+                flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_REASON_ATTR);
+                flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_LOCATION_ATTR);
+            }
+
+            if (!attributes.isEmpty()) {
+                flowFile = session.putAllAttributes(flowFile, attributes);
+            }
+
+            if (jobError) {
+                flowFile = session.penalize(flowFile);
+                session.transfer(flowFile, REL_FAILURE);
+            } else {
+                session.transfer(flowFile, REL_SUCCESS);
+            }
+
+        } catch (Exception ex) {
+            if (ex instanceof InterruptedException) {
+                // Restore the interrupt flag so the caller can observe the interruption.
+                Thread.currentThread().interrupt();
+            }
+            getLogger().log(LogLevel.ERROR, ex.getMessage(), ex);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
index 4363115..7c63631 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
@@ -16,12 +16,15 @@
*/
package org.apache.nifi.processors.gcp.storage;
-import com.google.api.gax.retrying.RetrySettings;
-import com.google.auth.oauth2.GoogleCredentials;
-import com.google.cloud.http.HttpTransportOptions;
-import com.google.cloud.storage.Storage;
-import com.google.cloud.storage.StorageOptions;
-import com.google.common.collect.ImmutableList;
+import java.net.Proxy;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@@ -31,14 +34,12 @@ import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
import org.apache.nifi.proxy.ProxyConfiguration;
-import java.net.Proxy;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.http.HttpTransportOptions;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import com.google.common.collect.ImmutableList;
/**
* Base class for creating processors which connect to Google Cloud Storage.
@@ -86,12 +87,11 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
@Override
protected StorageOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials) {
- final String projectId = context.getProperty(PROJECT_ID).getValue();
+ final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
final Integer retryCount = context.getProperty(RETRY_COUNT).asInteger();
StorageOptions.Builder storageOptionsBuilder = StorageOptions.newBuilder()
.setCredentials(credentials)
- .setProjectId(projectId)
.setRetrySettings(RetrySettings.newBuilder()
.setMaxAttempts(retryCount)
.build());
@@ -113,6 +113,10 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
return ProxyConfiguration.DIRECT_CONFIGURATION;
});
+ if (!projectId.isEmpty()) {
+ storageOptionsBuilder.setProjectId(projectId);
+ }
+
final ProxyAwareTransportFactory transportFactory = new ProxyAwareTransportFactory(proxyConfiguration);
storageOptionsBuilder.setTransportOptions(HttpTransportOptions.newBuilder().setHttpTransportFactory(transportFactory).build());
http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
index e018814..01293cf 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
@@ -150,8 +150,8 @@ public class ListGCSBucket extends AbstractGCSProcessor {
.displayName("Bucket")
.description(BUCKET_DESC)
.required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
public static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder()
@@ -159,7 +159,8 @@ public class ListGCSBucket extends AbstractGCSProcessor {
.displayName("Prefix")
.description("The prefix used to filter the object list. In most cases, it should end with a forward slash ('/').")
.required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
public static final PropertyDescriptor USE_GENERATIONS = new PropertyDescriptor.Builder()
@@ -242,9 +243,9 @@ public class ListGCSBucket extends AbstractGCSProcessor {
final long startNanos = System.nanoTime();
- final String bucket = context.getProperty(BUCKET).getValue();
+ final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
- final String prefix = context.getProperty(PREFIX).getValue();
+ final String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
final boolean useGenerations = context.getProperty(USE_GENERATIONS).asBoolean();
http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 249d19e..4ff9dfb 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -17,4 +17,5 @@ org.apache.nifi.processors.gcp.storage.FetchGCSObject
org.apache.nifi.processors.gcp.storage.DeleteGCSObject
org.apache.nifi.processors.gcp.storage.ListGCSBucket
org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub
-org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub
\ No newline at end of file
+org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub
+org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBQTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBQTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBQTest.java
new file mode 100644
index 0000000..e424a3a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBQTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.bigquery;
+
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
+import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+
+/**
+ * Base class for BigQuery unit tests. Provides a framework for creating a {@link TestRunner}
+ * instance that is pre-configured with the always-required credentials service, project id
+ * and retry count, plus a shared check of the resulting {@link BigQueryOptions}.
+ */
+public abstract class AbstractBQTest {
+    private static final String PROJECT_ID = System.getProperty("test.gcp.project.id", "nifi-test-gcp-project");
+    private static final Integer RETRIES = 9;
+
+    static final String DATASET = RemoteBigQueryHelper.generateDatasetName();
+
+    /** Mocked BigQuery client available to subclasses. */
+    @Mock
+    protected BigQuery bq;
+
+    /**
+     * Initializes every {@code @Mock} field before each test.
+     *
+     * @throws Exception declared so that subclasses may override with additional setup
+     */
+    @Before
+    public void setup() throws Exception {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    /**
+     * Creates a runner for the given processor, registers and enables a
+     * {@link GCPCredentialsControllerService}, and sets the credentials service,
+     * project id and retry count properties on the runner.
+     *
+     * @param processor the processor under test
+     * @return the configured runner
+     * @throws Exception if the controller service cannot be added to the runner
+     */
+    public static TestRunner buildNewRunner(Processor processor) throws Exception {
+        final GCPCredentialsService credentialsService = new GCPCredentialsControllerService();
+
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+        runner.addControllerService("gcpCredentialsControllerService", credentialsService);
+        runner.enableControllerService(credentialsService);
+
+        runner.setProperty(AbstractBigQueryProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, "gcpCredentialsControllerService");
+        runner.setProperty(AbstractBigQueryProcessor.PROJECT_ID, PROJECT_ID);
+        runner.setProperty(AbstractBigQueryProcessor.RETRY_COUNT, String.valueOf(RETRIES));
+
+        runner.assertValid(credentialsService);
+
+        return runner;
+    }
+
+    /**
+     * @return a new instance of the processor under test
+     */
+    public abstract AbstractBigQueryProcessor getProcessor();
+
+    /**
+     * Sets the processor-specific properties required for the runner to be valid.
+     *
+     * @param runner the runner to configure
+     */
+    protected abstract void addRequiredPropertiesToRunner(TestRunner runner);
+
+    /**
+     * Verifies that the project id, retry count and credentials configured on the runner
+     * are carried over into the {@link BigQueryOptions} built by the processor.
+     */
+    @Test
+    public void testBiqQueryOptionsConfiguration() throws Exception {
+        reset(bq);
+
+        // Use one processor instance for both the runner and the assertions, so the
+        // options are built by the same processor the runner was created around.
+        final AbstractBigQueryProcessor processor = getProcessor();
+        final TestRunner runner = buildNewRunner(processor);
+        final GoogleCredentials mockCredentials = mock(GoogleCredentials.class);
+
+        final BigQueryOptions options = processor.getServiceOptions(runner.getProcessContext(),
+                mockCredentials);
+
+        assertEquals("Project IDs should match",
+                PROJECT_ID, options.getProjectId());
+
+        assertEquals("Retry counts should match",
+                RETRIES.intValue(), options.getRetrySettings().getMaxAttempts());
+
+        assertSame("Credentials should be configured correctly",
+                mockCredentials, options.getCredentials());
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryIT.java
new file mode 100644
index 0000000..52327d4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryIT.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.Dataset;
+import com.google.cloud.bigquery.DatasetInfo;
+import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processors.gcp.GCPIntegrationTests;
+import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertNull;
+
+/**
+ * Base class for BigQuery integration tests. Creates a uniquely named dataset before the
+ * tests of a class run and deletes it, with its contents, afterwards.
+ */
+@Category(GCPIntegrationTests.class)
+public abstract class AbstractBigQueryIT {
+
+    static final String CONTROLLER_SERVICE = "GCPCredentialsService";
+    protected static BigQuery bigquery;
+    protected static Dataset dataset;
+    protected static TestRunner runner;
+
+    /**
+     * Builds a BigQuery client from default {@link BigQueryOptions} and creates the
+     * dataset used by the tests.
+     */
+    @BeforeClass
+    public static void beforeClass() {
+        // Clear any dataset left over from a previously executed test class so that
+        // afterClass() never tries to delete a stale reference.
+        dataset = null;
+        BigQueryOptions bigQueryOptions = BigQueryOptions.newBuilder()
+                .build();
+        bigquery = bigQueryOptions.getService();
+
+        DatasetInfo datasetInfo = DatasetInfo.newBuilder(RemoteBigQueryHelper.generateDatasetName()).build();
+        dataset = bigquery.create(datasetInfo);
+    }
+
+    /**
+     * Deletes the test dataset together with any tables created in it.
+     */
+    @AfterClass
+    public static void afterClass() {
+        // beforeClass() may have failed before the dataset was created; skip the cleanup
+        // in that case so the original failure is not followed by a NullPointerException.
+        if (bigquery != null && dataset != null) {
+            bigquery.delete(dataset.getDatasetId(), BigQuery.DatasetDeleteOption.deleteContents());
+        }
+    }
+
+    /**
+     * Asserts that none of the job error attributes are present on the flow file.
+     *
+     * @param flowFile the flow file to inspect
+     */
+    protected static void validateNoServiceExceptionAttribute(FlowFile flowFile) {
+        assertNull(flowFile.getAttribute(BigQueryAttributes.JOB_ERROR_MSG_ATTR));
+        assertNull(flowFile.getAttribute(BigQueryAttributes.JOB_ERROR_REASON_ATTR));
+        assertNull(flowFile.getAttribute(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR));
+    }
+
+    /**
+     * Registers a {@link GCPCredentialsControllerService} with no explicit properties on the
+     * runner under the id {@link #CONTROLLER_SERVICE}, enables it and asserts it is valid.
+     *
+     * @param runner the runner to configure
+     * @return the same runner, for call chaining
+     * @throws InitializationException if the controller service cannot be added
+     */
+    protected TestRunner setCredentialsControllerService(TestRunner runner) throws InitializationException {
+        final Map<String, String> propertiesMap = new HashMap<>();
+        final GCPCredentialsControllerService credentialsControllerService = new GCPCredentialsControllerService();
+
+        runner.addControllerService(CONTROLLER_SERVICE, credentialsControllerService, propertiesMap);
+        runner.enableControllerService(credentialsControllerService);
+        runner.assertValid(credentialsControllerService);
+
+        return runner;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java
new file mode 100644
index 0000000..8686213
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import com.google.cloud.bigquery.FormatOptions;
+import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+public class PutBigQueryBatchIT extends AbstractBigQueryIT {
+
+ private static final String TABLE_SCHEMA_STRING = "[\n" +
+ " {\n" +
+ " \"description\": \"field 1\",\n" +
+ " \"mode\": \"REQUIRED\",\n" +
+ " \"name\": \"field_1\",\n" +
+ " \"type\": \"STRING\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"description\": \"field 2\",\n" +
+ " \"mode\": \"REQUIRED\",\n" +
+ " \"name\": \"field_2\",\n" +
+ " \"type\": \"STRING\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"description\": \"field 3\",\n" +
+ " \"mode\": \"NULLABLE\",\n" +
+ " \"name\": \"field_3\",\n" +
+ " \"type\": \"STRING\"\n" +
+ " }\n" +
+ "]";
+
+ @Before
+ public void setup() {
+ runner = TestRunners.newTestRunner(PutBigQueryBatch.class);
+ }
+
+ @Test
+ public void PutBigQueryBatchSmallPayloadTest() throws Exception {
+ String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
+ runner = setCredentialsControllerService(runner);
+ runner.setProperty(AbstractGCPProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
+ runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
+ runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, methodName);
+ runner.setProperty(BigQueryAttributes.SOURCE_TYPE_ATTR, FormatOptions.json().getType());
+ runner.setProperty(BigQueryAttributes.TABLE_SCHEMA_ATTR, TABLE_SCHEMA_STRING);
+
+ String str = "{\"field_1\":\"Daniel is great\",\"field_2\":\"Daniel is great\"}\r\n";
+
+ runner.enqueue(new ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8)));
+ runner.run(1);
+ for (MockFlowFile flowFile : runner.getFlowFilesForRelationship(AbstractBigQueryProcessor.REL_SUCCESS)) {
+ validateNoServiceExceptionAttribute(flowFile);
+ }
+ runner.assertAllFlowFilesTransferred(AbstractBigQueryProcessor.REL_SUCCESS, 1);
+ }
+
+ @Test
+ public void PutBigQueryBatchBadRecordTest() throws Exception {
+ String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
+ runner = setCredentialsControllerService(runner);
+ runner.setProperty(AbstractGCPProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
+ runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
+ runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, methodName);
+ runner.setProperty(BigQueryAttributes.SOURCE_TYPE_ATTR, FormatOptions.json().getType());
+ runner.setProperty(BigQueryAttributes.TABLE_SCHEMA_ATTR, TABLE_SCHEMA_STRING);
+
+ String str = "{\"field_1\":\"Daniel is great\"}\r\n";
+
+ runner.enqueue(new ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8)));
+ runner.run(1);
+ runner.assertAllFlowFilesTransferred(AbstractBigQueryProcessor.REL_FAILURE, 1);
+ }
+
+ @Test
+ public void PutBigQueryBatchLargePayloadTest() throws InitializationException, IOException {
+ String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
+ runner = setCredentialsControllerService(runner);
+ runner.setProperty(AbstractGCPProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
+ runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
+ runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, methodName);
+ runner.setProperty(BigQueryAttributes.SOURCE_TYPE_ATTR, FormatOptions.json().getType());
+ runner.setProperty(BigQueryAttributes.TABLE_SCHEMA_ATTR, TABLE_SCHEMA_STRING);
+
+ // Allow one bad record to deal with the extra line break.
+ runner.setProperty(BigQueryAttributes.MAX_BADRECORDS_ATTR, String.valueOf(1));
+
+ String str = "{\"field_1\":\"Daniel is great\",\"field_2\":\"Here's to the crazy ones. The misfits. The rebels. The troublemakers." +
+ " The round pegs in the square holes. The ones who see things differently. They're not fond of rules. And they have no respect" +
+ " for the status quo. You can quote them, disagree with them, glorify or vilify them. About the only thing you can't do is ignore" +
+ " them. Because they change things. They push the human race forward. And while some may see them as the crazy ones, we see genius." +
+ " Because the people who are crazy enough to think they can change the world, are the ones who do.\"}\n";
+ Path tempFile = Files.createTempFile(methodName, "");
+ try (BufferedWriter writer = Files.newBufferedWriter(tempFile)) {
+
+ for (int i = 0; i < 2; i++) {
+ for (int ii = 0; ii < 1_000_000; ii++) {
+ writer.write(str);
+ }
+ writer.flush();
+ }
+ writer.flush();
+ }
+
+ runner.enqueue(tempFile);
+ runner.run(1);
+ for (MockFlowFile flowFile : runner.getFlowFilesForRelationship(AbstractBigQueryProcessor.REL_SUCCESS)) {
+ validateNoServiceExceptionAttribute(flowFile);
+ }
+ runner.assertAllFlowFilesTransferred(AbstractBigQueryProcessor.REL_SUCCESS, 1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java
new file mode 100644
index 0000000..7ec5aa9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.bigquery;
+
+import com.google.cloud.RetryOption;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryException;
+import com.google.cloud.bigquery.FormatOptions;
+import com.google.cloud.bigquery.Job;
+import com.google.cloud.bigquery.JobInfo;
+import com.google.cloud.bigquery.JobStatistics;
+import com.google.cloud.bigquery.JobStatus;
+import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.TableDataWriteChannel;
+import com.google.cloud.bigquery.WriteChannelConfiguration;
+import org.apache.nifi.util.TestRunner;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link PutBigQueryBatch}. The BigQuery client and the job objects it
+ * returns are mocked, so no call leaves the JVM.
+ */
+public class PutBigQueryBatchTest extends AbstractBQTest {
+    private static final String TABLENAME = "test_table";
+    private static final String TABLE_SCHEMA = "[{ \"mode\": \"NULLABLE\", \"name\": \"data\", \"type\": \"STRING\" }]";
+    private static final String SOURCE_TYPE = FormatOptions.json().getType();
+    private static final String CREATE_DISPOSITION = JobInfo.CreateDisposition.CREATE_IF_NEEDED.name();
+    private static final String WRITE_DISPOSITION = JobInfo.WriteDisposition.WRITE_EMPTY.name();
+    private static final String MAXBAD_RECORDS = "0";
+    private static final String IGNORE_UNKNOWN = "true";
+    private static final String READ_TIMEOUT = "5 minutes";
+
+    // The BigQuery mock "bq" is inherited from AbstractBQTest; re-declaring it here would
+    // hide the inherited field and leave two unrelated mocks with the same name.
+
+    @Mock
+    Table table;
+
+    @Mock
+    Job job;
+
+    @Mock
+    JobStatus jobStatus;
+
+    @Mock
+    JobStatistics stats;
+
+    @Mock
+    TableDataWriteChannel tableDataWriteChannel;
+
+    /** Initializes the mocks and clears any stubbing left on them. */
+    @Before
+    public void setup() throws Exception {
+        super.setup();
+        reset(bq);
+        reset(table);
+        reset(job);
+        reset(jobStatus);
+        reset(stats);
+        reset(tableDataWriteChannel);
+    }
+
+    /**
+     * @return a {@link PutBigQueryBatch} whose cloud service is the mocked BigQuery client
+     */
+    @Override
+    public AbstractBigQueryProcessor getProcessor() {
+        return new PutBigQueryBatch() {
+            @Override
+            protected BigQuery getCloudService() {
+                return bq;
+            }
+        };
+    }
+
+    @Override
+    protected void addRequiredPropertiesToRunner(TestRunner runner) {
+        runner.setProperty(PutBigQueryBatch.DATASET, DATASET);
+        runner.setProperty(PutBigQueryBatch.TABLE_NAME, TABLENAME);
+        runner.setProperty(PutBigQueryBatch.TABLE_SCHEMA, TABLE_SCHEMA);
+        runner.setProperty(PutBigQueryBatch.SOURCE_TYPE, SOURCE_TYPE);
+        runner.setProperty(PutBigQueryBatch.CREATE_DISPOSITION, CREATE_DISPOSITION);
+        runner.setProperty(PutBigQueryBatch.WRITE_DISPOSITION, WRITE_DISPOSITION);
+        runner.setProperty(PutBigQueryBatch.MAXBAD_RECORDS, MAXBAD_RECORDS);
+        runner.setProperty(PutBigQueryBatch.IGNORE_UNKNOWN, IGNORE_UNKNOWN);
+        runner.setProperty(PutBigQueryBatch.READ_TIMEOUT, READ_TIMEOUT);
+    }
+
+    /**
+     * Stubs the interactions shared by both tests: the table exists, the client hands out
+     * the mocked job and write channel, and the job exposes the mocked status and statistics.
+     * The outcome of {@code job.waitFor(...)} is left to each test.
+     */
+    private void stubLoadJob() {
+        when(table.exists()).thenReturn(Boolean.TRUE);
+        when(bq.create(ArgumentMatchers.isA(JobInfo.class))).thenReturn(job);
+        when(bq.writer(ArgumentMatchers.isA(WriteChannelConfiguration.class))).thenReturn(tableDataWriteChannel);
+        when(tableDataWriteChannel.getJob()).thenReturn(job);
+        when(job.getStatus()).thenReturn(jobStatus);
+        when(job.getStatistics()).thenReturn(stats);
+
+        when(stats.getCreationTime()).thenReturn(0L);
+        when(stats.getStartTime()).thenReturn(1L);
+        when(stats.getEndTime()).thenReturn(2L);
+    }
+
+    /** A job that completes normally routes the flow file to success. */
+    @Test
+    public void testSuccessfulLoad() throws Exception {
+        stubLoadJob();
+        when(job.waitFor(ArgumentMatchers.isA(RetryOption.class))).thenReturn(job);
+
+        final TestRunner runner = buildNewRunner(getProcessor());
+        addRequiredPropertiesToRunner(runner);
+        runner.assertValid();
+
+        runner.enqueue("{ \"data\": \"datavalue\" }");
+
+        runner.run();
+
+        // Assert the count explicitly so the test cannot pass when nothing was routed.
+        runner.assertAllFlowFilesTransferred(PutBigQueryBatch.REL_SUCCESS, 1);
+    }
+
+
+    /** A job whose wait throws {@link BigQueryException} routes the flow file to failure. */
+    @Test
+    public void testFailedLoad() throws Exception {
+        stubLoadJob();
+        when(job.waitFor(ArgumentMatchers.isA(RetryOption.class))).thenThrow(BigQueryException.class);
+
+        final TestRunner runner = buildNewRunner(getProcessor());
+        addRequiredPropertiesToRunner(runner);
+        runner.assertValid();
+
+        runner.enqueue("{ \"data\": \"datavalue\" }");
+
+        runner.run();
+
+        // Assert the count explicitly so the test cannot pass when nothing was routed.
+        runner.assertAllFlowFilesTransferred(PutBigQueryBatch.REL_FAILURE, 1);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
index eca81bb..e17cf4b 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.nifi.processors.gcp.storage;
-
import com.google.api.gax.paging.Page;
import com.google.cloud.storage.Acl;
import com.google.cloud.storage.Blob;
http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
index b8bd9f7..658350e 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
@@ -33,6 +33,7 @@
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-oauth2-http</artifactId>
+ <version>0.9.0</version>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
[2/2] nifi git commit: NIFI-4731 This closes #3019. This closes
#2682. This closes #2420. NIFI-4933 BigQuery PR Review
Posted by jo...@apache.org.
NIFI-4731 This closes #3019. This closes #2682. This closes #2420.
NIFI-4933 BigQuery PR Review
Signed-off-by: joewitt <jo...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/03ef6465
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/03ef6465
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/03ef6465
Branch: refs/heads/master
Commit: 03ef6465478eea09704a2b6eb9be16f5001007b9
Parents: 444caf8
Author: Pierre Villard <pi...@gmail.com>
Authored: Tue Sep 18 23:19:03 2018 +0200
Committer: joewitt <jo...@apache.org>
Committed: Wed Nov 21 10:28:40 2018 -0500
----------------------------------------------------------------------
.../processors/gcp/AbstractGCPProcessor.java | 47 ++-
.../gcp/bigquery/AbstractBigQueryProcessor.java | 62 +++-
.../gcp/bigquery/BigQueryAttributes.java | 101 ++++--
.../processors/gcp/bigquery/BigQueryUtils.java | 84 +++++
.../nifi/processors/gcp/bigquery/BqUtils.java | 84 -----
.../gcp/bigquery/PutBigQueryBatch.java | 308 ++++++++++++-------
.../gcp/pubsub/AbstractGCPubSubProcessor.java | 21 ++
.../gcp/storage/AbstractGCSProcessor.java | 27 +-
.../nifi-gcp-services-api/pom.xml | 2 +-
nifi-nar-bundles/nifi-gcp-bundle/pom.xml | 5 +-
10 files changed, 479 insertions(+), 262 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/03ef6465/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
index e95bf73..0c360d1 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
@@ -19,16 +19,20 @@ package org.apache.nifi.processors.gcp;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.Service;
import com.google.cloud.ServiceOptions;
+import com.google.cloud.TransportOptions;
+import com.google.cloud.http.HttpTransportOptions;
import com.google.common.collect.ImmutableList;
+
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
import org.apache.nifi.proxy.ProxyConfiguration;
+import java.net.Proxy;
import java.util.List;
/**
@@ -65,7 +69,7 @@ public abstract class AbstractGCPProcessor<
"-Djdk.http.auth.tunneling.disabledSchemes=\n" +
"-Djdk.http.auth.proxying.disabledSchemes=")
.required(false)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@@ -74,14 +78,14 @@ public abstract class AbstractGCPProcessor<
.displayName("Proxy port")
.description("Proxy port number")
.required(false)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
- .addValidator(StandardValidators.INTEGER_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.PORT_VALIDATOR)
.build();
public static final PropertyDescriptor HTTP_PROXY_USERNAME = new PropertyDescriptor
.Builder().name("gcp-proxy-user-name")
- .displayName("Http Proxy Username")
- .description("Http Proxy Username")
+ .displayName("HTTP Proxy Username")
+ .description("HTTP Proxy Username")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(false)
@@ -89,8 +93,8 @@ public abstract class AbstractGCPProcessor<
public static final PropertyDescriptor HTTP_PROXY_PASSWORD = new PropertyDescriptor
.Builder().name("gcp-proxy-user-password")
- .displayName("Http Proxy Password")
- .description("Http Proxy Password")
+ .displayName("HTTP Proxy Password")
+ .description("HTTP Proxy Password")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(false)
@@ -160,4 +164,31 @@ public abstract class AbstractGCPProcessor<
* @see <a href="http://googlecloudplatform.github.io/google-cloud-java/0.8.0/apidocs/com/google/cloud/ServiceOptions.html">ServiceOptions</a>
*/
protected abstract CloudServiceOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials);
+
+ /**
+ * Builds the Transport Options containing the proxy configuration
+ * @param context Context to get properties
+ * @return Transport options object with proxy configuration
+ */
+ protected TransportOptions getTransportOptions(ProcessContext context) {
+ final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context, () -> {
+ final String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue();
+ final Integer proxyPort = context.getProperty(PROXY_PORT).evaluateAttributeExpressions().asInteger();
+ if (proxyHost != null && proxyPort != null && proxyPort > 0) {
+ final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
+ final String proxyUser = context.getProperty(HTTP_PROXY_USERNAME).evaluateAttributeExpressions().getValue();
+ final String proxyPassword = context.getProperty(HTTP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
+ componentProxyConfig.setProxyType(Proxy.Type.HTTP);
+ componentProxyConfig.setProxyServerHost(proxyHost);
+ componentProxyConfig.setProxyServerPort(proxyPort);
+ componentProxyConfig.setProxyUserName(proxyUser);
+ componentProxyConfig.setProxyUserPassword(proxyPassword);
+ return componentProxyConfig;
+ }
+ return ProxyConfiguration.DIRECT_CONFIGURATION;
+ });
+
+ final ProxyAwareTransportFactory transportFactory = new ProxyAwareTransportFactory(proxyConfiguration);
+ return HttpTransportOptions.newBuilder().setHttpTransportFactory(transportFactory).build();
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/03ef6465/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
index b52a552..c249e7e 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
@@ -22,15 +22,21 @@ import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.common.collect.ImmutableList;
+
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
+import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
+import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.util.StringUtils;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -40,7 +46,9 @@ import java.util.Set;
* Base class for creating processors that connect to GCP BigQuery service
*/
public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<BigQuery, BigQueryOptions> {
+
static final int BUFFER_SIZE = 65536;
+
public static final Relationship REL_SUCCESS =
new Relationship.Builder().name("success")
.description("FlowFiles are routed to this relationship after a successful Google BigQuery operation.")
@@ -53,8 +61,8 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
public static final Set<Relationship> relationships = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
- public static final PropertyDescriptor DATASET = new PropertyDescriptor
- .Builder().name(BigQueryAttributes.DATASET_ATTR)
+ public static final PropertyDescriptor DATASET = new PropertyDescriptor.Builder()
+ .name(BigQueryAttributes.DATASET_ATTR)
.displayName("Dataset")
.description(BigQueryAttributes.DATASET_DESC)
.required(true)
@@ -63,8 +71,8 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
- public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor
- .Builder().name(BigQueryAttributes.TABLE_NAME_ATTR)
+ public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+ .name(BigQueryAttributes.TABLE_NAME_ATTR)
.displayName("Table Name")
.description(BigQueryAttributes.TABLE_NAME_DESC)
.required(true)
@@ -73,22 +81,22 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
- public static final PropertyDescriptor TABLE_SCHEMA = new PropertyDescriptor
- .Builder().name(BigQueryAttributes.TABLE_SCHEMA_ATTR)
+ public static final PropertyDescriptor TABLE_SCHEMA = new PropertyDescriptor.Builder()
+ .name(BigQueryAttributes.TABLE_SCHEMA_ATTR)
.displayName("Table Schema")
.description(BigQueryAttributes.TABLE_SCHEMA_DESC)
.required(false)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
- public static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor
- .Builder().name(BigQueryAttributes.JOB_READ_TIMEOUT_ATTR)
+ public static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder()
+ .name(BigQueryAttributes.JOB_READ_TIMEOUT_ATTR)
.displayName("Read Timeout")
.description(BigQueryAttributes.JOB_READ_TIMEOUT_DESC)
.required(true)
.defaultValue("5 minutes")
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
@@ -101,6 +109,10 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return ImmutableList.<PropertyDescriptor>builder()
.addAll(super.getSupportedPropertyDescriptors())
+ .add(DATASET)
+ .add(TABLE_NAME)
+ .add(TABLE_SCHEMA)
+ .add(READ_TIMEOUT)
.build();
}
@@ -109,14 +121,40 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
final Integer retryCount = Integer.valueOf(context.getProperty(RETRY_COUNT).getValue());
- BigQueryOptions.Builder builder = BigQueryOptions.newBuilder().setCredentials(credentials);
+ final BigQueryOptions.Builder builder = BigQueryOptions.newBuilder();
if (!StringUtils.isBlank(projectId)) {
builder.setProjectId(projectId);
}
- return builder
+ return builder.setCredentials(credentials)
.setRetrySettings(RetrySettings.newBuilder().setMaxAttempts(retryCount).build())
+ .setTransportOptions(getTransportOptions(context))
.build();
}
+
+ @Override
+ protected final Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+ final Collection<ValidationResult> results = super.customValidate(validationContext);
+ ProxyConfiguration.validateProxySpec(validationContext, results, ProxyAwareTransportFactory.PROXY_SPECS);
+
+ final boolean projectId = validationContext.getProperty(PROJECT_ID).isSet();
+ if (!projectId) {
+ results.add(new ValidationResult.Builder()
+ .subject(PROJECT_ID.getName())
+ .valid(false)
+ .explanation("The Project ID must be set for this processor.")
+ .build());
+ }
+
+ customValidate(validationContext, results);
+ return results;
+ }
+
+ /**
+ * If subclasses need to implement any custom validation, they should override this method and add their validation results to the results collection.
+ */
+ protected void customValidate(ValidationContext validationContext, Collection<ValidationResult> results) {
+ }
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/03ef6465/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
index 380f701..842a176 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
@@ -17,9 +17,12 @@
package org.apache.nifi.processors.gcp.bigquery;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
+import com.google.cloud.bigquery.JobInfo;
+
/**
* Attributes associated with the BigQuery processors
*/
@@ -28,44 +31,75 @@ public class BigQueryAttributes {
public static final PropertyDescriptor SERVICE_ACCOUNT_JSON_FILE = CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE;
+ // Properties
+ public static final String SOURCE_TYPE_ATTR = "bq.load.type";
+ public static final String SOURCE_TYPE_DESC = "Data type of the file to be loaded. Possible values: AVRO, "
+ + "NEWLINE_DELIMITED_JSON, CSV.";
+
+ public static final String IGNORE_UNKNOWN_ATTR = "bq.load.ignore_unknown";
+ public static final String IGNORE_UNKNOWN_DESC = "Sets whether BigQuery should allow extra values that are not represented "
+ + "in the table schema. If true, the extra values are ignored. If false, records with extra columns are treated as "
+ + "bad records, and if there are too many bad records, an invalid error is returned in the job result. By default "
+ + "unknown values are not allowed.";
+
+ public static final String WRITE_DISPOSITION_ATTR = "bq.load.write_disposition";
+ public static final String WRITE_DISPOSITION_DESC = "Sets the action that should occur if the destination table already exists.";
+
+ public static final String MAX_BADRECORDS_ATTR = "bq.load.max_badrecords";
+ public static final String MAX_BADRECORDS_DESC = "Sets the maximum number of bad records that BigQuery can ignore when running "
+ + "the job. If the number of bad records exceeds this value, an invalid error is returned in the job result. By default "
+ + "no bad record is ignored.";
+
public static final String DATASET_ATTR = "bq.dataset";
- public static final String DATASET_DESC = "BigQuery dataset";
+ public static final String DATASET_DESC = "BigQuery dataset name (Note - The dataset must exist in GCP)";
public static final String TABLE_NAME_ATTR = "bq.table.name";
public static final String TABLE_NAME_DESC = "BigQuery table name";
public static final String TABLE_SCHEMA_ATTR = "bq.table.schema";
- public static final String TABLE_SCHEMA_DESC = "BigQuery table name";
+ public static final String TABLE_SCHEMA_DESC = "BigQuery schema in JSON format";
public static final String CREATE_DISPOSITION_ATTR = "bq.load.create_disposition";
- public static final String CREATE_DISPOSITION_DESC = "Options for table creation";
+ public static final String CREATE_DISPOSITION_DESC = "Sets whether the job is allowed to create new tables";
- public static final String JOB_ERROR_MSG_ATTR = "bq.error.message";
- public static final String JOB_ERROR_MSG_DESC = "Load job error message";
+ public static final String JOB_READ_TIMEOUT_ATTR = "bq.readtimeout";
+ public static final String JOB_READ_TIMEOUT_DESC = "Load Job Time Out";
- public static final String JOB_ERROR_REASON_ATTR = "bq.error.reason";
- public static final String JOB_ERROR_REASON_DESC = "Load job error reason";
+ public static final String CSV_ALLOW_JAGGED_ROWS_ATTR = "bq.csv.allow.jagged.rows";
+ public static final String CSV_ALLOW_JAGGED_ROWS_DESC = "Set whether BigQuery should accept rows that are missing "
+ + "trailing optional columns. If true, BigQuery treats missing trailing columns as null values. If false, "
+ + "records with missing trailing columns are treated as bad records, and if there are too many bad records, "
+ + "an invalid error is returned in the job result. By default, rows with missing trailing columns are "
+ + "considered bad records.";
- public static final String JOB_ERROR_LOCATION_ATTR = "bq.error.location";
- public static final String JOB_ERROR_LOCATION_DESC = "Load job error location";
+ public static final String CSV_ALLOW_QUOTED_NEW_LINES_ATTR = "bq.csv.allow.quoted.new.lines";
+ public static final String CSV_ALLOW_QUOTED_NEW_LINES_DESC = "Sets whether BigQuery should allow quoted data sections "
+ + "that contain newline characters in a CSV file. By default quoted newline are not allowed.";
- public static final String JOB_READ_TIMEOUT_ATTR = "bq.readtimeout";
- public static final String JOB_READ_TIMEOUT_DESC = "Load Job Time Out";
+ public static final String CSV_CHARSET_ATTR = "bq.csv.charset";
+ public static final String CSV_CHARSET_DESC = "Sets the character encoding of the data.";
+ public static final String CSV_FIELD_DELIMITER_ATTR = "bq.csv.delimiter";
+ public static final String CSV_FIELD_DELIMITER_DESC = "Sets the separator for fields in a CSV file. BigQuery converts "
+ + "the string to ISO-8859-1 encoding, and then uses the first byte of the encoded string to split the data in its "
+ + "raw, binary state. BigQuery also supports the escape sequence \"\t\" to specify a tab separator. The default "
+ + "value is a comma (',').";
- // Batch Attributes
- public static final String SOURCE_TYPE_ATTR = "bq.load.type";
- public static final String SOURCE_TYPE_DESC = "Data type of the file to be loaded";
+ public static final String CSV_QUOTE_ATTR = "bq.csv.quote";
+ public static final String CSV_QUOTE_DESC = "Sets the value that is used to quote data sections in a CSV file. BigQuery "
+ + "converts the string to ISO-8859-1 encoding, and then uses the first byte of the encoded string to split the "
+ + "data in its raw, binary state. The default value is a double-quote ('\"'). If your data does not contain quoted "
+ + "sections, set the property value to an empty string. If your data contains quoted newline characters, you must "
+ + "also set the Allow Quoted New Lines property to true.";
- public static final String IGNORE_UNKNOWN_ATTR = "bq.load.ignore_unknown";
- public static final String IGNORE_UNKNOWN_DESC = "Ignore fields not in table schema";
+ public static final String CSV_SKIP_LEADING_ROWS_ATTR = "bq.csv.skip.leading.rows";
+ public static final String CSV_SKIP_LEADING_ROWS_DESC = "Sets the number of rows at the top of a CSV file that BigQuery "
+ + "will skip when reading the data. The default value is 0. This property is useful if you have header rows in the "
+ + "file that should be skipped.";
- public static final String WRITE_DISPOSITION_ATTR = "bq.load.write_disposition";
- public static final String WRITE_DISPOSITION_DESC = "Options for writing to table";
- public static final String MAX_BADRECORDS_ATTR = "bq.load.max_badrecords";
- public static final String MAX_BADRECORDS_DESC = "Number of erroneous records to ignore before generating an error";
+ // Batch Attributes
public static final String JOB_CREATE_TIME_ATTR = "bq.job.stat.creation_time";
public static final String JOB_CREATE_TIME_DESC = "Time load job creation";
@@ -77,4 +111,31 @@ public class BigQueryAttributes {
public static final String JOB_LINK_ATTR = "bq.job.link";
public static final String JOB_LINK_DESC = "API Link to load job";
+
+ public static final String JOB_NB_RECORDS_ATTR = "bq.records.count";
+ public static final String JOB_NB_RECORDS_DESC = "Number of records successfully inserted";
+
+ public static final String JOB_ERROR_MSG_ATTR = "bq.error.message";
+ public static final String JOB_ERROR_MSG_DESC = "Load job error message";
+
+ public static final String JOB_ERROR_REASON_ATTR = "bq.error.reason";
+ public static final String JOB_ERROR_REASON_DESC = "Load job error reason";
+
+ public static final String JOB_ERROR_LOCATION_ATTR = "bq.error.location";
+ public static final String JOB_ERROR_LOCATION_DESC = "Load job error location";
+
+
+ // Allowable values
+ public static final AllowableValue CREATE_IF_NEEDED = new AllowableValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(),
+ JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), "Configures the job to create the table if it does not exist.");
+ public static final AllowableValue CREATE_NEVER = new AllowableValue(JobInfo.CreateDisposition.CREATE_NEVER.name(),
+ JobInfo.CreateDisposition.CREATE_NEVER.name(), "Configures the job to fail with a not-found error if the table does not exist.");
+
+ public static final AllowableValue WRITE_EMPTY = new AllowableValue(JobInfo.WriteDisposition.WRITE_EMPTY.name(),
+ JobInfo.WriteDisposition.WRITE_EMPTY.name(), "Configures the job to fail with a duplicate error if the table already exists.");
+ public static final AllowableValue WRITE_APPEND = new AllowableValue(JobInfo.WriteDisposition.WRITE_APPEND.name(),
+ JobInfo.WriteDisposition.WRITE_APPEND.name(), "Configures the job to append data to the table if it already exists.");
+ public static final AllowableValue WRITE_TRUNCATE = new AllowableValue(JobInfo.WriteDisposition.WRITE_TRUNCATE.name(),
+ JobInfo.WriteDisposition.WRITE_TRUNCATE.name(), "Configures the job to overwrite the table data if table already exists.");
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/03ef6465/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryUtils.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryUtils.java
new file mode 100644
index 0000000..3139ffd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.LegacySQLTypeName;
+import com.google.cloud.bigquery.Schema;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
+
+/**
+ * Util class for schema manipulation
+ */
+public class BigQueryUtils {
+
+ private final static Type gsonSchemaType = new TypeToken<List<Map>>() { }.getType();
+
+ public static Field mapToField(Map fMap) {
+ String typeStr = fMap.get("type").toString();
+ String nameStr = fMap.get("name").toString();
+ String modeStr = fMap.get("mode").toString();
+ LegacySQLTypeName type = null;
+
+ if (typeStr.equals("BOOLEAN")) {
+ type = LegacySQLTypeName.BOOLEAN;
+ } else if (typeStr.equals("STRING")) {
+ type = LegacySQLTypeName.STRING;
+ } else if (typeStr.equals("BYTES")) {
+ type = LegacySQLTypeName.BYTES;
+ } else if (typeStr.equals("INTEGER")) {
+ type = LegacySQLTypeName.INTEGER;
+ } else if (typeStr.equals("FLOAT")) {
+ type = LegacySQLTypeName.FLOAT;
+ } else if (typeStr.equals("TIMESTAMP") || typeStr.equals("DATE")
+ || typeStr.equals("TIME") || typeStr.equals("DATETIME")) {
+ type = LegacySQLTypeName.TIMESTAMP;
+ } else if (typeStr.equals("RECORD")) {
+ type = LegacySQLTypeName.RECORD;
+ }
+
+ return Field.newBuilder(nameStr, type).setMode(Field.Mode.valueOf(modeStr)).build();
+ }
+
+ public static List<Field> listToFields(List<Map> m_fields) {
+ List<Field> fields = new ArrayList(m_fields.size());
+ for (Map m : m_fields) {
+ fields.add(mapToField(m));
+ }
+
+ return fields;
+ }
+
+ public static Schema schemaFromString(String schemaStr) {
+ if (schemaStr == null) {
+ return null;
+ } else {
+ Gson gson = new Gson();
+ List<Map> fields = gson.fromJson(schemaStr, gsonSchemaType);
+ return Schema.of(BigQueryUtils.listToFields(fields));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/03ef6465/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java
deleted file mode 100644
index f7f5d66..0000000
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.gcp.bigquery;
-
-import com.google.cloud.bigquery.Field;
-import com.google.cloud.bigquery.LegacySQLTypeName;
-import com.google.cloud.bigquery.Schema;
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-
-/**
- *
- */
-public class BqUtils {
- private final static Type gsonSchemaType = new TypeToken<List<Map>>() {
- }.getType();
-
- public static Field mapToField(Map fMap) {
- String typeStr = fMap.get("type").toString();
- String nameStr = fMap.get("name").toString();
- String modeStr = fMap.get("mode").toString();
- LegacySQLTypeName type = null;
-
- if (typeStr.equals("BOOLEAN")) {
- type = LegacySQLTypeName.BOOLEAN;
- } else if (typeStr.equals("STRING")) {
- type = LegacySQLTypeName.STRING;
- } else if (typeStr.equals("BYTES")) {
- type = LegacySQLTypeName.BYTES;
- } else if (typeStr.equals("INTEGER")) {
- type = LegacySQLTypeName.INTEGER;
- } else if (typeStr.equals("FLOAT")) {
- type = LegacySQLTypeName.FLOAT;
- } else if (typeStr.equals("TIMESTAMP") || typeStr.equals("DATE")
- || typeStr.equals("TIME") || typeStr.equals("DATETIME")) {
- type = LegacySQLTypeName.TIMESTAMP;
- } else if (typeStr.equals("RECORD")) {
- type = LegacySQLTypeName.RECORD;
- }
-
- return Field.newBuilder(nameStr, type).setMode(Field.Mode.valueOf(modeStr)).build();
- }
-
- public static List<Field> listToFields(List<Map> m_fields) {
- List<Field> fields = new ArrayList(m_fields.size());
- for (Map m : m_fields) {
- fields.add(mapToField(m));
- }
-
- return fields;
- }
-
- public static Schema schemaFromString(String schemaStr) {
- if (schemaStr == null) {
- return null;
- } else {
- Gson gson = new Gson();
- List<Map> fields = gson.fromJson(schemaStr, gsonSchemaType);
- return Schema.of(BqUtils.listToFields(fields));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/03ef6465/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
index 99c7f2a..5068ab5 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
@@ -18,15 +18,16 @@
package org.apache.nifi.processors.gcp.bigquery;
import com.google.cloud.RetryOption;
-import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
+import com.google.cloud.bigquery.JobStatistics.LoadStatistics;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import com.google.common.collect.ImmutableList;
+
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -35,7 +36,10 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.processor.ProcessContext;
@@ -51,6 +55,7 @@ import org.threeten.bp.temporal.ChronoUnit;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -59,40 +64,58 @@ import java.util.concurrent.TimeUnit;
/**
* A processor for batch loading data into a Google BigQuery table
*/
-
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"google", "google cloud", "bq", "bigquery"})
-@CapabilityDescription("Batch loads flow files to a Google BigQuery table.")
+@CapabilityDescription("Batch loads flow files content to a Google BigQuery table.")
@SeeAlso({PutGCSObject.class, DeleteGCSObject.class})
-
@WritesAttributes({
- @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, description = BigQueryAttributes.IGNORE_UNKNOWN_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, description = BigQueryAttributes.MAX_BADRECORDS_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC)
+ @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC),
+ @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC),
+ @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC),
+ @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC),
+ @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, description = BigQueryAttributes.IGNORE_UNKNOWN_DESC),
+ @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC),
+ @WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC),
+ @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, description = BigQueryAttributes.MAX_BADRECORDS_DESC),
+ @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
+ @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
+ @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
+ @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
+ @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
+ @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
+ @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC),
+ @WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
})
-
public class PutBigQueryBatch extends AbstractBigQueryProcessor {
+ private static final List<String> TYPES = Arrays.asList(FormatOptions.json().getType(), FormatOptions.csv().getType(), FormatOptions.avro().getType());
+
+ /**
+  * Validator for the "Load file type" property.
+  *
+  * A value is accepted when it contains Expression Language (and the property
+  * supports it), or when its upper-cased form is one of {@link #TYPES}.
+  * Anything else, including a null value, is reported as invalid together
+  * with the list of accepted types.
+  *
+  * NOTE(review): the match here is case-insensitive, but onTrigger compares the
+  * evaluated value with a case-sensitive equals against
+  * FormatOptions.csv().getType(); a lower-case "csv" accepted here would
+  * therefore not get the CSV-specific options applied. Confirm whether the
+  * value should be normalised there as well.
+  */
+ private static final Validator FORMAT_VALIDATOR = new Validator() {
+     @Override
+     public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+         final ValidationResult.Builder builder = new ValidationResult.Builder();
+         builder.subject(subject).input(input);
+
+         // The real value is only known per flow file, so it cannot be checked at configuration time.
+         if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+             return builder.valid(true).explanation("Contains Expression Language").build();
+         }
+
+         // Locale.ROOT keeps the case conversion independent of the JVM default locale
+         // (e.g. the Turkish dotted/dotless 'i'); the null guard turns a missing value
+         // into a validation failure instead of a NullPointerException.
+         final boolean knownType = input != null && TYPES.contains(input.toUpperCase(java.util.Locale.ROOT));
+
+         if (knownType) {
+             builder.valid(true);
+         } else {
+             builder.valid(false).explanation("Load File Type must be one of the following options: " + StringUtils.join(TYPES, ", "));
+         }
+
+         return builder.build();
+     }
+ };
+
public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor
.Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
.displayName("Load file type")
.description(BigQueryAttributes.SOURCE_TYPE_DESC)
.required(true)
- .allowableValues(FormatOptions.json().getType(), FormatOptions.avro().getType(), FormatOptions.csv().getType())
- .defaultValue(FormatOptions.avro().getType())
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .addValidator(FORMAT_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder()
@@ -102,7 +125,7 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
.required(true)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues("true", "false")
- .defaultValue("true")
+ .defaultValue("false")
.build();
public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder()
@@ -110,8 +133,8 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
.displayName("Create Disposition")
.description(BigQueryAttributes.CREATE_DISPOSITION_DESC)
.required(true)
- .allowableValues(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), JobInfo.CreateDisposition.CREATE_NEVER.name())
- .defaultValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name())
+ .allowableValues(BigQueryAttributes.CREATE_IF_NEEDED, BigQueryAttributes.CREATE_NEVER)
+ .defaultValue(BigQueryAttributes.CREATE_IF_NEEDED.getValue())
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@@ -120,8 +143,8 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
.displayName("Write Disposition")
.description(BigQueryAttributes.WRITE_DISPOSITION_DESC)
.required(true)
- .allowableValues(JobInfo.WriteDisposition.WRITE_EMPTY.name(), JobInfo.WriteDisposition.WRITE_APPEND.name(), JobInfo.WriteDisposition.WRITE_TRUNCATE.name())
- .defaultValue(JobInfo.WriteDisposition.WRITE_EMPTY.name())
+ .allowableValues(BigQueryAttributes.WRITE_EMPTY, BigQueryAttributes.WRITE_APPEND, BigQueryAttributes.WRITE_TRUNCATE)
+ .defaultValue(BigQueryAttributes.WRITE_EMPTY.getValue())
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@@ -134,34 +157,78 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
- private Schema schemaCache = null;
+ public static final PropertyDescriptor CSV_ALLOW_JAGGED_ROWS = new PropertyDescriptor.Builder()
+ .name(BigQueryAttributes.CSV_ALLOW_JAGGED_ROWS_ATTR)
+ .displayName("CSV Input - Allow Jagged Rows")
+ .description(BigQueryAttributes.CSV_ALLOW_JAGGED_ROWS_DESC)
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .build();
+
+ public static final PropertyDescriptor CSV_ALLOW_QUOTED_NEW_LINES = new PropertyDescriptor.Builder()
+ .name(BigQueryAttributes.CSV_ALLOW_QUOTED_NEW_LINES_ATTR)
+ .displayName("CSV Input - Allow Quoted New Lines")
+ .description(BigQueryAttributes.CSV_ALLOW_QUOTED_NEW_LINES_DESC)
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .build();
- public PutBigQueryBatch() {
+ public static final PropertyDescriptor CSV_CHARSET = new PropertyDescriptor.Builder()
+ .name(BigQueryAttributes.CSV_CHARSET_ATTR)
+ .displayName("CSV Input - Character Set")
+ .description(BigQueryAttributes.CSV_CHARSET_DESC)
+ .required(true)
+ .allowableValues("UTF-8", "ISO-8859-1")
+ .defaultValue("UTF-8")
+ .build();
- }
+ public static final PropertyDescriptor CSV_FIELD_DELIMITER = new PropertyDescriptor.Builder()
+ .name(BigQueryAttributes.CSV_FIELD_DELIMITER_ATTR)
+ .displayName("CSV Input - Field Delimiter")
+ .description(BigQueryAttributes.CSV_FIELD_DELIMITER_DESC)
+ .required(true)
+ .defaultValue(",")
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor CSV_QUOTE = new PropertyDescriptor.Builder()
+ .name(BigQueryAttributes.CSV_QUOTE_ATTR)
+ .displayName("CSV Input - Quote")
+ .description(BigQueryAttributes.CSV_QUOTE_DESC)
+ .required(true)
+ .defaultValue("\"")
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor CSV_SKIP_LEADING_ROWS = new PropertyDescriptor.Builder()
+ .name(BigQueryAttributes.CSV_SKIP_LEADING_ROWS_ATTR)
+ .displayName("CSV Input - Skip Leading Rows")
+ .description(BigQueryAttributes.CSV_SKIP_LEADING_ROWS_DESC)
+ .required(true)
+ .defaultValue("0")
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return ImmutableList.<PropertyDescriptor>builder()
.addAll(super.getSupportedPropertyDescriptors())
- .add(DATASET)
- .add(TABLE_NAME)
- .add(TABLE_SCHEMA)
.add(SOURCE_TYPE)
.add(CREATE_DISPOSITION)
.add(WRITE_DISPOSITION)
.add(MAXBAD_RECORDS)
.add(IGNORE_UNKNOWN)
- .build();
- }
-
- @Override
- protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
- return new PropertyDescriptor.Builder()
- .name(propertyDescriptorName)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
- .dynamic(true)
+ .add(CSV_ALLOW_JAGGED_ROWS)
+ .add(CSV_ALLOW_QUOTED_NEW_LINES)
+ .add(CSV_CHARSET)
+ .add(CSV_FIELD_DELIMITER)
+ .add(CSV_QUOTE)
+ .add(CSV_SKIP_LEADING_ROWS)
.build();
}
@@ -178,13 +245,10 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
return;
}
- final Map<String, String> attributes = new HashMap<>();
-
- final BigQuery bq = getCloudService();
-
final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
final String dataset = context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue();
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String type = context.getProperty(SOURCE_TYPE).evaluateAttributeExpressions(flowFile).getValue();
final TableId tableId;
if (StringUtils.isEmpty(projectId)) {
@@ -193,70 +257,93 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
tableId = TableId.of(projectId, dataset, tableName);
}
- final String fileType = context.getProperty(SOURCE_TYPE).getValue();
+ try {
- String schemaString = context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions().getValue();
- Schema schema = BqUtils.schemaFromString(schemaString);
+ FormatOptions formatOption;
- WriteChannelConfiguration writeChannelConfiguration =
- WriteChannelConfiguration.newBuilder(tableId)
- .setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue()))
- .setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue()))
- .setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).asBoolean())
- .setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger())
- .setSchema(schema)
- .setFormatOptions(FormatOptions.of(fileType))
+ if(type.equals(FormatOptions.csv().getType())) {
+ formatOption = FormatOptions.csv().toBuilder()
+ .setAllowJaggedRows(context.getProperty(CSV_ALLOW_JAGGED_ROWS).asBoolean())
+ .setAllowQuotedNewLines(context.getProperty(CSV_ALLOW_QUOTED_NEW_LINES).asBoolean())
+ .setEncoding(context.getProperty(CSV_CHARSET).getValue())
+ .setFieldDelimiter(context.getProperty(CSV_FIELD_DELIMITER).evaluateAttributeExpressions(flowFile).getValue())
+ .setQuote(context.getProperty(CSV_QUOTE).evaluateAttributeExpressions(flowFile).getValue())
+ .setSkipLeadingRows(context.getProperty(CSV_SKIP_LEADING_ROWS).evaluateAttributeExpressions(flowFile).asInteger())
.build();
+ } else {
+ formatOption = FormatOptions.of(type);
+ }
- TableDataWriteChannel writer = bq.writer(writeChannelConfiguration);
-
- try {
- session.read(flowFile, rawIn -> {
- ReadableByteChannel readableByteChannel = Channels.newChannel(rawIn);
- ByteBuffer byteBuffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
- while (readableByteChannel.read(byteBuffer) >= 0) {
- byteBuffer.flip();
- writer.write(byteBuffer);
- byteBuffer.clear();
- }
- });
-
- writer.close();
-
- Job job = writer.getJob();
- PropertyValue property = context.getProperty(READ_TIMEOUT);
- Long timePeriod = property.evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS);
- Duration duration = Duration.of(timePeriod, ChronoUnit.SECONDS);
- job = job.waitFor(RetryOption.totalTimeout(duration));
-
- if (job != null) {
- attributes.put(BigQueryAttributes.JOB_CREATE_TIME_ATTR, Long.toString(job.getStatistics().getCreationTime()));
- attributes.put(BigQueryAttributes.JOB_END_TIME_ATTR, Long.toString(job.getStatistics().getEndTime()));
- attributes.put(BigQueryAttributes.JOB_START_TIME_ATTR, Long.toString(job.getStatistics().getStartTime()));
- attributes.put(BigQueryAttributes.JOB_LINK_ATTR, job.getSelfLink());
-
- boolean jobError = (job.getStatus().getError() != null);
-
- if (jobError) {
- attributes.put(BigQueryAttributes.JOB_ERROR_MSG_ATTR, job.getStatus().getError().getMessage());
- attributes.put(BigQueryAttributes.JOB_ERROR_REASON_ATTR, job.getStatus().getError().getReason());
- attributes.put(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, job.getStatus().getError().getLocation());
- } else {
- // in case it got looped back from error
- flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_MSG_ATTR);
- flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_REASON_ATTR);
- flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_LOCATION_ATTR);
- }
-
- if (!attributes.isEmpty()) {
- flowFile = session.putAllAttributes(flowFile, attributes);
- }
-
- if (jobError) {
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- } else {
- session.transfer(flowFile, REL_SUCCESS);
+ final Schema schema = BigQueryUtils.schemaFromString(context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions(flowFile).getValue());
+ final WriteChannelConfiguration writeChannelConfiguration =
+ WriteChannelConfiguration.newBuilder(tableId)
+ .setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue()))
+ .setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue()))
+ .setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).asBoolean())
+ .setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger())
+ .setSchema(schema)
+ .setFormatOptions(formatOption)
+ .build();
+
+ try ( TableDataWriteChannel writer = getCloudService().writer(writeChannelConfiguration) ) {
+
+ session.read(flowFile, rawIn -> {
+ ReadableByteChannel readableByteChannel = Channels.newChannel(rawIn);
+ ByteBuffer byteBuffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
+ while (readableByteChannel.read(byteBuffer) >= 0) {
+ byteBuffer.flip();
+ writer.write(byteBuffer);
+ byteBuffer.clear();
+ }
+ });
+
+ // writer must be closed to get the job
+ writer.close();
+
+ Job job = writer.getJob();
+ Long timePeriod = context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.SECONDS);
+ Duration waitFor = Duration.of(timePeriod, ChronoUnit.SECONDS);
+ job = job.waitFor(RetryOption.totalTimeout(waitFor));
+
+ if (job != null) {
+ final Map<String, String> attributes = new HashMap<>();
+
+ attributes.put(BigQueryAttributes.JOB_CREATE_TIME_ATTR, Long.toString(job.getStatistics().getCreationTime()));
+ attributes.put(BigQueryAttributes.JOB_END_TIME_ATTR, Long.toString(job.getStatistics().getEndTime()));
+ attributes.put(BigQueryAttributes.JOB_START_TIME_ATTR, Long.toString(job.getStatistics().getStartTime()));
+ attributes.put(BigQueryAttributes.JOB_LINK_ATTR, job.getSelfLink());
+
+ boolean jobError = (job.getStatus().getError() != null);
+
+ if (jobError) {
+ attributes.put(BigQueryAttributes.JOB_ERROR_MSG_ATTR, job.getStatus().getError().getMessage());
+ attributes.put(BigQueryAttributes.JOB_ERROR_REASON_ATTR, job.getStatus().getError().getReason());
+ attributes.put(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, job.getStatus().getError().getLocation());
+ } else {
+ // in case it got looped back from error
+ flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_MSG_ATTR);
+ flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_REASON_ATTR);
+ flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_LOCATION_ATTR);
+
+ // add the number of records successfully added
+ if(job.getStatistics() instanceof LoadStatistics) {
+ final LoadStatistics stats = (LoadStatistics) job.getStatistics();
+ attributes.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(stats.getOutputRows()));
+ }
+ }
+
+ if (!attributes.isEmpty()) {
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ }
+
+ if (jobError) {
+ getLogger().log(LogLevel.WARN, job.getStatus().getError().getMessage());
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ } else {
+ session.getProvenanceReporter().send(flowFile, job.getSelfLink(), job.getStatistics().getEndTime() - job.getStatistics().getStartTime());
+ session.transfer(flowFile, REL_SUCCESS);
+ }
}
}
@@ -266,4 +353,5 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
session.transfer(flowFile, REL_FAILURE);
}
}
-}
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/03ef6465/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
index 31dc0a7..8930a27 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
@@ -18,13 +18,17 @@ package org.apache.nifi.processors.gcp.pubsub;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.ServiceOptions;
+
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -62,4 +66,21 @@ public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor {
protected ServiceOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials) {
return null;
}
+
+ /**
+  * Extends the inherited validation with a check that the Project ID
+  * property has been set.
+  *
+  * @param validationContext context giving access to the configured properties
+  * @return the results produced by the superclass, plus one invalid result
+  *         when the Project ID property is not set
+  */
+ @Override
+ protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+     final Collection<ValidationResult> results = super.customValidate(validationContext);
+
+     // Nothing to add when the Project ID is configured.
+     if (validationContext.getProperty(PROJECT_ID).isSet()) {
+         return results;
+     }
+
+     final ValidationResult missingProjectId = new ValidationResult.Builder()
+             .subject(PROJECT_ID.getName())
+             .valid(false)
+             .explanation("The Project ID must be set for this processor.")
+             .build();
+     results.add(missingProjectId);
+
+     return results;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/03ef6465/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
index 7c63631..fba984e 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
@@ -16,8 +16,6 @@
*/
package org.apache.nifi.processors.gcp.storage;
-import java.net.Proxy;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -36,7 +34,6 @@ import org.apache.nifi.proxy.ProxyConfiguration;
import com.google.api.gax.retrying.RetrySettings;
import com.google.auth.oauth2.GoogleCredentials;
-import com.google.cloud.http.HttpTransportOptions;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.collect.ImmutableList;
@@ -73,7 +70,7 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
@Override
protected final Collection<ValidationResult> customValidate(ValidationContext validationContext) {
- final Collection<ValidationResult> results = new ArrayList<>();
+ final Collection<ValidationResult> results = super.customValidate(validationContext);
ProxyConfiguration.validateProxySpec(validationContext, results, ProxyAwareTransportFactory.PROXY_SPECS);
customValidate(validationContext, results);
return results;
@@ -96,30 +93,10 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
.setMaxAttempts(retryCount)
.build());
- final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context, () -> {
- final String proxyHost = context.getProperty(PROXY_HOST).getValue();
- final Integer proxyPort = context.getProperty(PROXY_PORT).asInteger();
- if (proxyHost != null && proxyPort != null && proxyPort > 0) {
- final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
- final String proxyUser = context.getProperty(HTTP_PROXY_USERNAME).evaluateAttributeExpressions().getValue();
- final String proxyPassword = context.getProperty(HTTP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
- componentProxyConfig.setProxyType(Proxy.Type.HTTP);
- componentProxyConfig.setProxyServerHost(proxyHost);
- componentProxyConfig.setProxyServerPort(proxyPort);
- componentProxyConfig.setProxyUserName(proxyUser);
- componentProxyConfig.setProxyUserPassword(proxyPassword);
- return componentProxyConfig;
- }
- return ProxyConfiguration.DIRECT_CONFIGURATION;
- });
-
if (!projectId.isEmpty()) {
storageOptionsBuilder.setProjectId(projectId);
}
- final ProxyAwareTransportFactory transportFactory = new ProxyAwareTransportFactory(proxyConfiguration);
- storageOptionsBuilder.setTransportOptions(HttpTransportOptions.newBuilder().setHttpTransportFactory(transportFactory).build());
-
- return storageOptionsBuilder.build();
+ return storageOptionsBuilder.setTransportOptions(getTransportOptions(context)).build();
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/03ef6465/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
index 658350e..98aa1ee 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
@@ -33,7 +33,7 @@
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-oauth2-http</artifactId>
- <version>0.9.0</version>
+ <version>0.12.0</version>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
http://git-wip-us.apache.org/repos/asf/nifi/blob/03ef6465/nifi-nar-bundles/nifi-gcp-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/pom.xml
index bb459a8..c1300a4 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/pom.xml
@@ -27,14 +27,15 @@
<packaging>pom</packaging>
<properties>
- <google.cloud.sdk.version>0.47.0-alpha</google.cloud.sdk.version>
+ <google.cloud.sdk.version>0.71.0-alpha</google.cloud.sdk.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
+ <!-- https://github.com/GoogleCloudPlatform/google-cloud-java/tree/master/google-cloud-bom -->
<groupId>com.google.cloud</groupId>
- <artifactId>google-cloud</artifactId>
+ <artifactId>google-cloud-bom</artifactId>
<version>${google.cloud.sdk.version}</version>
<type>pom</type>
<scope>import</scope>