Posted to commits@nifi.apache.org by pv...@apache.org on 2023/06/11 13:49:52 UTC
[nifi] branch main updated: NIFI-11657 Removed Deprecated PutBigQueryBatch and PutBigQueryStreaming
This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 7485687d4b NIFI-11657 Removed Deprecated PutBigQueryBatch and PutBigQueryStreaming
7485687d4b is described below
commit 7485687d4bd71bf8637cffce7e86c4bc90857cf7
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Tue Jun 6 14:44:27 2023 -0500
NIFI-11657 Removed Deprecated PutBigQueryBatch and PutBigQueryStreaming
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #7351.
---
.../gcp/bigquery/BigQueryAttributes.java | 98 ------
.../processors/gcp/bigquery/BigQueryUtils.java | 83 -----
.../nifi/processors/gcp/bigquery/PutBigQuery.java | 2 -
.../processors/gcp/bigquery/PutBigQueryBatch.java | 364 ---------------------
.../gcp/bigquery/PutBigQueryStreaming.java | 217 ------------
.../services/org.apache.nifi.processor.Processor | 2 -
.../gcp/bigquery/PutBigQueryBatchTest.java | 158 ---------
7 files changed, 924 deletions(-)
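For flows still referencing the removed processors, the replacement is PutBigQuery. A minimal migration sketch follows; the property names are the BigQueryAttributes constants retained by this commit (bq.dataset, bq.table.name, bq.record.reader), but whether PutBigQuery exposes exactly these descriptors should be checked against the current sources, and the credentials and record reader controller services are omitted for brevity.

    import org.apache.nifi.processors.gcp.bigquery.PutBigQuery;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    public class PutBigQueryMigrationSketch {
        public static void main(String[] args) {
            // Stand-in values; a GCP credentials controller service and a
            // RecordReader service must still be registered and enabled
            // before this runner would validate.
            final TestRunner runner = TestRunners.newTestRunner(PutBigQuery.class);
            runner.setProperty("bq.dataset", "my_dataset");          // DATASET_ATTR
            runner.setProperty("bq.table.name", "my_table");         // TABLE_NAME_ATTR
            runner.setProperty("bq.record.reader", "record-reader"); // RECORD_READER_ATTR
        }
    }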
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
index babfb54ee6..f26f6cefe5 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
@@ -17,12 +17,6 @@
package org.apache.nifi.processors.gcp.bigquery;
-import org.apache.nifi.components.AllowableValue;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
-
-import com.google.cloud.bigquery.JobInfo;
-
/**
* Attributes associated with the BigQuery processors
*/
@@ -30,79 +24,19 @@ public class BigQueryAttributes {
private BigQueryAttributes() {
}
- public static final PropertyDescriptor SERVICE_ACCOUNT_JSON_FILE = CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE;
-
// Properties
- public static final String SOURCE_TYPE_ATTR = "bq.load.type";
- public static final String SOURCE_TYPE_DESC = "Data type of the file to be loaded. Possible values: AVRO, "
- + "NEWLINE_DELIMITED_JSON, CSV.";
-
public static final String IGNORE_UNKNOWN_ATTR = "bq.load.ignore_unknown";
public static final String IGNORE_UNKNOWN_DESC = "Sets whether BigQuery should allow extra values that are not represented "
+ "in the table schema. If true, the extra values are ignored. If false, records with extra columns are treated as "
+ "bad records, and if there are too many bad records, an invalid error is returned in the job result. By default "
+ "unknown values are not allowed.";
- public static final String WRITE_DISPOSITION_ATTR = "bq.load.write_disposition";
- public static final String WRITE_DISPOSITION_DESC = "Sets the action that should occur if the destination table already exists.";
-
- public static final String MAX_BADRECORDS_ATTR = "bq.load.max_badrecords";
- public static final String MAX_BADRECORDS_DESC = "Sets the maximum number of bad records that BigQuery can ignore when running "
- + "the job. If the number of bad records exceeds this value, an invalid error is returned in the job result. By default "
- + "no bad record is ignored.";
-
public static final String DATASET_ATTR = "bq.dataset";
public static final String DATASET_DESC = "BigQuery dataset name (Note - The dataset must exist in GCP)";
public static final String TABLE_NAME_ATTR = "bq.table.name";
public static final String TABLE_NAME_DESC = "BigQuery table name";
- public static final String TABLE_SCHEMA_ATTR = "bq.table.schema";
- public static final String TABLE_SCHEMA_DESC = "BigQuery schema in JSON format";
-
- public static final String CREATE_DISPOSITION_ATTR = "bq.load.create_disposition";
- public static final String CREATE_DISPOSITION_DESC = "Sets whether the job is allowed to create new tables";
-
- public static final String JOB_READ_TIMEOUT_ATTR = "bq.readtimeout";
- public static final String JOB_READ_TIMEOUT_DESC = "Load Job Time Out";
-
- public static final String CSV_ALLOW_JAGGED_ROWS_ATTR = "bq.csv.allow.jagged.rows";
- public static final String CSV_ALLOW_JAGGED_ROWS_DESC = "Set whether BigQuery should accept rows that are missing "
- + "trailing optional columns. If true, BigQuery treats missing trailing columns as null values. If false, "
- + "records with missing trailing columns are treated as bad records, and if there are too many bad records, "
- + "an invalid error is returned in the job result. By default, rows with missing trailing columns are "
- + "considered bad records.";
-
- public static final String CSV_ALLOW_QUOTED_NEW_LINES_ATTR = "bq.csv.allow.quoted.new.lines";
- public static final String CSV_ALLOW_QUOTED_NEW_LINES_DESC = "Sets whether BigQuery should allow quoted data sections "
- + "that contain newline characters in a CSV file. By default quoted newline are not allowed.";
-
- public static final String CSV_CHARSET_ATTR = "bq.csv.charset";
- public static final String CSV_CHARSET_DESC = "Sets the character encoding of the data.";
-
- public static final String CSV_FIELD_DELIMITER_ATTR = "bq.csv.delimiter";
- public static final String CSV_FIELD_DELIMITER_DESC = "Sets the separator for fields in a CSV file. BigQuery converts "
- + "the string to ISO-8859-1 encoding, and then uses the first byte of the encoded string to split the data in its "
- + "raw, binary state. BigQuery also supports the escape sequence \"\t\" to specify a tab separator. The default "
- + "value is a comma (',').";
-
- public static final String CSV_QUOTE_ATTR = "bq.csv.quote";
- public static final String CSV_QUOTE_DESC = "Sets the value that is used to quote data sections in a CSV file. BigQuery "
- + "converts the string to ISO-8859-1 encoding, and then uses the first byte of the encoded string to split the "
- + "data in its raw, binary state. The default value is a double-quote ('\"'). If your data does not contain quoted "
- + "sections, set the property value to an empty string. If your data contains quoted newline characters, you must "
- + "also set the Allow Quoted New Lines property to true.";
-
- public static final String CSV_SKIP_LEADING_ROWS_ATTR = "bq.csv.skip.leading.rows";
- public static final String CSV_SKIP_LEADING_ROWS_DESC = "Sets the number of rows at the top of a CSV file that BigQuery "
- + "will skip when reading the data. The default value is 0. This property is useful if you have header rows in the "
- + "file that should be skipped.";
-
- public static final String AVRO_USE_LOGICAL_TYPES_ATTR = "bq.avro.use.logical.types";
- public static final String AVRO_USE_LOGICAL_TYPES_DESC = "If format is set to Avro and if this option is set to true, you "
- + "can interpret logical types into their corresponding types (such as TIMESTAMP) instead of only using their raw "
- + "types (such as INTEGER).";
-
public static final String RECORD_READER_ATTR = "bq.record.reader";
public static final String RECORD_READER_DESC = "Specifies the Controller Service to use for parsing incoming data.";
@@ -111,44 +45,12 @@ public class BigQueryAttributes {
+ "rows exist. If not set the entire insert request will fail if it contains an invalid row.";
// Batch Attributes
- public static final String JOB_CREATE_TIME_ATTR = "bq.job.stat.creation_time";
- public static final String JOB_CREATE_TIME_DESC = "Time load job creation";
-
- public static final String JOB_END_TIME_ATTR = "bq.job.stat.end_time";
- public static final String JOB_END_TIME_DESC = "Time load job ended";
-
- public static final String JOB_START_TIME_ATTR = "bq.job.stat.start_time";
- public static final String JOB_START_TIME_DESC = "Time load job started";
-
- public static final String JOB_LINK_ATTR = "bq.job.link";
- public static final String JOB_LINK_DESC = "API Link to load job";
-
- public static final String JOB_ID_ATTR = "bq.job.id";
- public static final String JOB_ID_DESC = "ID of the BigQuery job";
-
public static final String JOB_NB_RECORDS_ATTR = "bq.records.count";
public static final String JOB_NB_RECORDS_DESC = "Number of records successfully inserted";
public static final String JOB_ERROR_MSG_ATTR = "bq.error.message";
- public static final String JOB_ERROR_MSG_DESC = "Load job error message";
public static final String JOB_ERROR_REASON_ATTR = "bq.error.reason";
- public static final String JOB_ERROR_REASON_DESC = "Load job error reason";
public static final String JOB_ERROR_LOCATION_ATTR = "bq.error.location";
- public static final String JOB_ERROR_LOCATION_DESC = "Load job error location";
-
- // Allowable values
- public static final AllowableValue CREATE_IF_NEEDED = new AllowableValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(),
- JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), "Configures the job to create the table if it does not exist.");
- public static final AllowableValue CREATE_NEVER = new AllowableValue(JobInfo.CreateDisposition.CREATE_NEVER.name(),
- JobInfo.CreateDisposition.CREATE_NEVER.name(), "Configures the job to fail with a not-found error if the table does not exist.");
-
- public static final AllowableValue WRITE_EMPTY = new AllowableValue(JobInfo.WriteDisposition.WRITE_EMPTY.name(),
- JobInfo.WriteDisposition.WRITE_EMPTY.name(), "Configures the job to fail with a duplicate error if the table already exists.");
- public static final AllowableValue WRITE_APPEND = new AllowableValue(JobInfo.WriteDisposition.WRITE_APPEND.name(),
- JobInfo.WriteDisposition.WRITE_APPEND.name(), "Configures the job to append data to the table if it already exists.");
- public static final AllowableValue WRITE_TRUNCATE = new AllowableValue(JobInfo.WriteDisposition.WRITE_TRUNCATE.name(),
- JobInfo.WriteDisposition.WRITE_TRUNCATE.name(), "Configures the job to overwrite the table data if table already exists.");
-
}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryUtils.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryUtils.java
deleted file mode 100644
index 1b621e60db..0000000000
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryUtils.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.gcp.bigquery;
-
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import com.google.cloud.bigquery.Field;
-import com.google.cloud.bigquery.LegacySQLTypeName;
-import com.google.cloud.bigquery.Schema;
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-
-/**
- * Util class for schema manipulation
- */
-public class BigQueryUtils {
-
- private final static Type gsonSchemaType = new TypeToken<List<Map>>() { }.getType();
-
- public static Field mapToField(Map fMap) {
- String typeStr = fMap.get("type").toString();
- String nameStr = fMap.get("name").toString();
- String modeStr = fMap.get("mode").toString();
- LegacySQLTypeName type = null;
-
- if (typeStr.equals("BOOLEAN")) {
- type = LegacySQLTypeName.BOOLEAN;
- } else if (typeStr.equals("STRING")) {
- type = LegacySQLTypeName.STRING;
- } else if (typeStr.equals("BYTES")) {
- type = LegacySQLTypeName.BYTES;
- } else if (typeStr.equals("INTEGER")) {
- type = LegacySQLTypeName.INTEGER;
- } else if (typeStr.equals("FLOAT")) {
- type = LegacySQLTypeName.FLOAT;
- } else if (typeStr.equals("TIMESTAMP") || typeStr.equals("DATE")
- || typeStr.equals("TIME") || typeStr.equals("DATETIME")) {
- type = LegacySQLTypeName.TIMESTAMP;
- } else if (typeStr.equals("RECORD")) {
- type = LegacySQLTypeName.RECORD;
- }
-
- return Field.newBuilder(nameStr, type).setMode(Field.Mode.valueOf(modeStr)).build();
- }
-
- public static List<Field> listToFields(List<Map> m_fields) {
- List<Field> fields = new ArrayList(m_fields.size());
- for (Map m : m_fields) {
- fields.add(mapToField(m));
- }
-
- return fields;
- }
-
- public static Schema schemaFromString(String schemaStr) {
- if (schemaStr == null) {
- return null;
- } else {
- Gson gson = new Gson();
- List<Map> fields = gson.fromJson(schemaStr, gsonSchemaType);
- return Schema.of(BigQueryUtils.listToFields(fields));
- }
- }
-
-}
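For the record, schemaFromString above turned a JSON array of {name, type, mode} objects into a BigQuery Schema. A standalone sketch reproducing the happy path (unlike this sketch's LegacySQLTypeName.valueOf call, the removed code mapped DATE, TIME and DATETIME onto TIMESTAMP and left the type null for unrecognized strings):

    import java.util.List;
    import java.util.Map;

    import com.google.cloud.bigquery.Field;
    import com.google.cloud.bigquery.LegacySQLTypeName;
    import com.google.cloud.bigquery.Schema;
    import com.google.gson.Gson;
    import com.google.gson.reflect.TypeToken;

    public class SchemaFromStringSketch {
        public static void main(String[] args) {
            // Same shape as the TABLE_SCHEMA constant in the removed test below.
            String schemaJson = "[{ \"mode\": \"NULLABLE\", \"name\": \"data\", \"type\": \"STRING\" }]";
            List<Map<String, String>> fields = new Gson().fromJson(
                    schemaJson, new TypeToken<List<Map<String, String>>>() { }.getType());
            Schema schema = Schema.of(fields.stream()
                    .map(f -> Field.newBuilder(f.get("name"), LegacySQLTypeName.valueOf(f.get("type")))
                            .setMode(Field.Mode.valueOf(f.get("mode")))
                            .build())
                    .toArray(Field[]::new));
            System.out.println(schema);
        }
    }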
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
index 54105d9592..c013dd81f0 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
@@ -52,7 +52,6 @@ import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
@@ -93,7 +92,6 @@ import java.util.concurrent.atomic.AtomicReference;
"The processor is record based so the used schema is driven by the RecordReader. Attributes that are not matched to the target schema" +
"are skipped. Exactly once delivery semantics are achieved via stream offsets. The Storage Write API is more efficient than the older " +
"insertAll method because it uses gRPC streaming rather than REST over HTTP")
-@SeeAlso({PutBigQueryBatch.class, PutBigQueryStreaming.class})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@WritesAttributes({
@WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
deleted file mode 100644
index 6b229fe914..0000000000
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.gcp.bigquery;
-
-import com.google.cloud.RetryOption;
-import com.google.cloud.bigquery.FormatOptions;
-import com.google.cloud.bigquery.Job;
-import com.google.cloud.bigquery.JobInfo;
-import com.google.cloud.bigquery.JobStatistics.LoadStatistics;
-import com.google.cloud.bigquery.Schema;
-import com.google.cloud.bigquery.TableDataWriteChannel;
-import com.google.cloud.bigquery.TableId;
-import com.google.cloud.bigquery.WriteChannelConfiguration;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.DeprecationNotice;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.LogLevel;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.gcp.storage.DeleteGCSObject;
-import org.apache.nifi.processors.gcp.storage.PutGCSObject;
-import org.apache.nifi.util.StringUtils;
-import org.threeten.bp.Duration;
-import org.threeten.bp.temporal.ChronoUnit;
-
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A processor for batch loading data into a Google BigQuery table
- * @deprecated use {@link PutBigQuery} instead which uses the Write API
- */
-@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-@DeprecationNotice(alternatives = {PutBigQuery.class}, reason = "This processor is deprecated and may be removed in future releases.")
-@Tags({ "google", "google cloud", "bq", "bigquery" })
-@CapabilityDescription("Please be aware this processor is deprecated and may be removed in the near future. Use PutBigQuery instead. Batch loads flow files content to a Google BigQuery table.")
-@SeeAlso({ PutGCSObject.class, DeleteGCSObject.class })
-@WritesAttributes({
- @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.JOB_ID_ATTR, description = BigQueryAttributes.JOB_ID_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
-})
-@Deprecated
-public class PutBigQueryBatch extends AbstractBigQueryProcessor {
-
- private static final List<String> TYPES = Arrays.asList(FormatOptions.json().getType(), FormatOptions.csv().getType(), FormatOptions.avro().getType());
-
- private static final Validator FORMAT_VALIDATOR = new Validator() {
- @Override
- public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
- final ValidationResult.Builder builder = new ValidationResult.Builder();
- builder.subject(subject).input(input);
- if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
- return builder.valid(true).explanation("Contains Expression Language").build();
- }
-
- if (TYPES.contains(input.toUpperCase())) {
- builder.valid(true);
- } else {
- builder.valid(false).explanation("Load File Type must be one of the following options: " + StringUtils.join(TYPES, ", "));
- }
-
- return builder.build();
- }
- };
-
- public static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder()
- .name(BigQueryAttributes.JOB_READ_TIMEOUT_ATTR)
- .displayName("Read Timeout")
- .description(BigQueryAttributes.JOB_READ_TIMEOUT_DESC)
- .required(true)
- .defaultValue("5 minutes")
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor TABLE_SCHEMA = new PropertyDescriptor.Builder()
- .name(BigQueryAttributes.TABLE_SCHEMA_ATTR)
- .displayName("Table Schema")
- .description(BigQueryAttributes.TABLE_SCHEMA_DESC)
- .required(false)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
- .displayName("Load file type")
- .description(BigQueryAttributes.SOURCE_TYPE_DESC)
- .required(true)
- .addValidator(FORMAT_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
-
- public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder()
- .name(BigQueryAttributes.CREATE_DISPOSITION_ATTR)
- .displayName("Create Disposition")
- .description(BigQueryAttributes.CREATE_DISPOSITION_DESC)
- .required(true)
- .allowableValues(BigQueryAttributes.CREATE_IF_NEEDED, BigQueryAttributes.CREATE_NEVER)
- .defaultValue(BigQueryAttributes.CREATE_IF_NEEDED.getValue())
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor WRITE_DISPOSITION = new PropertyDescriptor.Builder()
- .name(BigQueryAttributes.WRITE_DISPOSITION_ATTR)
- .displayName("Write Disposition")
- .description(BigQueryAttributes.WRITE_DISPOSITION_DESC)
- .required(true)
- .allowableValues(BigQueryAttributes.WRITE_EMPTY, BigQueryAttributes.WRITE_APPEND, BigQueryAttributes.WRITE_TRUNCATE)
- .defaultValue(BigQueryAttributes.WRITE_EMPTY.getValue())
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor MAXBAD_RECORDS = new PropertyDescriptor.Builder()
- .name(BigQueryAttributes.MAX_BADRECORDS_ATTR)
- .displayName("Max Bad Records")
- .description(BigQueryAttributes.MAX_BADRECORDS_DESC)
- .required(true)
- .defaultValue("0")
- .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor CSV_ALLOW_JAGGED_ROWS = new PropertyDescriptor.Builder()
- .name(BigQueryAttributes.CSV_ALLOW_JAGGED_ROWS_ATTR)
- .displayName("CSV Input - Allow Jagged Rows")
- .description(BigQueryAttributes.CSV_ALLOW_JAGGED_ROWS_DESC)
- .required(true)
- .allowableValues("true", "false")
- .defaultValue("false")
- .build();
-
- public static final PropertyDescriptor CSV_ALLOW_QUOTED_NEW_LINES = new PropertyDescriptor.Builder()
- .name(BigQueryAttributes.CSV_ALLOW_QUOTED_NEW_LINES_ATTR)
- .displayName("CSV Input - Allow Quoted New Lines")
- .description(BigQueryAttributes.CSV_ALLOW_QUOTED_NEW_LINES_DESC)
- .required(true)
- .allowableValues("true", "false")
- .defaultValue("false")
- .build();
-
- public static final PropertyDescriptor CSV_CHARSET = new PropertyDescriptor.Builder()
- .name(BigQueryAttributes.CSV_CHARSET_ATTR)
- .displayName("CSV Input - Character Set")
- .description(BigQueryAttributes.CSV_CHARSET_DESC)
- .required(true)
- .allowableValues("UTF-8", "ISO-8859-1")
- .defaultValue("UTF-8")
- .build();
-
- public static final PropertyDescriptor CSV_FIELD_DELIMITER = new PropertyDescriptor.Builder()
- .name(BigQueryAttributes.CSV_FIELD_DELIMITER_ATTR)
- .displayName("CSV Input - Field Delimiter")
- .description(BigQueryAttributes.CSV_FIELD_DELIMITER_DESC)
- .required(true)
- .defaultValue(",")
- .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
-
- public static final PropertyDescriptor CSV_QUOTE = new PropertyDescriptor.Builder()
- .name(BigQueryAttributes.CSV_QUOTE_ATTR)
- .displayName("CSV Input - Quote")
- .description(BigQueryAttributes.CSV_QUOTE_DESC)
- .required(true)
- .defaultValue("\"")
- .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
-
- public static final PropertyDescriptor CSV_SKIP_LEADING_ROWS = new PropertyDescriptor.Builder()
- .name(BigQueryAttributes.CSV_SKIP_LEADING_ROWS_ATTR)
- .displayName("CSV Input - Skip Leading Rows")
- .description(BigQueryAttributes.CSV_SKIP_LEADING_ROWS_DESC)
- .required(true)
- .defaultValue("0")
- .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
-
- public static final PropertyDescriptor AVRO_USE_LOGICAL_TYPES = new PropertyDescriptor.Builder()
- .name(BigQueryAttributes.AVRO_USE_LOGICAL_TYPES_ATTR)
- .displayName("Avro Input - Use Logical Types")
- .description(BigQueryAttributes.AVRO_USE_LOGICAL_TYPES_DESC)
- .required(true)
- .allowableValues("true", "false")
- .defaultValue("false")
- .build();
-
- @Override
- public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
- descriptors.add(TABLE_SCHEMA);
- descriptors.add(READ_TIMEOUT);
- descriptors.add(SOURCE_TYPE);
- descriptors.add(CREATE_DISPOSITION);
- descriptors.add(WRITE_DISPOSITION);
- descriptors.add(MAXBAD_RECORDS);
- descriptors.add(CSV_ALLOW_JAGGED_ROWS);
- descriptors.add(CSV_ALLOW_QUOTED_NEW_LINES);
- descriptors.add(CSV_CHARSET);
- descriptors.add(CSV_FIELD_DELIMITER);
- descriptors.add(CSV_QUOTE);
- descriptors.add(CSV_SKIP_LEADING_ROWS);
- descriptors.add(AVRO_USE_LOGICAL_TYPES);
- return Collections.unmodifiableList(descriptors);
- }
-
- @Override
- @OnScheduled
- public void onScheduled(ProcessContext context) {
- super.onScheduled(context);
- }
-
- @Override
- public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
- FlowFile flowFile = session.get();
- if (flowFile == null) {
- return;
- }
-
- final String type = context.getProperty(SOURCE_TYPE).evaluateAttributeExpressions(flowFile).getValue();
- final TableId tableId = getTableId(context, flowFile.getAttributes());
-
- try {
-
- FormatOptions formatOption;
-
- if (type.equals(FormatOptions.csv().getType())) {
- formatOption = FormatOptions.csv().toBuilder()
- .setAllowJaggedRows(context.getProperty(CSV_ALLOW_JAGGED_ROWS).asBoolean())
- .setAllowQuotedNewLines(context.getProperty(CSV_ALLOW_QUOTED_NEW_LINES).asBoolean())
- .setEncoding(context.getProperty(CSV_CHARSET).getValue())
- .setFieldDelimiter(context.getProperty(CSV_FIELD_DELIMITER).evaluateAttributeExpressions(flowFile).getValue())
- .setQuote(context.getProperty(CSV_QUOTE).evaluateAttributeExpressions(flowFile).getValue())
- .setSkipLeadingRows(context.getProperty(CSV_SKIP_LEADING_ROWS).evaluateAttributeExpressions(flowFile).asInteger())
- .build();
- } else {
- formatOption = FormatOptions.of(type);
- }
-
- final Schema schema = BigQueryUtils.schemaFromString(context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions(flowFile).getValue());
- final WriteChannelConfiguration writeChannelConfiguration = WriteChannelConfiguration.newBuilder(tableId)
- .setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue()))
- .setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue()))
- .setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).evaluateAttributeExpressions(flowFile).asBoolean())
- .setUseAvroLogicalTypes(context.getProperty(AVRO_USE_LOGICAL_TYPES).asBoolean())
- .setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger())
- .setSchema(schema)
- .setFormatOptions(formatOption)
- .build();
-
- try (TableDataWriteChannel writer = getCloudService().writer(writeChannelConfiguration)) {
-
- session.read(flowFile, rawIn -> {
- ReadableByteChannel readableByteChannel = Channels.newChannel(rawIn);
- ByteBuffer byteBuffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
- while (readableByteChannel.read(byteBuffer) >= 0) {
- byteBuffer.flip();
- writer.write(byteBuffer);
- byteBuffer.clear();
- }
- });
-
- // writer must be closed to get the job
- writer.close();
-
- Job job = writer.getJob();
- Long timePeriod = context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.SECONDS);
- Duration waitFor = Duration.of(timePeriod, ChronoUnit.SECONDS);
- job = job.waitFor(RetryOption.totalTimeout(waitFor));
-
- if (job != null) {
- final Map<String, String> attributes = new HashMap<>();
-
- attributes.put(BigQueryAttributes.JOB_CREATE_TIME_ATTR, Long.toString(job.getStatistics().getCreationTime()));
- attributes.put(BigQueryAttributes.JOB_END_TIME_ATTR, Long.toString(job.getStatistics().getEndTime()));
- attributes.put(BigQueryAttributes.JOB_START_TIME_ATTR, Long.toString(job.getStatistics().getStartTime()));
- attributes.put(BigQueryAttributes.JOB_LINK_ATTR, job.getSelfLink());
- attributes.put(BigQueryAttributes.JOB_ID_ATTR, job.getJobId().getJob());
-
- boolean jobError = (job.getStatus().getError() != null);
-
- if (jobError) {
- attributes.put(BigQueryAttributes.JOB_ERROR_MSG_ATTR, job.getStatus().getError().getMessage());
- attributes.put(BigQueryAttributes.JOB_ERROR_REASON_ATTR, job.getStatus().getError().getReason());
- attributes.put(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, job.getStatus().getError().getLocation());
- } else {
- // in case it got looped back from error
- flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_MSG_ATTR);
- flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_REASON_ATTR);
- flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_LOCATION_ATTR);
-
- // add the number of records successfully added
- if (job.getStatistics() instanceof LoadStatistics) {
- final LoadStatistics stats = (LoadStatistics) job.getStatistics();
- attributes.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(stats.getOutputRows()));
- }
- }
-
- if (!attributes.isEmpty()) {
- flowFile = session.putAllAttributes(flowFile, attributes);
- }
-
- if (jobError) {
- getLogger().log(LogLevel.WARN, job.getStatus().getError().getMessage());
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- } else {
- session.getProvenanceReporter().send(flowFile, job.getSelfLink(), job.getStatistics().getEndTime() - job.getStatistics().getStartTime());
- session.transfer(flowFile, REL_SUCCESS);
- }
- }
- }
-
- } catch (Exception ex) {
- getLogger().log(LogLevel.ERROR, ex.getMessage(), ex);
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- }
- }
-
-}
\ No newline at end of file
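The heart of the removed onTrigger above is a standard NIO read/flip/write/clear loop from the FlowFile content into the TableDataWriteChannel. A self-contained sketch of the same pattern (BUFFER_SIZE came from AbstractBigQueryProcessor; 64 KiB is an arbitrary stand-in here, and the inner loop guards against partial writes, which the removed code did not):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.ReadableByteChannel;
    import java.nio.channels.WritableByteChannel;

    public final class ChannelCopy {
        private static final int BUFFER_SIZE = 64 * 1024;

        public static void copy(ReadableByteChannel in, WritableByteChannel out) throws IOException {
            final ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
            while (in.read(buffer) >= 0) {
                buffer.flip();             // switch from filling to draining
                while (buffer.hasRemaining()) {
                    out.write(buffer);     // drain fully before the next read
                }
                buffer.clear();            // reset for the next fill
            }
        }
    }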
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java
deleted file mode 100644
index 7e00cc2359..0000000000
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.gcp.bigquery;
-
-import com.google.cloud.bigquery.BigQueryError;
-import com.google.cloud.bigquery.InsertAllRequest;
-import com.google.cloud.bigquery.InsertAllResponse;
-import com.google.cloud.bigquery.TableId;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.SystemResource;
-import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.DeprecationNotice;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.LogLevel;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.RecordReaderFactory;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.Record;
-
-import java.io.InputStream;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.ZoneOffset;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A processor for streaming loading data into a Google BigQuery table. It uses the BigQuery
- * streaming insert API to insert data. This provides the lowest-latency insert path into BigQuery,
- * and therefore is the default method when the input is unbounded. BigQuery will make a strong
- * effort to ensure no duplicates when using this path, however there are some scenarios in which
- * BigQuery is unable to make this guarantee (see
- * https://cloud.google.com/bigquery/streaming-data-into-bigquery). A query can be run over the
- * output table to periodically clean these rare duplicates. Alternatively, using the Batch insert
- * method does guarantee no duplicates, though the latency for the insert into BigQuery will be much
- * higher.
- *
- * @deprecated use {@link PutBigQuery} instead which uses the Write API
- */
-@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-@DeprecationNotice(alternatives = {PutBigQuery.class}, reason = "This processor is deprecated and may be removed in future releases.")
-@Tags({ "google", "google cloud", "bq", "gcp", "bigquery", "record" })
-@CapabilityDescription("Please be aware this processor is deprecated and may be removed in the near future. Use PutBigQuery instead. "
- + "Load data into Google BigQuery table using the streaming API. This processor "
- + "is not intended to load large flow files as it will load the full content into memory. If "
- + "you need to insert large flow files, consider using PutBigQueryBatch instead.")
-@SeeAlso({ PutBigQueryBatch.class })
-@SystemResourceConsideration(resource = SystemResource.MEMORY)
-@WritesAttributes({
- @WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
-})
-@Deprecated
-public class PutBigQueryStreaming extends AbstractBigQueryProcessor {
-
- private static final DateTimeFormatter timestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
- private static final DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS");
-
- public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
- .name(BigQueryAttributes.RECORD_READER_ATTR)
- .displayName("Record Reader")
- .description(BigQueryAttributes.RECORD_READER_DESC)
- .identifiesControllerService(RecordReaderFactory.class)
- .required(true)
- .build();
-
- public static final PropertyDescriptor SKIP_INVALID_ROWS = new PropertyDescriptor.Builder()
- .name(BigQueryAttributes.SKIP_INVALID_ROWS_ATTR)
- .displayName("Skip Invalid Rows")
- .description(BigQueryAttributes.SKIP_INVALID_ROWS_DESC)
- .required(true)
- .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .defaultValue("false")
- .build();
-
- @Override
- public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
- descriptors.add(RECORD_READER);
- descriptors.add(SKIP_INVALID_ROWS);
- return Collections.unmodifiableList(descriptors);
- }
-
- @Override
- @OnScheduled
- public void onScheduled(ProcessContext context) {
- super.onScheduled(context);
- }
-
- @Override
- public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
- FlowFile flowFile = session.get();
- if (flowFile == null) {
- return;
- }
- final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
- final TableId tableId = getTableId(context, flowFile.getAttributes());
-
- try {
-
- InsertAllRequest.Builder request = InsertAllRequest.newBuilder(tableId);
- int nbrecord = 0;
-
- try (final InputStream in = session.read(flowFile)) {
- final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
- try (final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
- Record currentRecord;
- while ((currentRecord = reader.nextRecord()) != null) {
- request.addRow(convertMapRecord(currentRecord.toMap()));
- nbrecord++;
- }
- }
- }
-
- request.setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).evaluateAttributeExpressions(flowFile).asBoolean());
- request.setSkipInvalidRows(context.getProperty(SKIP_INVALID_ROWS).evaluateAttributeExpressions(flowFile).asBoolean());
-
- InsertAllResponse response = getCloudService().insertAll(request.build());
-
- final Map<String, String> attributes = new HashMap<>();
-
- if (response.hasErrors()) {
- getLogger().log(LogLevel.WARN, "Failed to insert {} of {} records into BigQuery {} table.", new Object[] { response.getInsertErrors().size(), nbrecord, tableName });
- if (getLogger().isDebugEnabled()) {
- for (long index : response.getInsertErrors().keySet()) {
- for (BigQueryError e : response.getInsertErrors().get(index)) {
- getLogger().log(LogLevel.DEBUG, "Failed to insert record #{}: {}", new Object[] { index, e.getMessage() });
- }
- }
- }
-
- attributes.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(nbrecord - response.getInsertErrors().size()));
-
- flowFile = session.penalize(flowFile);
- flowFile = session.putAllAttributes(flowFile, attributes);
- session.transfer(flowFile, REL_FAILURE);
- } else {
- attributes.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(nbrecord));
- flowFile = session.putAllAttributes(flowFile, attributes);
- session.transfer(flowFile, REL_SUCCESS);
- }
-
- } catch (Exception ex) {
- getLogger().log(LogLevel.ERROR, ex.getMessage(), ex);
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- }
- }
-
- private Map<String, Object> convertMapRecord(Map<String, Object> map) {
- Map<String, Object> result = new HashMap<String, Object>();
- for (String key : map.keySet()) {
- Object obj = map.get(key);
- if (obj instanceof MapRecord) {
- result.put(key, convertMapRecord(((MapRecord) obj).toMap()));
- } else if (obj instanceof Object[]
- && ((Object[]) obj).length > 0
- && ((Object[]) obj)[0] instanceof MapRecord) {
- List<Map<String, Object>> lmapr = new ArrayList<Map<String, Object>>();
- for (Object mapr : ((Object[]) obj)) {
- lmapr.add(convertMapRecord(((MapRecord) mapr).toMap()));
- }
- result.put(key, lmapr);
- } else if (obj instanceof Timestamp) {
- // ZoneOffset.UTC time zone is necessary due to implicit time zone conversion in Record Readers from
- // the local system time zone to the GMT time zone
- LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(((Timestamp) obj).getTime()), ZoneOffset.UTC);
- result.put(key, dateTime.format(timestampFormatter));
- } else if (obj instanceof Time) {
- // ZoneOffset.UTC time zone is necessary due to implicit time zone conversion in Record Readers from
- // the local system time zone to the GMT time zone
- LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(((Time) obj).getTime()), ZoneOffset.UTC);
- result.put(key, dateTime.format(timeFormatter));
- } else if (obj instanceof Date) {
- result.put(key, obj.toString());
- } else {
- result.put(key, obj);
- }
- }
- return result;
- }
-
-}
\ No newline at end of file
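One subtlety in the removed convertMapRecord above is worth recording: java.sql.Timestamp and Time values were re-interpreted at ZoneOffset.UTC before formatting, to undo the record readers' implicit conversion from the local system time zone. A standalone sketch of that conversion:

    import java.sql.Timestamp;
    import java.time.Instant;
    import java.time.LocalDateTime;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;

    public class TimestampFormatSketch {
        // Microsecond-precision pattern used by the removed processor.
        private static final DateTimeFormatter TIMESTAMP_FORMATTER =
                DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");

        public static void main(String[] args) {
            Timestamp ts = Timestamp.from(Instant.parse("2023-06-06T14:44:27Z"));
            LocalDateTime dateTime = LocalDateTime.ofInstant(
                    Instant.ofEpochMilli(ts.getTime()), ZoneOffset.UTC);
            System.out.println(dateTime.format(TIMESTAMP_FORMATTER)); // 2023-06-06 14:44:27.000000
        }
    }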
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 1fe8dd6535..fbcb0d907a 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -21,8 +21,6 @@ org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub
org.apache.nifi.processors.gcp.pubsub.lite.PublishGCPubSubLite
org.apache.nifi.processors.gcp.pubsub.lite.ConsumeGCPubSubLite
org.apache.nifi.processors.gcp.bigquery.PutBigQuery
-org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch
-org.apache.nifi.processors.gcp.bigquery.PutBigQueryStreaming
org.apache.nifi.processors.gcp.drive.ListGoogleDrive
org.apache.nifi.processors.gcp.drive.FetchGoogleDrive
org.apache.nifi.processors.gcp.drive.PutGoogleDrive
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java
deleted file mode 100644
index 4ad8ae3eb6..0000000000
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.gcp.bigquery;
-
-import com.google.cloud.RetryOption;
-import com.google.cloud.bigquery.BigQuery;
-import com.google.cloud.bigquery.BigQueryException;
-import com.google.cloud.bigquery.FormatOptions;
-import com.google.cloud.bigquery.Job;
-import com.google.cloud.bigquery.JobId;
-import com.google.cloud.bigquery.JobInfo;
-import com.google.cloud.bigquery.JobStatistics;
-import com.google.cloud.bigquery.JobStatus;
-import com.google.cloud.bigquery.TableDataWriteChannel;
-import com.google.cloud.bigquery.WriteChannelConfiguration;
-import org.apache.nifi.components.ConfigVerificationResult;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.VerifiableProcessor;
-import org.apache.nifi.util.TestRunner;
-import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mock;
-
-import java.util.Collections;
-import java.util.List;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.when;
-
-/**
- * Unit tests for {@link PutBigQueryBatch}.
- */
-public class PutBigQueryBatchTest extends AbstractBQTest {
- private static final String TABLE_NAME = "test_table";
- private static final String TABLE_SCHEMA = "[{ \"mode\": \"NULLABLE\", \"name\": \"data\", \"type\": \"STRING\" }]";
- private static final String SOURCE_TYPE = FormatOptions.json().getType();
- private static final String CREATE_DISPOSITION = JobInfo.CreateDisposition.CREATE_IF_NEEDED.name();
- private static final String WRITE_DISPOSITION = JobInfo.WriteDisposition.WRITE_EMPTY.name();
- private static final String MAX_BAD_RECORDS = "0";
- private static final String IGNORE_UNKNOWN = "true";
- private static final String READ_TIMEOUT = "5 minutes";
-
- @Mock
- Job job;
-
- @Mock
- JobId jobId;
-
- @Mock
- JobStatus jobStatus;
-
- @Mock
- JobStatistics stats;
-
- @Mock
- TableDataWriteChannel tableDataWriteChannel;
-
- @Override
- public AbstractBigQueryProcessor getProcessor() {
- return new PutBigQueryBatch() {
- @Override
- protected BigQuery getCloudService() {
- return bq;
- }
-
- @Override
- protected BigQuery getCloudService(final ProcessContext context) {
- return bq;
- }
- };
- }
-
- @Override
- protected void addRequiredPropertiesToRunner(TestRunner runner) {
- runner.setProperty(PutBigQueryBatch.DATASET, DATASET);
- runner.setProperty(PutBigQueryBatch.TABLE_NAME, TABLE_NAME);
- runner.setProperty(PutBigQueryBatch.TABLE_SCHEMA, TABLE_SCHEMA);
- runner.setProperty(PutBigQueryBatch.SOURCE_TYPE, SOURCE_TYPE);
- runner.setProperty(PutBigQueryBatch.CREATE_DISPOSITION, CREATE_DISPOSITION);
- runner.setProperty(PutBigQueryBatch.WRITE_DISPOSITION, WRITE_DISPOSITION);
- runner.setProperty(PutBigQueryBatch.MAXBAD_RECORDS, MAX_BAD_RECORDS);
- runner.setProperty(PutBigQueryBatch.IGNORE_UNKNOWN, IGNORE_UNKNOWN);
- runner.setProperty(PutBigQueryBatch.READ_TIMEOUT, READ_TIMEOUT);
- }
-
- @Test
- public void testSuccessfulLoad() throws Exception {
- when(bq.writer(ArgumentMatchers.isA(WriteChannelConfiguration.class))).thenReturn(tableDataWriteChannel);
- when(tableDataWriteChannel.getJob()).thenReturn(job);
- when(job.waitFor(ArgumentMatchers.isA(RetryOption.class))).thenReturn(job);
- when(job.getStatus()).thenReturn(jobStatus);
- when(job.getStatistics()).thenReturn(stats);
-
- when(stats.getCreationTime()).thenReturn(0L);
- when(stats.getStartTime()).thenReturn(1L);
- when(stats.getEndTime()).thenReturn(2L);
- when(job.getJobId()).thenReturn(jobId);
- when(jobId.getJob()).thenReturn("job-id");
-
- final AbstractBigQueryProcessor processor = getProcessor();
- final TestRunner runner = buildNewRunner(processor);
- addRequiredPropertiesToRunner(runner);
- runner.assertValid();
-
- runner.enqueue("{ \"data\": \"datavalue\" }");
-
- runner.run();
-
- when(bq.testIamPermissions(any(), any())).thenReturn(Collections.singletonList("permission"));
- final List<ConfigVerificationResult> verificationResults = ((VerifiableProcessor) processor).verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
- assertEquals(2, verificationResults.size());
- assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, verificationResults.get(1).getOutcome());
-
- runner.assertAllFlowFilesTransferred(PutBigQueryBatch.REL_SUCCESS);
- }
-
- @Test
- public void testFailedLoad() throws Exception {
- when(bq.writer(ArgumentMatchers.isA(WriteChannelConfiguration.class))).thenReturn(tableDataWriteChannel);
- when(tableDataWriteChannel.getJob()).thenReturn(job);
- when(job.waitFor(ArgumentMatchers.isA(RetryOption.class))).thenThrow(BigQueryException.class);
-
- final TestRunner runner = buildNewRunner(getProcessor());
- addRequiredPropertiesToRunner(runner);
- runner.assertValid();
-
- runner.enqueue("{ \"data\": \"datavalue\" }");
-
- runner.run();
-
- runner.assertAllFlowFilesTransferred(PutBigQueryBatch.REL_FAILURE);
- }
-
- @Test
- public void testMandatoryProjectId() throws Exception {
- final TestRunner runner = buildNewRunner(getProcessor());
- addRequiredPropertiesToRunner(runner);
- runner.assertValid();
-
- runner.removeProperty(PutBigQueryBatch.PROJECT_ID);
- runner.assertNotValid();
- }
-}
\ No newline at end of file